From fbb1fcf50046cb3820257795e7f485e1f10f71cc Mon Sep 17 00:00:00 2001
From: Arksine
Date: Mon, 1 Mar 2021 18:06:22 -0500
Subject: [PATCH] file_manager: add support for inotify event based monitoring

Signed-off-by: Eric Callahan
---
 moonraker/components/file_manager.py | 397 +++++++++++++++++++++++++--
 1 file changed, 377 insertions(+), 20 deletions(-)

diff --git a/moonraker/components/file_manager.py b/moonraker/components/file_manager.py
index 7834901..e8866a0 100644
--- a/moonraker/components/file_manager.py
+++ b/moonraker/components/file_manager.py
@@ -13,11 +13,16 @@
 import tempfile
 from concurrent.futures import ThreadPoolExecutor
 from tornado.ioloop import IOLoop, PeriodicCallback
 from tornado.locks import Event
+from inotify_simple import INotify
+from inotify_simple import flags as iFlags
 
 VALID_GCODE_EXTS = ['.gcode', '.g', '.gco', '.ufp', '.nc']
 FULL_ACCESS_ROOTS = ["gcodes", "config"]
 METADATA_SCRIPT = os.path.abspath(os.path.join(
     os.path.dirname(__file__), "../../scripts/extract_metadata.py"))
+WATCH_FLAGS = iFlags.CREATE | iFlags.DELETE | iFlags.MODIFY \
+    | iFlags.MOVED_TO | iFlags.MOVED_FROM | iFlags.ONLYDIR \
+    | iFlags.CLOSE_WRITE
 UFP_MODEL_PATH = "/3D/model.gcode"
 UFP_THUMB_PATH = "/Metadata/thumbnail.png"
@@ -29,6 +34,8 @@ class FileManager:
         database = self.server.load_component(config, "database")
         gc_path = database.get_item("moonraker", "file_manager.gcode_path", "")
         self.gcode_metadata = MetadataStorage(self.server, gc_path, database)
+        self.inotify_handler = INotifyHandler(config, self,
+                                              self.gcode_metadata)
         self.fixed_path_args = {}
 
         # Register file management endpoints
@@ -123,11 +130,9 @@ class FileManager:
             database["file_manager.gcode_path"] = path
             # scan for metadata changes
             self.gcode_metadata.update_gcode_path(path)
-            try:
-                self.get_file_list("gcodes")
-            except Exception:
-                logging.exception(
-                    f"Unable to initialize gcode metadata")
+        if root in FULL_ACCESS_ROOTS:
+            # Refresh the file list and add watches
+            self.inotify_handler.add_root_watch(root, path)
         return True
 
     def get_sd_directory(self):
@@ -139,6 +144,12 @@ class FileManager:
     def get_fixed_path_args(self):
         return dict(self.fixed_path_args)
 
+    def get_relative_path(self, root, full_path):
+        root_dir = self.file_paths.get(root, None)
+        if root_dir is None or not full_path.startswith(root_dir):
+            return ""
+        return os.path.relpath(full_path, start=root_dir)
+
     def check_file_exists(self, root, filename):
         root_dir = self.file_paths.get(root, "")
         file_path = os.path.join(root_dir, filename)
@@ -187,8 +198,6 @@ class FileManager:
             # loaded by the virtual_sdcard
             await self._handle_operation_check(dir_path)
             shutil.rmtree(dir_path)
-            if root == "gcodes":
-                self.gcode_metadata.prune_metadata()
         else:
             try:
                 os.rmdir(dir_path)
@@ -295,7 +304,7 @@ class FileManager:
             full_path = os.path.join(path, fname)
             if not os.path.exists(full_path):
                 continue
-            path_info = self._get_path_info(full_path)
+            path_info = self.get_path_info(full_path)
             if os.path.isdir(full_path):
                 path_info['dirname'] = fname
                 flist['dirs'].append(path_info)
@@ -308,12 +317,12 @@ class FileManager:
                     ext in VALID_GCODE_EXTS:
                 if ext == ".ufp":
                     try:
-                        full_path = self._process_ufp_from_refresh(
+                        full_path = self.process_ufp_from_refresh(
                             full_path)
                     except Exception:
                         logging.exception("Error processing ufp file")
                         continue
-                path_info = self._get_path_info(full_path)
+                path_info = self.get_path_info(full_path)
                 path_info['filename'] = os.path.split(full_path)[-1]
                 rel_path = os.path.relpath(full_path, start=gc_path)
                 self.gcode_metadata.parse_metadata(
@@ -327,7 +336,7 @@ class FileManager:
         flist['disk_usage'] = usage._asdict()
         return flist
 
-    def _get_path_info(self, path):
+    def get_path_info(self, path):
         modified = os.path.getmtime(path)
         size = os.path.getsize(path)
         path_info = {'modified': modified, 'size': size}
@@ -420,7 +429,7 @@ class FileManager:
             await ioloop.run_in_executor(
                 tpe, self._process_uploaded_file, upload_info)
         # Fetch Metadata
-        finfo = self._get_path_info(upload_info['dest_path'])
+        finfo = self.get_path_info(upload_info['dest_path'])
         evt = self.gcode_metadata.parse_metadata(
             upload_info['filename'], finfo['size'], finfo['modified'])
         await evt.wait()
@@ -489,7 +498,7 @@ class FileManager:
         except Exception:
             logging.exception(f"Error removing ufp file: {ufp_path}")
 
-    def _process_ufp_from_refresh(self, ufp_path):
+    def process_ufp_from_refresh(self, ufp_path):
         dest_path = os.path.splitext(ufp_path)[0] + ".gcode"
         self._unzip_ufp(ufp_path, dest_path)
         return dest_path
@@ -524,16 +533,16 @@ class FileManager:
                 if root == 'gcodes' and ext not in VALID_GCODE_EXTS:
                     continue
                 full_path = os.path.join(dir_path, name)
-                if ext == ".ufp":
+                if root == 'gcodes' and ext == ".ufp":
                     try:
-                        full_path = self._process_ufp_from_refresh(full_path)
+                        full_path = self.process_ufp_from_refresh(full_path)
                     except Exception:
                         logging.exception("Error processing ufp file")
                         continue
                 if not os.path.exists(full_path):
                     continue
                 fname = full_path[len(path) + 1:]
-                finfo = self._get_path_info(full_path)
+                finfo = self.get_path_info(full_path)
                 filelist[fname] = finfo
                 if root == 'gcodes':
                     self.gcode_metadata.parse_metadata(
@@ -612,7 +621,6 @@ class FileManager:
         except self.server.error as e:
             if e.status_code == 403:
                 raise
-        self.gcode_metadata.remove_file(filename)
        os.remove(full_path)
         self.notify_filelist_changed('delete_file', filename, root)
         return filename
@@ -627,9 +635,349 @@ class FileManager:
         self.server.send_event("file_manager:filelist_changed", result)
 
     def close(self):
+        self.inotify_handler.close()
         self.gcode_metadata.close()
 
 
+INOTIFY_DELETE_TIME = .25
+INOTIFY_MOVE_TIME = 1.
+
+
+class INotifyHandler:
+    def __init__(self, config, file_manager, gcode_metadata):
+        self.server = config.get_server()
+        self.debug_enabled = config['server'].getboolean(
+            'enable_debug_logging', False)
+        self.file_manager = file_manager
+        self.gcode_metadata = gcode_metadata
+        self.ioloop = IOLoop.current()
+        self.inotify = INotify(nonblocking=True)
+        self.ioloop.add_handler(
+            self.inotify.fileno(), self._handle_inotify_read,
+            IOLoop.READ | IOLoop.ERROR)
+
+        self.watches = {}
+        self.watched_dirs = {}
+        self.pending_move_events = {}
+        self.pending_create_events = {}
+        self.pending_modify_events = {}
+        self.pending_delete_events = {}
+
+    def add_root_watch(self, root, root_path):
+        # remove all existing watches on root
+        for (wroot, wdir) in list(self.watched_dirs.values()):
+            if root == wroot:
+                self.remove_watch(wdir)
+        # remove pending move notifications on root
+        for cookie, pending in list(self.pending_move_events.items()):
+            if root == pending[0]:
+                self.ioloop.remove_timeout(pending[2])
+                del self.pending_move_events[cookie]
+        # remove pending create notifications on root
+        for fpath, croot in list(self.pending_create_events.items()):
+            if root == croot:
+                del self.pending_create_events[fpath]
+        # remove pending modify notifications on root
+        for fpath, croot in list(self.pending_modify_events.items()):
+            if root == croot:
+                del self.pending_modify_events[fpath]
+        # remove pending delete notifications on root
+        for dir_path, pending in list(self.pending_delete_events.items()):
+            if root == pending[0]:
+                self.ioloop.remove_timeout(pending[2])
+                del self.pending_delete_events[dir_path]
+        self._scan_directory(root, root_path)
+
+    def add_watch(self, root, dir_path):
+        if dir_path in self.watches or \
+                root not in FULL_ACCESS_ROOTS:
+            return
+        watch = self.inotify.add_watch(dir_path, WATCH_FLAGS)
+        self.watches[dir_path] = watch
+        self.watched_dirs[watch] = (root, dir_path)
+
+    def remove_watch(self, dir_path, need_low_level_rm=True):
+        wd = self.watches.pop(dir_path)
+        self.watched_dirs.pop(wd)
+        if need_low_level_rm:
+            try:
+                self.inotify.rm_watch(wd)
+            except OSError:
+                logging.exception(f"Error removing watch: '{dir_path}'")
+
+    def _reset_watch(self, prev_path, new_root, new_path):
+        wd = self.watches.pop(prev_path, None)
+        if wd is not None:
+            self.watches[new_path] = wd
+            self.watched_dirs[wd] = (new_root, new_path)
+
+    def _process_deleted_files(self, dir_path):
+        if dir_path not in self.pending_delete_events:
+            return
+        root, files, hdl = self.pending_delete_events.pop(dir_path)
+        for fname in files:
+            file_path = os.path.join(dir_path, fname)
+            self._clear_metadata(root, file_path)
+            self._notify_filelist_changed(
+                "delete_file", root, file_path)
+
+    def _remove_stale_cookie(self, cookie):
+        pending_evt = self.pending_move_events.pop(cookie, None)
+        if pending_evt is None:
+            # Event already processed
+            return
+        prev_root, prev_path, hdl, is_dir = pending_evt
+        logging.debug("Inotify stale cookie removed: "
+                      f"{prev_root}, {prev_path}")
+        item_type = "file"
+        if is_dir:
+            item_type = "dir"
+            self.remove_watch(prev_path)
+        self._notify_filelist_changed(
+            f"delete_{item_type}", prev_root, prev_path)
+
+    def _clear_metadata(self, root, path, is_dir=False):
+        if root == "gcodes":
+            rel_path = self.file_manager.get_relative_path(root, path)
+            if is_dir:
+                self.gcode_metadata.remove_directory_metadata(rel_path)
+            else:
+                self.gcode_metadata.remove_file_metadata(rel_path)
+
+    def _scan_directory(self, root, dir_path, moved_path=None):
+        # Walk through a directory.  Create or reset watches as necessary
+        if moved_path is None:
+            self.add_watch(root, dir_path)
+        else:
+            self._reset_watch(moved_path, root, dir_path)
+        st = os.stat(dir_path)
+        visited_dirs = {(st.st_dev, st.st_ino)}
+        for dpath, dnames, files in os.walk(dir_path, followlinks=True):
+            scan_dirs = []
+            for dname in dnames:
+                full_path = os.path.join(dpath, dname)
+                st = os.stat(full_path)
+                key = (st.st_dev, st.st_ino)
+                if key not in visited_dirs:
+                    # Don't watch "thumbs" directories in the gcodes root
+                    if not (root == "gcodes" and dname == "thumbs"):
+                        if moved_path is not None:
+                            rel_path = os.path.relpath(
+                                full_path, start=dir_path)
+                            prev_path = os.path.join(moved_path, rel_path)
+                            self._reset_watch(prev_path, root, full_path)
+                        else:
+                            self.add_watch(root, full_path)
+                    visited_dirs.add(key)
+                    scan_dirs.append(dname)
+            dnames[:] = scan_dirs
+            if root != "gcodes":
+                # No need to check for metadata in other roots
+                continue
+            for name in files:
+                fpath = os.path.join(dpath, name)
+                ext = os.path.splitext(name)[-1].lower()
+                if name[0] == "." or ext not in VALID_GCODE_EXTS:
+                    continue
+                if ext == ".ufp":
+                    self.ioloop.spawn_callback(self._process_ufp, fpath)
+                else:
+                    self._parse_gcode_metadata(fpath)
+        if self.debug_enabled:
+            debug_msg = f"Inotify Watches After Scan: {dir_path}"
+            for wdir, watch in self.watches.items():
+                wroot, wpath = self.watched_dirs[watch]
+                match = wdir == wpath
+                debug_msg += f"\nRoot: {wroot}, Directory: {wdir}, " \
+                    f"Watch: {watch}, Dir Match: {match}"
+            logging.debug(debug_msg)
+
+    def _parse_gcode_metadata(self, file_path):
+        rel_path = self.file_manager.get_relative_path("gcodes", file_path)
+        if not rel_path:
+            logging.info(
+                f"File at path '{file_path}' is not in the gcode path"
+                ", metadata extraction aborted")
+            return
+        path_info = self.file_manager.get_path_info(file_path)
+        self.gcode_metadata.parse_metadata(
+            rel_path, path_info['size'], path_info['modified'], notify=True)
+
+    async def _process_ufp(self, file_path):
+        with ThreadPoolExecutor(max_workers=1) as tpe:
+            await self.ioloop.run_in_executor(
+                tpe, self.file_manager.process_ufp_from_refresh,
+                file_path)
+
+    def _handle_inotify_read(self, fd, events):
+        if events & IOLoop.ERROR:
+            logging.info("INotify Read Error")
+            return
+        for evt in self.inotify.read(timeout=0):
+            if evt.mask & iFlags.IGNORED:
+                continue
+            if evt.wd not in self.watched_dirs:
+                flags = " ".join([str(f) for f in iFlags.from_mask(evt.mask)])
+                logging.info(
+                    f"Error, inotify watch descriptor {evt.wd} "
+                    f"not currently tracked: name: {evt.name}, "
+                    f"flags: {flags}")
+                continue
+            root, watch_path = self.watched_dirs[evt.wd]
+            child_path = watch_path
+            if evt.name:
+                child_path = os.path.join(watch_path, evt.name)
+            if evt.mask & iFlags.ISDIR:
+                self._process_dir_event(evt, root, child_path)
+            else:
+                self._process_file_event(evt, root, child_path)
+
+    def _process_dir_event(self, evt, root, child_path):
+        if root == "gcodes" and evt.name == "thumbs":
+            # ignore changes to the thumbs directory
+            return
+        if evt.mask & iFlags.CREATE:
+            logging.debug(f"Inotify directory create: {root}, {evt.name}")
+            self._scan_directory(root, child_path)
+            self._notify_filelist_changed(
+                "create_dir", root, child_path)
+        elif evt.mask & iFlags.DELETE:
+            logging.debug(f"Inotify directory delete: {root}, {evt.name}")
+            self.remove_watch(child_path, need_low_level_rm=False)
+            pending_evt = self.pending_delete_events.pop(child_path, None)
+            if pending_evt is not None:
+                delete_hdl = pending_evt[2]
+                self.ioloop.remove_timeout(delete_hdl)
+            self._clear_metadata(root, child_path, True)
+            self._notify_filelist_changed(
+                "delete_dir", root, child_path)
+        elif evt.mask & iFlags.MOVED_FROM:
+            logging.debug(f"Inotify directory move from: {root}, {evt.name}")
+            hdl = self.ioloop.call_later(
+                INOTIFY_MOVE_TIME, self._remove_stale_cookie, evt.cookie)
+            self.pending_move_events[evt.cookie] = (
+                root, child_path, hdl, True)
+            self._clear_metadata(root, child_path, True)
+        elif evt.mask & iFlags.MOVED_TO:
+            logging.debug(f"Inotify directory move to: {root}, {evt.name}")
+            pending_evt = self.pending_move_events.pop(evt.cookie, None)
+            if pending_evt is not None:
+                # Moved from a currently watched directory
+                prev_root, prev_path, hdl, is_dir = pending_evt
+                if not is_dir:
+                    logging.debug(
+                        f"Cookie matched to a file: {pending_evt}")
+                    return
+                self.ioloop.remove_timeout(hdl)
+                self._scan_directory(root, child_path, prev_path)
+                self._notify_filelist_changed(
+                    "move_dir", root, child_path,
+                    prev_root, prev_path)
+            else:
+                # Moved from an unwatched directory, for our
+                # purposes this is the same as creating a
+                # directory
+                self._scan_directory(root, child_path)
+                self._notify_filelist_changed(
+                    "create_dir", root, child_path)
+
+    def _process_file_event(self, evt, root, child_path):
+        ext = os.path.splitext(evt.name)[-1]
+        if root == "gcodes" and ext not in VALID_GCODE_EXTS:
+            # Don't notify files with invalid gcode extensions
+            return
+        if evt.mask & iFlags.CREATE:
+            logging.debug(f"Inotify file create: {root}, {evt.name}")
+            self.pending_create_events[child_path] = root
+        elif evt.mask & iFlags.DELETE:
+            logging.debug(f"Inotify file delete: {root}, {evt.name}")
+            if root == "gcodes" and ext == ".ufp":
+                # Don't notify deleted ufp files
+                return
+            dir_path, fname = os.path.split(child_path)
+            files = set()
+            if dir_path in self.pending_delete_events:
+                root, files, delete_hdl = self.pending_delete_events[dir_path]
+                self.ioloop.remove_timeout(delete_hdl)
+            files.add(fname)
+            delete_hdl = self.ioloop.call_later(
+                INOTIFY_DELETE_TIME, self._process_deleted_files, dir_path)
+            self.pending_delete_events[dir_path] = (root, files, delete_hdl)
+        elif evt.mask & iFlags.MOVED_FROM:
+            logging.debug(f"Inotify file move from: {root}, {evt.name}")
+            hdl = self.ioloop.call_later(
+                INOTIFY_MOVE_TIME, self._remove_stale_cookie, evt.cookie)
+            self.pending_move_events[evt.cookie] = (
+                root, child_path, hdl, False)
+            self._clear_metadata(root, child_path)
+        elif evt.mask & iFlags.MOVED_TO:
+            logging.debug(f"Inotify file move to: {root}, {evt.name}")
+            if root == "gcodes":
+                if os.path.splitext(child_path)[-1] == ".ufp":
+                    self.ioloop.spawn_callback(
+                        self._process_ufp, child_path)
+                    return
+                self._parse_gcode_metadata(child_path)
+            pending_evt = self.pending_move_events.pop(evt.cookie, None)
+            if pending_evt is not None:
+                # Moved from a currently watched directory
+                prev_root, prev_path, hdl, is_dir = pending_evt
+                if is_dir:
+                    logging.debug(
+                        f"Cookie matched to directory: {pending_evt}")
+                    return
+                self._notify_filelist_changed(
+                    "move_file", root, child_path,
+                    prev_root, prev_path)
+            else:
+                self._notify_filelist_changed(
+                    "create_file", root, child_path)
+        elif evt.mask & iFlags.MODIFY:
+            if child_path not in self.pending_create_events:
+                self.pending_modify_events[child_path] = root
+        elif evt.mask & iFlags.CLOSE_WRITE:
+            logging.debug(f"Inotify writable file closed: {child_path}")
+            # Only process files that have been created or modified
+            if child_path in self.pending_create_events:
+                del self.pending_create_events[child_path]
+                action = "create_file"
+            elif child_path in self.pending_modify_events:
+                del self.pending_modify_events[child_path]
+                action = "modify_file"
+            else:
+                # Some other event, ignore it
+                return
+            if root == "gcodes":
+                if os.path.splitext(child_path)[-1] == ".ufp":
+                    self.ioloop.spawn_callback(
+                        self._process_ufp, child_path)
+                    return
+                self._parse_gcode_metadata(child_path)
+            self._notify_filelist_changed(action, root, child_path)
+
+    def _notify_filelist_changed(self, action, root, full_path,
+                                 source_root=None, source_path=None):
+        rel_path = self.file_manager.get_relative_path(root, full_path)
+        file_info = {'size': 0, 'modified': 0}
+        if os.path.exists(full_path):
+            file_info = self.file_manager.get_path_info(full_path)
+        file_info['path'] = rel_path
+        file_info['root'] = root
+        result = {'action': action, 'item': file_info}
+        if source_path is not None and source_root is not None:
+            src_rel_path = self.file_manager.get_relative_path(
+                source_root, source_path)
+            result['source_item'] = {'path': src_rel_path, 'root': source_root}
+        self.server.send_event("file_manager:filelist_changed", result)
+
+    def close(self):
+        self.ioloop.remove_handler(self.inotify.fileno())
+        for watch in self.watches.values():
+            try:
+                self.inotify.rm_watch(watch)
+            except OSError:
+                pass
+
+
 METADATA_PRUNE_TIME = 600000
 METADATA_NAMESPACE = "gcode_metadata"
 METADATA_VERSION = 3
@@ -654,6 +1002,8 @@ class MetadataStorage:
         self.gc_path = gc_path
         self.prune_cb = PeriodicCallback(
             self.prune_metadata, METADATA_PRUNE_TIME)
+        if self.gc_path:
+            self.prune_cb.start()
 
     def update_gcode_path(self, path):
         if path == self.gc_path:
             return
@@ -675,8 +1025,8 @@ class MetadataStorage:
     def prune_metadata(self):
         for fname in list(self.mddb.keys()):
             fpath = os.path.join(self.gc_path, fname)
-            if not os.path.exists(fpath):
-                self.remove_file(fname)
+            if not os.path.isfile(fpath):
+                self.remove_file_metadata(fname)
                 logging.info(f"Pruned file: {fname}")
                 continue
@@ -684,7 +1034,12 @@ class MetadataStorage:
         mdata = self.mddb.get(fname, {'size': "", 'modified': 0})
         return mdata['size'] == fsize and mdata['modified'] == modified
 
-    def remove_file(self, fname):
+    def remove_directory_metadata(self, dir_name):
+        for fname in list(self.mddb.keys()):
+            if fname.startswith(dir_name):
+                self.remove_file_metadata(fname)
+
+    def remove_file_metadata(self, fname):
         metadata = self.mddb.pop(fname, None)
         if metadata is None:
             return
@@ -696,6 +1051,8 @@ class MetadataStorage:
             if path is None:
                 continue
             thumb_path = os.path.join(fdir, path)
+            if not os.path.isfile(thumb_path):
+                continue
             try:
                 os.remove(thumb_path)
             except Exception:
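
For reference, a minimal standalone sketch (illustrative only, not part of the patch) of the inotify_simple watch/read cycle that the INotifyHandler above builds on. The temporary directory and file name are hypothetical; Moonraker itself runs the inotify file descriptor through the tornado IOLoop in nonblocking mode rather than polling as shown here.

    # Sketch: register a watch, create a file, then read back the events.
    import os
    import tempfile

    from inotify_simple import INotify, flags

    watch_dir = tempfile.mkdtemp()
    inotify = INotify()
    wd = inotify.add_watch(watch_dir,
                           flags.CREATE | flags.CLOSE_WRITE | flags.DELETE)

    # Create a file so the read below has events to report.
    with open(os.path.join(watch_dir, "example.gcode"), "w") as f:
        f.write("; test\n")

    # read() returns queued events; timeout is in milliseconds.
    for event in inotify.read(timeout=1000):
        print(event.name, [str(f) for f in flags.from_mask(event.mask)])

    inotify.rm_watch(wd)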