From 0f7b781f57a01f178b7313a0a1cb37aecada8f33 Mon Sep 17 00:00:00 2001 From: Eric Callahan Date: Mon, 20 Feb 2023 12:46:11 -0500 Subject: [PATCH] file_manager: process metadata changes when no observer is configured Signed-off-by: Eric Callahan --- .../components/file_manager/file_manager.py | 256 +++++++++++++----- 1 file changed, 194 insertions(+), 62 deletions(-) diff --git a/moonraker/components/file_manager/file_manager.py b/moonraker/components/file_manager/file_manager.py index a7095f1..f0b57de 100644 --- a/moonraker/components/file_manager/file_manager.py +++ b/moonraker/components/file_manager/file_manager.py @@ -93,7 +93,6 @@ class FileManager: ) obs_class = BaseFileSystemObserver logging.info(f"Using File System Observer: {observer}") - self.no_observe = observer == "none" self.fs_observer = obs_class( config, self, self.gcode_metadata, self.sync_lock ) @@ -431,6 +430,7 @@ class FileManager: os.mkdir(dir_path) except Exception as e: raise self.server.error(str(e)) + self.fs_observer.on_item_create(root, dir_path, is_dir=True) elif method == 'DELETE' and root in self.full_access_roots: # Remove a directory action = "delete_dir" @@ -456,6 +456,7 @@ class FileManager: os.rmdir(dir_path) except Exception as e: raise self.server.error(str(e)) + self.fs_observer.on_item_delete(root, dir_path, is_dir=True) else: raise self.server.error("Operation Not Supported", 405) return self._sched_changed_event(action, root, dir_path) @@ -545,10 +546,18 @@ class FileManager: try: full_dest = await self.event_loop.run_in_thread( op_func, source_path, dest_path) - if dest_root == "gcodes": + if dest_root == "gcodes" and self.fs_observer.has_fast_observe: await self.sync_lock.wait_inotify_event(full_dest) except Exception as e: raise self.server.error(str(e)) from e + if action.startswith("move"): + ret = self.fs_observer.on_item_move( + source_root, dest_root, source_path, full_dest + ) + else: + ret = self.fs_observer.on_item_copy(dest_root, full_dest) + if ret is not None: + await ret return self._sched_changed_event( action, dest_root, full_dest, src_info[0], src_info[1] ) @@ -592,6 +601,7 @@ class FileManager: await self.event_loop.run_in_thread( self._zip_files, items, dest_path, store_only ) + self.fs_observer.on_item_create(dest_root, dest_path) ret = self._sched_changed_event("create_file", dest_root, str(dest_path)) return { "destination": ret["item"], @@ -831,6 +841,7 @@ class FileManager: await job_queue.queue_job( upload_info['filename'], check_exists=False) queued = True + self.fs_observer.on_item_create("gcodes", upload_info["dest_path"]) result = dict(self._sched_changed_event( "create_file", "gcodes", upload_info["dest_path"] )) @@ -843,6 +854,7 @@ class FileManager: await self._process_uploaded_file(upload_info) dest_path: str = upload_info["dest_path"] root: str = upload_info["root"] + self.fs_observer.on_item_create(root, dest_path) return self._sched_changed_event("create_file", root, dest_path) async def _process_uploaded_file(self, @@ -988,6 +1000,7 @@ class FileManager: raise self.sync_lock.setup("delete_file", full_path) os.remove(full_path) + self.fs_observer.on_item_delete(root, full_path) return self._sched_changed_event("delete_file", root, full_path) def _sched_changed_event( @@ -1009,7 +1022,7 @@ class FileManager: if source_path is not None and source_root is not None: src_rel_path = self.get_relative_path(source_root, source_path) notify_info['source_item'] = {'path': src_rel_path, 'root': source_root} - immediate |= self.no_observe + immediate |= not self.fs_observer.has_fast_observe delay = .005 if immediate else 1. key = f"{action}-{root}-{rel_path}" handle = self.event_loop.delay_callback( @@ -1200,6 +1213,10 @@ class BaseFileSystemObserver: self.gcode_metadata = gcode_metadata self.sync_lock = sync_lock + @property + def has_fast_observe(self) -> bool: + return False + def initialize(self) -> None: pass @@ -1209,6 +1226,152 @@ class BaseFileSystemObserver: fm = self.file_manager fm._sched_changed_event("root_update", root, root_path, immediate=True) + def try_move_metadata( + self, + prev_root: str, + new_root: str, + prev_path: str, + new_path: str, + is_dir: bool = False + ) -> Union[bool, Awaitable]: + if new_root == "gcodes": + if prev_root == "gcodes": + # moved within the gcodes root, move metadata + fm = self.file_manager + gcm = self.gcode_metadata + prev_rel_path = fm.get_relative_path("gcodes", prev_path) + new_rel_path = fm.get_relative_path("gcodes", new_path) + if is_dir: + gcm.move_directory_metadata(prev_rel_path, new_rel_path) + else: + return gcm.move_file_metadata(prev_rel_path, new_rel_path) + else: + # move from a non-gcodes root to gcodes root needs a rescan + self.clear_metadata(prev_root, prev_path, is_dir) + return False + elif prev_root == "gcodes": + # moved out of the gcodes root, remove metadata + self.clear_metadata(prev_root, prev_path, is_dir) + return True + + def clear_metadata( + self, root: str, path: str, is_dir: bool = False + ) -> None: + if root == "gcodes": + rel_path = self.file_manager.get_relative_path(root, str(path)) + if is_dir: + self.gcode_metadata.remove_directory_metadata(rel_path) + else: + self.gcode_metadata.remove_file_metadata(rel_path) + + def parse_gcode_metadata(self, file_path: str) -> asyncio.Event: + rel_path = self.file_manager.get_relative_path("gcodes", file_path) + ext = os.path.splitext(rel_path)[-1].lower() + try: + path_info = self.file_manager.get_path_info(file_path, "gcodes") + except Exception: + path_info = {} + if ( + ext not in VALID_GCODE_EXTS or + path_info.get('size', 0) == 0 + ): + evt = asyncio.Event() + evt.set() + return evt + if ext == ".ufp": + rel_path = os.path.splitext(rel_path)[0] + ".gcode" + path_info['ufp_path'] = file_path + return self.gcode_metadata.parse_metadata(rel_path, path_info) + + def _scan_directory_metadata( + self, start_path: pathlib.Path + ) -> Optional[Awaitable]: + # Use os.walk find files in sd path and subdirs + mevts: List[Coroutine] = [] + st = start_path.stat() + visited_dirs = {(st.st_dev, st.st_ino)} + for parent, dirs, files in os.walk(start_path, followlinks=True): + scan_dirs: List[str] = [] + # Filter out directories that have already been visted. This + # prevents infinite recrusion "followlinks" is set to True + parent_dir = pathlib.Path(parent) + for dname in dirs: + dir_path = parent_dir.joinpath(dname) + if not dir_path.exists(): + continue + st = dir_path.stat() + key = (st.st_dev, st.st_ino) + if key not in visited_dirs: + visited_dirs.add(key) + scan_dirs.append(dname) + dirs[:] = scan_dirs + for fname in files: + file_path = parent_dir.joinpath(fname) + if ( + not file_path.is_file() or + file_path.suffix not in VALID_GCODE_EXTS + ): + continue + mevt = self.parse_gcode_metadata(str(file_path)) + mevts.append(mevt.wait()) + if mevts: + return asyncio.gather(*mevts) + return None + + def on_item_copy(self, root: str, item_path: StrOrPath) -> Optional[Awaitable]: + if self.has_fast_observe: + return None + if isinstance(item_path, str): + item_path = pathlib.Path(item_path) + if root != "gcodes": + return None + if item_path.is_file() and item_path.suffix in VALID_GCODE_EXTS: + ret = self.parse_gcode_metadata(str(item_path)) + return ret.wait() + elif item_path.is_dir(): + return self._scan_directory_metadata(item_path) + return None + + def on_item_move( + self, + src_root: str, + dest_root: str, + src_path: StrOrPath, + dest_path: StrOrPath + ) -> Optional[Awaitable]: + if self.has_fast_observe: + return None + if isinstance(src_path, str): + src_path = pathlib.Path(src_path) + if isinstance(dest_path, str): + dest_path = pathlib.Path(dest_path) + is_dir = dest_path.is_dir() + ret = self.try_move_metadata( + src_root, dest_root, str(src_path), str(dest_path), is_dir + ) + if not isinstance(ret, bool): + return ret + elif ret is False: + # Need metadata scan + if is_dir: + return self._scan_directory_metadata(dest_path) + elif dest_path.is_file() and dest_path.suffix in VALID_GCODE_EXTS: + mevt = self.parse_gcode_metadata(str(dest_path)) + return mevt.wait() + return None + + def on_item_create( + self, root: str, item_path: StrOrPath, is_dir: bool = False + ) -> None: + pass + + def on_item_delete( + self, root: str, item_path: StrOrPath, is_dir: bool = False + ) -> None: + if self.has_fast_observe: + return + self.clear_metadata(root, str(item_path), is_dir) + def close(self) -> None: pass @@ -1563,6 +1726,34 @@ class InotifyObserver(BaseFileSystemObserver): self.pending_coroutines: List[Coroutine] = [] self._gc_notify_task: Optional[asyncio.Task] = None + @property + def has_fast_observe(self) -> bool: + return True + + # Override and pass the callbacks from the request handlers. Inotify + # detects events quickly and takes any required actions + def on_item_create( + self, root: str, item_path: StrOrPath, is_dir: bool = False + ) -> None: + pass + + def on_item_delete( + self, root: str, item_path: StrOrPath, is_dir: bool = False + ) -> None: + pass + + def on_item_move( + self, + src_root: str, + dest_root: str, + src_path: StrOrPath, + dest_path: StrOrPath + ) -> Optional[Awaitable]: + return None + + def on_item_copy(self, root: str, item_path: StrOrPath) -> Optional[Awaitable]: + return None + def add_root_watch(self, root: str, root_path: str) -> None: # remove all exisiting watches on root if root in self.watched_roots: @@ -1647,46 +1838,6 @@ class InotifyObserver(BaseFileSystemObserver): except Exception: logging.exception(f"Error removing watch: '{node.get_path()}'") - def clear_metadata(self, - root: str, - path: str, - is_dir: bool = False - ) -> None: - if root == "gcodes": - rel_path = self.file_manager.get_relative_path(root, path) - if is_dir: - self.gcode_metadata.remove_directory_metadata(rel_path) - else: - self.gcode_metadata.remove_file_metadata(rel_path) - - def try_move_metadata( - self, - prev_root: str, - new_root: str, - prev_path: str, - new_path: str, - is_dir: bool = False - ) -> Union[bool, Awaitable]: - if new_root == "gcodes": - if prev_root == "gcodes": - # moved within the gcodes root, move metadata - fm = self.file_manager - gcm = self.gcode_metadata - prev_rel_path = fm.get_relative_path("gcodes", prev_path) - new_rel_path = fm.get_relative_path("gcodes", new_path) - if is_dir: - gcm.move_directory_metadata(prev_rel_path, new_rel_path) - else: - return gcm.move_file_metadata(prev_rel_path, new_rel_path) - else: - # move from a non-gcodes root to gcodes root needs a rescan - self.clear_metadata(prev_root, prev_path, is_dir) - return False - elif prev_root == "gcodes": - # moved out of the gcodes root, remove metadata - self.clear_metadata(prev_root, prev_path, is_dir) - return True - def log_nodes(self) -> None: if self.server.is_verbose_enabled(): debug_msg = f"Inotify Watches After Scan:" @@ -1697,25 +1848,6 @@ class InotifyObserver(BaseFileSystemObserver): f"Watch: {wdesc}" logging.debug(debug_msg) - def parse_gcode_metadata(self, file_path: str) -> asyncio.Event: - rel_path = self.file_manager.get_relative_path("gcodes", file_path) - ext = os.path.splitext(rel_path)[-1].lower() - try: - path_info = self.file_manager.get_path_info(file_path, "gcodes") - except Exception: - path_info = {} - if ( - ext not in VALID_GCODE_EXTS or - path_info.get('size', 0) == 0 - ): - evt = asyncio.Event() - evt.set() - return evt - if ext == ".ufp": - rel_path = os.path.splitext(rel_path)[0] + ".gcode" - path_info['ufp_path'] = file_path - return self.gcode_metadata.parse_metadata(rel_path, path_info) - def _handle_move_timeout(self, cookie: int, is_dir: bool): if cookie not in self.pending_moves: return