database: introduce record based operations
Now that all transactions run in a thread it is possible for them to interleave. The record based operations allow for batch operations within a single transaction. Signed-off-by: Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
parent
e029b6c582
commit
e2a62f80d4
|
@ -174,6 +174,15 @@ class MoonrakerDatabase:
|
|||
fut.set_result(ret)
|
||||
return fut
|
||||
|
||||
# *** Nested Database operations***
|
||||
# The insert_item(), delete_item(), and get_item() methods may operate on
|
||||
# nested objects within a namespace. Each operation takes a key argument
|
||||
# that may either be a string or a list of strings. If the argument is
|
||||
# a string nested keys may be delitmted by a "." by which the string
|
||||
# will be split into a list of strings. The first key in the list must
|
||||
# identify the database record. Subsequent keys are optional and are
|
||||
# used to access elements in the deserialized objects.
|
||||
|
||||
def insert_item(self,
|
||||
namespace: str,
|
||||
key: Union[List[str], str],
|
||||
|
@ -312,6 +321,102 @@ class MoonrakerDatabase:
|
|||
f"Key '{key}' in namespace '{namespace}' not found", 404)
|
||||
return val
|
||||
|
||||
# *** Batch operations***
|
||||
# The insert_batch(), move_batch(), delete_batch(), and get_batch()
|
||||
# methods can be used to perform record level batch operations on
|
||||
# a namespace in a single transaction.
|
||||
|
||||
def insert_batch(self,
|
||||
namespace: str,
|
||||
records: Dict[str, Any]
|
||||
) -> Future[None]:
|
||||
return self._run_command(self._insert_batch_impl, namespace, records)
|
||||
|
||||
def _insert_batch_impl(self,
|
||||
namespace: str,
|
||||
records: Dict[str, Any]
|
||||
) -> None:
|
||||
if namespace not in self.namespaces:
|
||||
raise self.server.error(
|
||||
f"Invalid database namespace '{namespace}'")
|
||||
db = self.namespaces[namespace]
|
||||
with self.lmdb_env.begin(write=True, buffers=True, db=db) as txn:
|
||||
for key, val in records.items():
|
||||
ret = txn.put(key.encode(), self._encode_value(val))
|
||||
if not ret:
|
||||
logging.info(f"Error inserting record {key} into "
|
||||
f"namespace {namespace}")
|
||||
|
||||
def move_batch(self,
|
||||
namespace: str,
|
||||
source_keys: List[str],
|
||||
dest_keys: List[str]
|
||||
) -> Future[None]:
|
||||
return self._run_command(self._move_batch_impl, namespace,
|
||||
source_keys, dest_keys)
|
||||
|
||||
def _move_batch_impl(self,
|
||||
namespace: str,
|
||||
source_keys: List[str],
|
||||
dest_keys: List[str]
|
||||
) -> None:
|
||||
if namespace not in self.namespaces:
|
||||
raise self.server.error(
|
||||
f"Invalid database namespace '{namespace}'")
|
||||
db = self.namespaces[namespace]
|
||||
with self.lmdb_env.begin(write=True, db=db) as txn:
|
||||
for source, dest in zip(source_keys, dest_keys):
|
||||
val = txn.pop(source.encode())
|
||||
if val is not None:
|
||||
txn.put(dest.encode(), val)
|
||||
|
||||
def delete_batch(self,
|
||||
namespace: str,
|
||||
keys: List[str]
|
||||
) -> Future[Dict[str, Any]]:
|
||||
return self._run_command(self._del_batch_impl, namespace, keys)
|
||||
|
||||
def _del_batch_impl(self,
|
||||
namespace: str,
|
||||
keys: List[str]
|
||||
) -> Dict[str, Any]:
|
||||
if namespace not in self.namespaces:
|
||||
raise self.server.error(
|
||||
f"Invalid database namespace '{namespace}'")
|
||||
db = self.namespaces[namespace]
|
||||
result: Dict[str, Any] = {}
|
||||
with self.lmdb_env.begin(write=True, buffers=True, db=db) as txn:
|
||||
for key in keys:
|
||||
val = txn.pop(key.encode())
|
||||
if val is not None:
|
||||
result[key] = self._decode_value(val)
|
||||
return result
|
||||
|
||||
def get_batch(self,
|
||||
namespace: str,
|
||||
keys: List[str]
|
||||
) -> Future[Dict[str, Any]]:
|
||||
return self._run_command(self._get_batch_impl, namespace, keys)
|
||||
|
||||
def _get_batch_impl(self,
|
||||
namespace: str,
|
||||
keys: List[str]
|
||||
) -> Dict[str, Any]:
|
||||
if namespace not in self.namespaces:
|
||||
raise self.server.error(
|
||||
f"Invalid database namespace '{namespace}'")
|
||||
db = self.namespaces[namespace]
|
||||
result: Dict[str, Any] = {}
|
||||
encoded_keys: List[bytes] = [k.encode() for k in keys]
|
||||
with self.lmdb_env.begin(buffers=True, db=db) as txn:
|
||||
with txn.cursor() as cursor:
|
||||
vals = cursor.getmulti(encoded_keys)
|
||||
result = {bytes(k).decode(): self._decode_value(v)
|
||||
for k, v in vals}
|
||||
return result
|
||||
|
||||
# *** Namespace level operations***
|
||||
|
||||
def update_namespace(self,
|
||||
namespace: str,
|
||||
value: Mapping[str, DBRecord]
|
||||
|
@ -695,6 +800,21 @@ class NamespaceWrapper:
|
|||
key = [key]
|
||||
return self.db.delete_item(self.namespace, key)
|
||||
|
||||
def insert_batch(self, records: Dict[str, Any]) -> Future[None]:
|
||||
return self.db.insert_batch(self.namespace, records)
|
||||
|
||||
def move_batch(self,
|
||||
source_keys: List[str],
|
||||
dest_keys: List[str]
|
||||
) -> Future[None]:
|
||||
return self.db.move_batch(self.namespace, source_keys, dest_keys)
|
||||
|
||||
def delete_batch(self, keys: List[str]) -> Future[Dict[str, Any]]:
|
||||
return self.db.delete_batch(self.namespace, keys)
|
||||
|
||||
def get_batch(self, keys: List[str]) -> Future[Dict[str, Any]]:
|
||||
return self.db.get_batch(self.namespace, keys)
|
||||
|
||||
def length(self) -> Future[int]:
|
||||
return self.db.ns_length(self.namespace)
|
||||
|
||||
|
|
Loading…
Reference in New Issue