diff --git a/moonraker/components/file_manager.py b/moonraker/components/file_manager.py index 96a6bbb..6fbff36 100644 --- a/moonraker/components/file_manager.py +++ b/moonraker/components/file_manager.py @@ -9,6 +9,7 @@ import shutil import logging import json import tempfile +import asyncio from concurrent.futures import ThreadPoolExecutor from tornado.ioloop import IOLoop from tornado.locks import Event @@ -396,9 +397,8 @@ class FileManager: # Don't start if another print is currently in progress start_print = start_print and not print_ongoing finfo = self._process_uploaded_file(upload_info) - evt = self.gcode_metadata.parse_metadata( - upload_info['filename'], finfo) - await evt.wait() + await self.gcode_metadata.parse_metadata( + upload_info['filename'], finfo).wait() if start_print: # Make a Klippy Request to "Start Print" klippy_apis = self.server.lookup_component('klippy_apis') @@ -564,7 +564,7 @@ class InotifyNode: self.pending_deleted_children = set() self.pending_file_events = {} - def _finish_create_node(self): + async def _finish_create_node(self): # Finish a node's creation. All children that were created # with this node (ie: a directory is copied) are bundled into # this notification. We also scan the node to extract metadata @@ -575,7 +575,10 @@ class InotifyNode: node_path = self.get_path() root = self.get_root() # Scan child nodes for unwatched directories and metadata - self.scan_node(notify_created=False) + mevts = self.scan_node(notify_created=False) + if mevts: + mfuts = [e.wait() for e in mevts] + await asyncio.gather(*mfuts) self.ihdlr.log_nodes() self.ihdlr.notify_filelist_changed( "create_dir", root, node_path) @@ -603,7 +606,8 @@ class InotifyNode: dir_path = self.get_path() st = os.stat(dir_path) if st in visited_dirs: - return + return [] + metadata_events = [] visited_dirs.add((st.st_dev, st.st_ino)) for fname in os.listdir(dir_path): if fname[0] == ".": @@ -612,13 +616,16 @@ class InotifyNode: ext = os.path.splitext(fname)[-1].lower() if os.path.isdir(item_path): new_child = self.create_child_node(fname, notify_created) - new_child.scan_node(notify_created, visited_dirs) + metadata_events.extend(new_child.scan_node( + notify_created, visited_dirs)) elif os.path.isfile(item_path) and \ self.get_root() == "gcodes" and \ ext in VALID_GCODE_EXTS: - self.ihdlr.parse_gcode_metadata(item_path) + mevt = self.ihdlr.parse_gcode_metadata(item_path) + metadata_events.append(mevt) + return metadata_events - def move_child_node(self, child_name, new_name, new_parent): + async def move_child_node(self, child_name, new_name, new_parent): child_node = self.pop_child_node(child_name) if child_node is None: logging.info(f"No child for node at path: {self.get_path()}") @@ -632,7 +639,10 @@ class InotifyNode: logging.debug(f"Moving node from '{prev_path}' to '{new_path}'") # TODO: It is possible to "move" metadata rather # than rescan. - child_node.scan_node(notify_created=False) + mevts = child_node.scan_node(notify_created=False) + if mevts: + mfuts = [e.wait() for e in mevts] + await asyncio.gather(*mfuts) self.ihdlr.notify_filelist_changed( "move_dir", new_root, new_path, prev_root, prev_path) @@ -645,7 +655,7 @@ class InotifyNode: pending_node.stop_event("create_node") self.pending_file_events[file_name] = evt_name - def complete_file_write(self, file_name): + async def complete_file_write(self, file_name): evt_name = self.pending_file_events.pop(file_name, None) if evt_name is None: logging.info(f"Invalid file write event: {file_name}") @@ -659,10 +669,11 @@ class InotifyNode: file_path = os.path.join(self.get_path(), file_name) root = self.get_root() if root == "gcodes": - self.ihdlr.parse_gcode_metadata(file_path) + mevt = self.ihdlr.parse_gcode_metadata(file_path) if os.path.splitext(file_path)[1].lower() == ".ufp": # don't notify .ufp files return + await mevt.wait() self.ihdlr.notify_filelist_changed(evt_name, root, file_path) def add_child_node(self, node): @@ -779,6 +790,9 @@ class INotifyHandler: self.inotify.fileno(), self._handle_inotify_read, IOLoop.READ | IOLoop.ERROR) + self.event_loop_busy = False + self.pending_inotify_events = [] + self.watched_roots = {} self.watched_nodes = {} self.pending_moves = {} @@ -838,13 +852,16 @@ class INotifyHandler: logging.info( f"File at path '{file_path}' is not in the gcode path" ", metadata extraction aborted") - return + mevt = Event() + mevt.set() + return mevt path_info = self.file_manager.get_path_info(file_path) ext = os.path.splitext(file_path)[-1].lower() if ext == ".ufp": rel_path = os.path.splitext(rel_path)[0] + ".gcode" path_info['ufp_path'] = file_path - self.gcode_metadata.parse_metadata(rel_path, path_info, notify=True) + return self.gcode_metadata.parse_metadata( + rel_path, path_info, notify=True) def _handle_move_timeout(self, cookie, is_dir): if cookie not in self.pending_moves: @@ -887,13 +904,22 @@ class INotifyHandler: f"not currently tracked: name: {evt.name}, " f"flags: {flags}") continue + self.pending_inotify_events.append(evt) + if not self.event_loop_busy: + self.event_loop_busy = True + IOLoop.current().spawn_callback(self._process_inotify_events) + + async def _process_inotify_events(self): + while self.pending_inotify_events: + evt = self.pending_inotify_events.pop(0) node = self.watched_nodes[evt.wd] if evt.mask & iFlags.ISDIR: - self._process_dir_event(evt, node) + await self._process_dir_event(evt, node) else: - self._process_file_event(evt, node) + await self._process_file_event(evt, node) + self.event_loop_busy = False - def _process_dir_event(self, evt, node): + async def _process_dir_event(self, evt, node): if evt.name and evt.name[0] == ".": # ignore changes to the hidden directories return @@ -919,14 +945,14 @@ class INotifyHandler: # Moved from a currently watched directory prev_parent, child_name, hdl = moved_evt IOLoop.current().remove_timeout(hdl) - prev_parent.move_child_node(child_name, evt.name, node) + await prev_parent.move_child_node(child_name, evt.name, node) else: # Moved from an unwatched directory, for our # purposes this is the same as creating a # directory node.create_child_node(evt.name) - def _process_file_event(self, evt, node): + async def _process_file_event(self, evt, node): ext = os.path.splitext(evt.name)[-1].lower() root = node.get_root() node_path = node.get_path() @@ -953,7 +979,8 @@ class INotifyHandler: f"{node_path}, {evt.name}") file_path = os.path.join(node_path, evt.name) if root == "gcodes": - self.parse_gcode_metadata(file_path) + mevt = self.parse_gcode_metadata(file_path) + await mevt.wait() moved_evt = self.pending_moves.pop(evt.cookie, None) if moved_evt is not None: # Moved from a currently watched directory @@ -973,7 +1000,7 @@ class INotifyHandler: file_path = os.path.join(node_path, evt.name) logging.debug(f"Inotify writable file closed: {file_path}") # Only process files that have been created or modified - node.complete_file_write(evt.name) + await node.complete_file_write(evt.name) def notify_filelist_changed(self, action, root, full_path, source_root=None, source_path=None): @@ -1079,25 +1106,25 @@ class MetadataStorage: logging.debug(f"Error removing thumb at {thumb_path}") def parse_metadata(self, fname, path_info, notify=False): - evt = Event() + mevt = Event() if fname in self.pending_requests or \ self._has_valid_data(fname, path_info): # request already pending or not necessary - evt.set() - return evt - self.pending_requests[fname] = (path_info, notify, evt) + mevt.set() + return mevt + self.pending_requests[fname] = (path_info, notify, mevt) if self.busy: - return evt + return mevt self.busy = True IOLoop.current().spawn_callback(self._process_metadata_update) - return evt + return mevt async def _process_metadata_update(self): while self.pending_requests: - fname, (path_info, notify, evt) = \ + fname, (path_info, notify, mevt) = \ self.pending_requests.popitem() if self._has_valid_data(fname, path_info): - evt.set() + mevt.set() continue ufp_path = path_info.get('ufp_path', None) retries = 3 @@ -1119,7 +1146,7 @@ class MetadataStorage: } logging.info( f"Unable to extract medatadata from file: {fname}") - evt.set() + mevt.set() self.busy = False async def _run_extract_metadata(self, filename, ufp_path, notify):