From 43c1b998b087db16f884d4497ba85edf77423865 Mon Sep 17 00:00:00 2001 From: Eric Callahan Date: Sat, 29 Jan 2022 17:18:32 -0500 Subject: [PATCH] database: add methods to clear and update a namespace Since transactions are now threaded we need updating and clearing a namespace needs to be performed within one transaction. Signed-off-by: Eric Callahan --- moonraker/components/database.py | 115 ++++++++++++++++++++++--------- 1 file changed, 83 insertions(+), 32 deletions(-) diff --git a/moonraker/components/database.py b/moonraker/components/database.py index b423ff6..8fae81c 100644 --- a/moonraker/components/database.py +++ b/moonraker/components/database.py @@ -294,6 +294,70 @@ class MoonrakerDatabase: f"Key '{key}' in namespace '{namespace}' not found", 404) return val + def update_namespace(self, + namespace: str, + value: Dict[str, DBRecord] + ) -> Awaitable[None]: + if self.eventloop.is_running(): + return self.eventloop.run_in_thread( + self._update_ns_impl, namespace, value) + else: + self._update_ns_impl(namespace, value) + fut = self.eventloop.create_future() + fut.set_result(None) + return fut + + def _update_ns_impl(self, + namespace: str, + value: Dict[str, DBRecord] + ) -> None: + with self.thread_lock: + if not value: + return + if namespace not in self.namespaces: + raise self.server.error( + f"Invalid database namespace '{namespace}'") + db = self.namespaces[namespace] + with self.lmdb_env.begin(write=True, buffers=True, db=db) as txn: + # We only need to update the keys that changed + for key, val in value.items(): + stored = txn.get(key.encode()) + if stored is not None: + decoded = self._decode_value(stored) + if val == decoded: + continue + ret = txn.put(key.encode(), self._encode_value(val)) + if not ret: + logging.info(f"Error inserting key '{key}' " + f"in namespace '{namespace}'") + + def clear_namespace(self, + namespace: str, + drop_empty_db: bool = False + ) -> Awaitable[None]: + if self.eventloop.is_running(): + return self.eventloop.run_in_thread( + self._clear_ns_impl, namespace, drop_empty_db) + else: + self._clear_ns_impl(namespace, drop_empty_db) + fut = self.eventloop.create_future() + fut.set_result(None) + return fut + + def _clear_ns_impl(self, + namespace: str, + drop_empty_db: bool = False + ) -> None: + with self.thread_lock: + if namespace not in self.namespaces: + raise self.server.error( + f"Invalid database namespace '{namespace}'") + db = self.namespaces[namespace] + with self.lmdb_env.begin(write=True, db=db) as txn: + txn.drop(db, delete=drop_empty_db) + if drop_empty_db: + del self.namespaces[namespace] + async def ns_length_async(self, namespace: str) -> int: return len(await self.ns_keys_async(namespace)) @@ -385,6 +449,19 @@ class MoonrakerDatabase: f"Namespace '{namespace}' not found", 404) return NamespaceWrapper(namespace, self, parse_keys) + def wrap_async_namespace(self, + namespace: str, + parse_keys: bool = True + ) -> AsyncNamespaceWrapper: + if self.eventloop.is_running(): + raise self.server.error( + "Cannot wrap a namespace while the " + "eventloop is running") + if namespace not in self.namespaces: + raise self.server.error( + f"Namespace '{namespace}' not found", 404) + return AsyncNamespaceWrapper(namespace, self, parse_keys) + def _process_key(self, key: Union[List[str], str]) -> List[str]: try: key_list = key if isinstance(key, list) else key.split('.') @@ -565,15 +642,9 @@ class NamespaceWrapper: key = [key] self.db.update_item(self.namespace, key, value) - def update(self, value: Dict[str, Any]) -> None: + def update(self, value: Dict[str, DBRecord]) -> None: self.db.can_call_sync("update") - val_keys = set(value.keys()) - new_keys = val_keys - set(self.keys()) - update_keys = val_keys - new_keys - for key in update_keys: - self.update_child([key], value[key]) - for key in new_keys: - self.insert([key], value[key]) + self.db.update_namespace(self.namespace, value) def get(self, key: Union[List[str], str], @@ -639,12 +710,7 @@ class NamespaceWrapper: def clear(self) -> None: self.db.can_call_sync("clear") - keys = self.keys() - for k in keys: - try: - self.delete([k]) - except Exception: - pass + self.db.clear_namespace(self.namespace) class AsyncNamespaceWrapper: def __init__(self, @@ -678,16 +744,8 @@ class AsyncNamespaceWrapper: key = [key] return self.db.update_item(self.namespace, key, value) - def update(self, value: Dict[str, Any]) -> Awaitable[None]: - async def _do_update(): - val_keys = set(value.keys()) - new_keys = val_keys - set(await self.keys()) - update_keys = val_keys - new_keys - for key in update_keys: - await self.update_child([key], value[key]) - for key in new_keys: - await self.insert([key], value[key]) - return self.eventloop.create_task(_do_update()) + def update(self, value: Dict[str, DBRecord]) -> Awaitable[None]: + return self.db.update_namespace(self.namespace, value) async def get(self, key: Union[List[str], str], @@ -746,14 +804,7 @@ class AsyncNamespaceWrapper: return self.eventloop.create_task(_do_pop()) def clear(self) -> Awaitable[None]: - async def _do_clear(): - keys = await self.keys() - for k in keys: - try: - await self.delete([k]) - except Exception: - pass - return self.eventloop.create_task(_do_clear()) + return self.db.clear_namespace(self.namespace) def load_component(config: ConfigHelper) -> MoonrakerDatabase: