database: add methods to clear and update a namespace

Since transactions are now threaded we need updating
and clearing a namespace needs to be performed within
one transaction.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2022-01-29 17:18:32 -05:00
parent e895b10ac3
commit 43c1b998b0
1 changed files with 83 additions and 32 deletions

View File

@ -294,6 +294,70 @@ class MoonrakerDatabase:
f"Key '{key}' in namespace '{namespace}' not found", 404)
return val
def update_namespace(self,
namespace: str,
value: Dict[str, DBRecord]
) -> Awaitable[None]:
if self.eventloop.is_running():
return self.eventloop.run_in_thread(
self._update_ns_impl, namespace, value)
else:
self._update_ns_impl(namespace, value)
fut = self.eventloop.create_future()
fut.set_result(None)
return fut
def _update_ns_impl(self,
namespace: str,
value: Dict[str, DBRecord]
) -> None:
with self.thread_lock:
if not value:
return
if namespace not in self.namespaces:
raise self.server.error(
f"Invalid database namespace '{namespace}'")
db = self.namespaces[namespace]
with self.lmdb_env.begin(write=True, buffers=True, db=db) as txn:
# We only need to update the keys that changed
for key, val in value.items():
stored = txn.get(key.encode())
if stored is not None:
decoded = self._decode_value(stored)
if val == decoded:
continue
ret = txn.put(key.encode(), self._encode_value(val))
if not ret:
logging.info(f"Error inserting key '{key}' "
f"in namespace '{namespace}'")
def clear_namespace(self,
namespace: str,
drop_empty_db: bool = False
) -> Awaitable[None]:
if self.eventloop.is_running():
return self.eventloop.run_in_thread(
self._clear_ns_impl, namespace, drop_empty_db)
else:
self._clear_ns_impl(namespace, drop_empty_db)
fut = self.eventloop.create_future()
fut.set_result(None)
return fut
def _clear_ns_impl(self,
namespace: str,
drop_empty_db: bool = False
) -> None:
with self.thread_lock:
if namespace not in self.namespaces:
raise self.server.error(
f"Invalid database namespace '{namespace}'")
db = self.namespaces[namespace]
with self.lmdb_env.begin(write=True, db=db) as txn:
txn.drop(db, delete=drop_empty_db)
if drop_empty_db:
del self.namespaces[namespace]
async def ns_length_async(self, namespace: str) -> int:
return len(await self.ns_keys_async(namespace))
@ -385,6 +449,19 @@ class MoonrakerDatabase:
f"Namespace '{namespace}' not found", 404)
return NamespaceWrapper(namespace, self, parse_keys)
def wrap_async_namespace(self,
namespace: str,
parse_keys: bool = True
) -> AsyncNamespaceWrapper:
if self.eventloop.is_running():
raise self.server.error(
"Cannot wrap a namespace while the "
"eventloop is running")
if namespace not in self.namespaces:
raise self.server.error(
f"Namespace '{namespace}' not found", 404)
return AsyncNamespaceWrapper(namespace, self, parse_keys)
def _process_key(self, key: Union[List[str], str]) -> List[str]:
try:
key_list = key if isinstance(key, list) else key.split('.')
@ -565,15 +642,9 @@ class NamespaceWrapper:
key = [key]
self.db.update_item(self.namespace, key, value)
def update(self, value: Dict[str, Any]) -> None:
def update(self, value: Dict[str, DBRecord]) -> None:
self.db.can_call_sync("update")
val_keys = set(value.keys())
new_keys = val_keys - set(self.keys())
update_keys = val_keys - new_keys
for key in update_keys:
self.update_child([key], value[key])
for key in new_keys:
self.insert([key], value[key])
self.db.update_namespace(self.namespace, value)
def get(self,
key: Union[List[str], str],
@ -639,12 +710,7 @@ class NamespaceWrapper:
def clear(self) -> None:
self.db.can_call_sync("clear")
keys = self.keys()
for k in keys:
try:
self.delete([k])
except Exception:
pass
self.db.clear_namespace(self.namespace)
class AsyncNamespaceWrapper:
def __init__(self,
@ -678,16 +744,8 @@ class AsyncNamespaceWrapper:
key = [key]
return self.db.update_item(self.namespace, key, value)
def update(self, value: Dict[str, Any]) -> Awaitable[None]:
async def _do_update():
val_keys = set(value.keys())
new_keys = val_keys - set(await self.keys())
update_keys = val_keys - new_keys
for key in update_keys:
await self.update_child([key], value[key])
for key in new_keys:
await self.insert([key], value[key])
return self.eventloop.create_task(_do_update())
def update(self, value: Dict[str, DBRecord]) -> Awaitable[None]:
return self.db.update_namespace(self.namespace, value)
async def get(self,
key: Union[List[str], str],
@ -746,14 +804,7 @@ class AsyncNamespaceWrapper:
return self.eventloop.create_task(_do_pop())
def clear(self) -> Awaitable[None]:
async def _do_clear():
keys = await self.keys()
for k in keys:
try:
await self.delete([k])
except Exception:
pass
return self.eventloop.create_task(_do_clear())
return self.db.clear_namespace(self.namespace)
def load_component(config: ConfigHelper) -> MoonrakerDatabase: