From d39792b84e26bdefd00dac1cf30fd5d1ee5ba29e Mon Sep 17 00:00:00 2001 From: Eric Callahan Date: Thu, 16 Feb 2023 18:11:51 -0500 Subject: [PATCH] file_manager: allow concurrent inotify processing All of the awaitable calls in the inotify loop only block to postpone notifications. Only gcode notifications require async processing due to metadata analysis. Queue these notifications while allowing inotify events to be received concurrently. Signed-off-by: Eric Callahan fix --- .../components/file_manager/file_manager.py | 246 ++++++++++-------- 1 file changed, 142 insertions(+), 104 deletions(-) diff --git a/moonraker/components/file_manager/file_manager.py b/moonraker/components/file_manager/file_manager.py index 4333155..a7e072b 100644 --- a/moonraker/components/file_manager/file_manager.py +++ b/moonraker/components/file_manager/file_manager.py @@ -31,6 +31,7 @@ from typing import ( List, Set, Coroutine, + Awaitable, Callable, TypeVar, cast, @@ -1225,11 +1226,12 @@ class InotifyNode: metadata_events.append(mevt) return metadata_events - async def move_child_node(self, - child_name: str, - new_name: str, - new_parent: InotifyNode - ) -> None: + def move_child_node( + self, + child_name: str, + new_name: str, + new_parent: InotifyNode + ) -> None: self.flush_delete() child_node = self.pop_child_node(child_name) if child_node is None: @@ -1243,17 +1245,25 @@ class InotifyNode: new_root = child_node.get_root() logging.debug(f"Moving node from '{prev_path}' to '{new_path}'") # Attempt to move metadata - move_success = await self.ihdlr.try_move_metadata( - prev_root, new_root, prev_path, new_path, is_dir=True) - if not move_success: - # Need rescan - mevts = child_node.scan_node() - if mevts: - mfuts = [e.wait() for e in mevts] - await asyncio.gather(*mfuts) - self.ihdlr.notify_filelist_changed( - "move_dir", new_root, new_path, - prev_root, prev_path) + move_res = self.ihdlr.try_move_metadata( + prev_root, new_root, prev_path, new_path, is_dir=True + ) + if new_root == "gcodes": + async def _notify_move_dir(): + if move_res is False: + # Need rescan + mevts = child_node.scan_node() + if mevts: + mfuts = [e.wait() for e in mevts] + await asyncio.gather(*mfuts) + self.ihdlr.notify_filelist_changed( + "move_dir", new_root, new_path, prev_root, prev_path + ) + self.ihdlr.queue_gcode_notificaton(_notify_move_dir()) + else: + self.ihdlr.notify_filelist_changed( + "move_dir", new_root, new_path, prev_root, prev_path + ) def schedule_file_event(self, file_name: str, evt_name: str) -> None: if file_name in self.pending_file_events: @@ -1263,7 +1273,7 @@ class InotifyNode: pending_node.stop_event("create_node") self.pending_file_events[file_name] = evt_name - async def complete_file_write(self, file_name: str) -> None: + def complete_file_write(self, file_name: str) -> None: self.flush_delete() evt_name = self.pending_file_events.pop(file_name, None) if evt_name is None: @@ -1282,12 +1292,16 @@ class InotifyNode: file_path = os.path.join(self.get_path(), file_name) root = self.get_root() if root == "gcodes": - mevt = self.ihdlr.parse_gcode_metadata(file_path) - if os.path.splitext(file_path)[1].lower() == ".ufp": - # don't notify .ufp files - return - await mevt.wait() - self.ihdlr.notify_filelist_changed(evt_name, root, file_path) + async def _notify_file_write(): + mevt = self.ihdlr.parse_gcode_metadata(file_path) + if os.path.splitext(file_path)[1].lower() == ".ufp": + # don't notify .ufp files + return + await mevt.wait() + self.ihdlr.notify_filelist_changed(evt_name, root, file_path) + self.ihdlr.queue_gcode_notificaton(_notify_file_write()) + else: + self.ihdlr.notify_filelist_changed(evt_name, root, file_path) def add_child_node(self, node: InotifyNode) -> None: self.child_nodes[node.name] = node @@ -1438,16 +1452,14 @@ class INotifyHandler: self.inotify = INotify(nonblocking=True) self.event_loop.add_reader( self.inotify.fileno(), self._handle_inotify_read) - - self.node_loop_busy: bool = False - self.pending_inotify_events: List[InotifyEvent] = [] - self.watched_roots: Dict[str, InotifyRootNode] = {} self.watched_nodes: Dict[int, InotifyNode] = {} self.pending_moves: Dict[ int, Tuple[InotifyNode, str, asyncio.Handle]] = {} self.create_gcode_notifications: Dict[str, Any] = {} self.initialized: bool = False + self.pending_gcode_notificatons: List[Coroutine] = [] + self._gcode_queue_busy: bool = False def add_root_watch(self, root: str, root_path: str) -> None: # remove all exisiting watches on root @@ -1545,26 +1557,25 @@ class INotifyHandler: else: self.gcode_metadata.remove_file_metadata(rel_path) - async def try_move_metadata(self, - prev_root: str, - new_root: str, - prev_path: str, - new_path: str, - is_dir: bool = False - ) -> bool: + def try_move_metadata( + self, + prev_root: str, + new_root: str, + prev_path: str, + new_path: str, + is_dir: bool = False + ) -> Union[bool, Awaitable]: if new_root == "gcodes": if prev_root == "gcodes": # moved within the gcodes root, move metadata - prev_rel_path = self.file_manager.get_relative_path( - "gcodes", prev_path) - new_rel_path = self.file_manager.get_relative_path( - "gcodes", new_path) + fm = self.file_manager + gcm = self.gcode_metadata + prev_rel_path = fm.get_relative_path("gcodes", prev_path) + new_rel_path = fm.get_relative_path("gcodes", new_path) if is_dir: - await self.gcode_metadata.move_directory_metadata( - prev_rel_path, new_rel_path) + gcm.move_directory_metadata(prev_rel_path, new_rel_path) else: - return await self.gcode_metadata.move_file_metadata( - prev_rel_path, new_rel_path) + return gcm.move_file_metadata(prev_rel_path, new_rel_path) else: # move from a non-gcodes root to gcodes root needs a rescan self.clear_metadata(prev_root, prev_path, is_dir) @@ -1644,25 +1655,13 @@ class INotifyHandler: f"not currently tracked: name: {evt.name}, " f"flags: {flags}") continue - self.pending_inotify_events.append(evt) - if not self.node_loop_busy: - self.node_loop_busy = True - self.event_loop.register_callback(self._process_inotify_events) - - async def _process_inotify_events(self) -> None: - while self.pending_inotify_events: - evt = self.pending_inotify_events.pop(0) node = self.watched_nodes[evt.wd] if evt.mask & iFlags.ISDIR: - await self._process_dir_event(evt, node) + self._process_dir_event(evt, node) else: - await self._process_file_event(evt, node) - self.node_loop_busy = False + self._process_file_event(evt, node) - async def _process_dir_event(self, - evt: InotifyEvent, - node: InotifyNode - ) -> None: + def _process_dir_event(self, evt: InotifyEvent, node: InotifyNode) -> None: if evt.name in ['.', ".."]: # ignore events for self and parent return @@ -1691,7 +1690,7 @@ class INotifyHandler: # Moved from a currently watched directory prev_parent, child_name, hdl = moved_evt hdl.cancel() - await prev_parent.move_child_node(child_name, evt.name, node) + prev_parent.move_child_node(child_name, evt.name, node) else: # Moved from an unwatched directory, for our # purposes this is the same as creating a @@ -1699,10 +1698,7 @@ class INotifyHandler: self.sync_lock.add_pending_path("create_dir", full_path) node.create_child_node(evt.name) - async def _process_file_event(self, - evt: InotifyEvent, - node: InotifyNode - ) -> None: + def _process_file_event(self, evt: InotifyEvent, node: InotifyNode) -> None: ext: str = os.path.splitext(evt.name)[-1].lower() root = node.get_root() node_path = node.get_path() @@ -1714,7 +1710,7 @@ class INotifyHandler: node.schedule_file_event(evt.name, "create_file") if os.path.islink(file_path): logging.debug(f"Inotify symlink create: {file_path}") - await node.complete_file_write(evt.name) + node.complete_file_write(evt.name) elif evt.mask & iFlags.DELETE: logging.debug(f"Inotify file delete: {root}, " f"{node_path}, {evt.name}") @@ -1741,35 +1737,76 @@ class INotifyHandler: hdl.cancel() prev_root = prev_parent.get_root() prev_path = os.path.join(prev_parent.get_path(), prev_name) - move_success = await self.try_move_metadata( - prev_root, root, prev_path, file_path) - if not move_success: - # Unable to move, metadata needs parsing - mevt = self.parse_gcode_metadata(file_path) - await mevt.wait() - if can_notify: + move_res = self.try_move_metadata(prev_root, root, prev_path, file_path) + if root == "gcodes": + coro = self._finish_gcode_move( + root, prev_root, file_path, prev_path, can_notify, move_res + ) + self.queue_gcode_notificaton(coro) + else: self.notify_filelist_changed( - "move_file", root, file_path, - prev_root, prev_path) + "move_file", root, file_path, prev_root, prev_path + ) else: if can_notify: self.sync_lock.add_pending_path("create_file", file_path) if root == "gcodes": - mevt = self.parse_gcode_metadata(file_path) - await mevt.wait() - if can_notify: - self.notify_filelist_changed( - "create_file", root, file_path) + coro = self._finish_gcode_create_from_move(file_path, can_notify) + self.queue_gcode_notificaton(coro) + else: + self.notify_filelist_changed("create_file", root, file_path) if not can_notify: - logging.debug("Metadata is processing, suppressing move " - f"notification: {file_path}") + logging.debug( + "Metadata is processing, suppressing move notification: " + f"{file_path}" + ) elif evt.mask & iFlags.MODIFY: self.sync_lock.add_pending_path("modify_file", file_path) node.schedule_file_event(evt.name, "modify_file") elif evt.mask & iFlags.CLOSE_WRITE: logging.debug(f"Inotify writable file closed: {file_path}") # Only process files that have been created or modified - await node.complete_file_write(evt.name) + node.complete_file_write(evt.name) + + async def _finish_gcode_move( + self, + root: str, + prev_root: str, + file_path: str, + prev_path: str, + can_notify: bool, + move_result: Union[bool, Awaitable] + ) -> None: + if not isinstance(move_result, bool): + await move_result + elif not move_result: + # Unable to move, metadata needs parsing + mevt = self.parse_gcode_metadata(file_path) + await mevt.wait() + if can_notify: + self.notify_filelist_changed( + "move_file", root, file_path, prev_root, prev_path + ) + + async def _finish_gcode_create_from_move( + self, file_path: str, can_notify: bool + ) -> None: + mevt = self.parse_gcode_metadata(file_path) + await mevt.wait() + if can_notify: + self.notify_filelist_changed("create_file", "gcodes", file_path) + + def queue_gcode_notificaton(self, coro: Coroutine) -> None: + self.pending_gcode_notificatons.append(coro) + if not self._gcode_queue_busy: + self._gcode_queue_busy = True + self.event_loop.create_task(self._process_gcode_notifications()) + + async def _process_gcode_notifications(self) -> None: + while self.pending_gcode_notificatons: + coro = self.pending_gcode_notificatons.pop(0) + await coro + self._gcode_queue_busy = False def notify_filelist_changed(self, action: str, @@ -1980,10 +2017,7 @@ class MetadataStorage: except Exception: logging.debug(f"Error removing thumb at {thumb_path}") - async def move_directory_metadata(self, - prev_dir: str, - new_dir: str - ) -> None: + def move_directory_metadata(self, prev_dir: str, new_dir: str) -> None: if prev_dir[-1] != "/": prev_dir += "/" moved: List[Tuple[str, str, Dict[str, Any]]] = [] @@ -2000,14 +2034,12 @@ class MetadataStorage: source = [m[0] for m in moved] dest = [m[1] for m in moved] self.mddb.move_batch(source, dest) - eventloop = self.server.get_event_loop() - await eventloop.run_in_thread(self._move_thumbnails, moved) + # It shouldn't be necessary to move the thumbnails + # as they would be moved with the parent directory - async def move_file_metadata(self, - prev_fname: str, - new_fname: str, - move_thumbs: bool = True - ) -> bool: + def move_file_metadata( + self, prev_fname: str, new_fname: str + ) -> Union[bool, Awaitable]: metadata: Optional[Dict[str, Any]] metadata = self.metadata.pop(prev_fname, None) if metadata is None: @@ -2017,18 +2049,15 @@ class MetadataStorage: if self.metadata.pop(new_fname, None) is not None: self.mddb.pop(new_fname, None) return False + self.metadata[new_fname] = metadata self.mddb.move_batch([prev_fname], [new_fname]) - if move_thumbs: - eventloop = self.server.get_event_loop() - await eventloop.run_in_thread( - self._move_thumbnails, - [(prev_fname, new_fname, metadata)]) - return True + return self._move_thumbnails([(prev_fname, new_fname, metadata)]) - def _move_thumbnails(self, - records: List[Tuple[str, str, Dict[str, Any]]] - ) -> None: + async def _move_thumbnails( + self, records: List[Tuple[str, str, Dict[str, Any]]] + ) -> None: + eventloop = self.server.get_event_loop() for (prev_fname, new_fname, metadata) in records: prev_dir = os.path.dirname(os.path.join(self.gc_path, prev_fname)) new_dir = os.path.dirname(os.path.join(self.gc_path, new_fname)) @@ -2042,12 +2071,21 @@ class MetadataStorage: if not os.path.isfile(thumb_path): continue new_path = os.path.join(new_dir, path) + new_parent = os.path.dirname(new_path) try: - os.makedirs(os.path.dirname(new_path), exist_ok=True) - shutil.move(thumb_path, new_path) + if not os.path.exists(new_parent): + os.mkdir(new_parent) + # Wait for inotify to register the node before the move + await asyncio.sleep(.2) + await eventloop.run_in_thread( + shutil.move, thumb_path, new_path + ) + except asyncio.CancelledError: + raise except Exception: - logging.debug(f"Error moving thumb from {thumb_path}" - f" to {new_path}") + logging.exception( + f"Error moving thumb from {thumb_path} to {new_path}" + ) def parse_metadata(self, fname: str,