file_manager: add support for inotify event based monitoring

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Arksine 2021-03-01 18:06:22 -05:00 committed by Eric Callahan
parent 5f9b0e9b86
commit fbb1fcf500
1 changed files with 377 additions and 20 deletions

View File

@ -13,11 +13,16 @@ import tempfile
from concurrent.futures import ThreadPoolExecutor
from tornado.ioloop import IOLoop, PeriodicCallback
from tornado.locks import Event
from inotify_simple import INotify
from inotify_simple import flags as iFlags
VALID_GCODE_EXTS = ['.gcode', '.g', '.gco', '.ufp', '.nc']
FULL_ACCESS_ROOTS = ["gcodes", "config"]
METADATA_SCRIPT = os.path.abspath(os.path.join(
os.path.dirname(__file__), "../../scripts/extract_metadata.py"))
WATCH_FLAGS = iFlags.CREATE | iFlags.DELETE | iFlags.MODIFY \
| iFlags.MOVED_TO | iFlags.MOVED_FROM | iFlags.ONLYDIR \
| iFlags.CLOSE_WRITE
UFP_MODEL_PATH = "/3D/model.gcode"
UFP_THUMB_PATH = "/Metadata/thumbnail.png"
@ -29,6 +34,8 @@ class FileManager:
database = self.server.load_component(config, "database")
gc_path = database.get_item("moonraker", "file_manager.gcode_path", "")
self.gcode_metadata = MetadataStorage(self.server, gc_path, database)
self.inotify_handler = INotifyHandler(config, self,
self.gcode_metadata)
self.fixed_path_args = {}
# Register file management endpoints
@ -123,11 +130,9 @@ class FileManager:
database["file_manager.gcode_path"] = path
# scan for metadata changes
self.gcode_metadata.update_gcode_path(path)
try:
self.get_file_list("gcodes")
except Exception:
logging.exception(
f"Unable to initialize gcode metadata")
if root in FULL_ACCESS_ROOTS:
# Refresh the file list and add watches
self.inotify_handler.add_root_watch(root, path)
return True
def get_sd_directory(self):
@ -139,6 +144,12 @@ class FileManager:
def get_fixed_path_args(self):
return dict(self.fixed_path_args)
def get_relative_path(self, root, full_path):
root_dir = self.file_paths.get(root, None)
if root_dir is None or not full_path.startswith(root_dir):
return ""
return os.path.relpath(full_path, start=root_dir)
def check_file_exists(self, root, filename):
root_dir = self.file_paths.get(root, "")
file_path = os.path.join(root_dir, filename)
@ -187,8 +198,6 @@ class FileManager:
# loaded by the virtual_sdcard
await self._handle_operation_check(dir_path)
shutil.rmtree(dir_path)
if root == "gcodes":
self.gcode_metadata.prune_metadata()
else:
try:
os.rmdir(dir_path)
@ -295,7 +304,7 @@ class FileManager:
full_path = os.path.join(path, fname)
if not os.path.exists(full_path):
continue
path_info = self._get_path_info(full_path)
path_info = self.get_path_info(full_path)
if os.path.isdir(full_path):
path_info['dirname'] = fname
flist['dirs'].append(path_info)
@ -308,12 +317,12 @@ class FileManager:
ext in VALID_GCODE_EXTS:
if ext == ".ufp":
try:
full_path = self._process_ufp_from_refresh(
full_path = self.process_ufp_from_refresh(
full_path)
except Exception:
logging.exception("Error processing ufp file")
continue
path_info = self._get_path_info(full_path)
path_info = self.get_path_info(full_path)
path_info['filename'] = os.path.split(full_path)[-1]
rel_path = os.path.relpath(full_path, start=gc_path)
self.gcode_metadata.parse_metadata(
@ -327,7 +336,7 @@ class FileManager:
flist['disk_usage'] = usage._asdict()
return flist
def _get_path_info(self, path):
def get_path_info(self, path):
modified = os.path.getmtime(path)
size = os.path.getsize(path)
path_info = {'modified': modified, 'size': size}
@ -420,7 +429,7 @@ class FileManager:
await ioloop.run_in_executor(
tpe, self._process_uploaded_file, upload_info)
# Fetch Metadata
finfo = self._get_path_info(upload_info['dest_path'])
finfo = self.get_path_info(upload_info['dest_path'])
evt = self.gcode_metadata.parse_metadata(
upload_info['filename'], finfo['size'], finfo['modified'])
await evt.wait()
@ -489,7 +498,7 @@ class FileManager:
except Exception:
logging.exception(f"Error removing ufp file: {ufp_path}")
def _process_ufp_from_refresh(self, ufp_path):
def process_ufp_from_refresh(self, ufp_path):
dest_path = os.path.splitext(ufp_path)[0] + ".gcode"
self._unzip_ufp(ufp_path, dest_path)
return dest_path
@ -524,16 +533,16 @@ class FileManager:
if root == 'gcodes' and ext not in VALID_GCODE_EXTS:
continue
full_path = os.path.join(dir_path, name)
if ext == ".ufp":
if root == 'gcodes' and ext == ".ufp":
try:
full_path = self._process_ufp_from_refresh(full_path)
full_path = self.process_ufp_from_refresh(full_path)
except Exception:
logging.exception("Error processing ufp file")
continue
if not os.path.exists(full_path):
continue
fname = full_path[len(path) + 1:]
finfo = self._get_path_info(full_path)
finfo = self.get_path_info(full_path)
filelist[fname] = finfo
if root == 'gcodes':
self.gcode_metadata.parse_metadata(
@ -612,7 +621,6 @@ class FileManager:
except self.server.error as e:
if e.status_code == 403:
raise
self.gcode_metadata.remove_file(filename)
os.remove(full_path)
self.notify_filelist_changed('delete_file', filename, root)
return filename
@ -627,9 +635,349 @@ class FileManager:
self.server.send_event("file_manager:filelist_changed", result)
def close(self):
self.inotify_handler.close()
self.gcode_metadata.close()
INOTIFY_DELETE_TIME = .25
INOTIFY_MOVE_TIME = 1.
class INotifyHandler:
def __init__(self, config, file_manager, gcode_metadata):
self.server = config.get_server()
self.debug_enabled = config['server'].getboolean(
'enable_debug_logging', False)
self.file_manager = file_manager
self.gcode_metadata = gcode_metadata
self.ioloop = IOLoop.current()
self.inotify = INotify(nonblocking=True)
self.ioloop.add_handler(
self.inotify.fileno(), self._handle_inotify_read,
IOLoop.READ | IOLoop.ERROR)
self.watches = {}
self.watched_dirs = {}
self.pending_move_events = {}
self.pending_create_events = {}
self.pending_modify_events = {}
self.pending_delete_events = {}
def add_root_watch(self, root, root_path):
# remove all exisiting watches on root
for (wroot, wdir) in list(self.watched_dirs.values()):
if root == wroot:
self.remove_watch(wdir)
# remove pending move notifications on root
for cookie, pending in list(self.pending_move_events.items()):
if root == pending[0]:
self.ioloop.remove_timeout(pending[2])
del self.pending_move_events[cookie]
# remove pending create notifications on root
for fpath, croot in list(self.pending_create_events.items()):
if root == croot:
del self.pending_create_events[fpath]
# remove pending modify notifications on root
for fpath, croot in list(self.pending_modify_events.items()):
if root == croot:
del self.pending_modify_events[fpath]
# remove pending delete notifications on root
for dir_path, pending in list(self.pending_delete_events.items()):
if root == pending[0]:
self.ioloop.remove_timeout(pending[2])
del self.pending_delete_events[dir_path]
self._scan_directory(root, root_path)
def add_watch(self, root, dir_path):
if dir_path in self.watches or \
root not in FULL_ACCESS_ROOTS:
return
watch = self.inotify.add_watch(dir_path, WATCH_FLAGS)
self.watches[dir_path] = watch
self.watched_dirs[watch] = (root, dir_path)
def remove_watch(self, dir_path, need_low_level_rm=True):
wd = self.watches.pop(dir_path)
self.watched_dirs.pop(wd)
if need_low_level_rm:
try:
self.inotify.rm_watch(wd)
except OSError:
logging.exception(f"Error removing watch: '{dir_path}'")
def _reset_watch(self, prev_path, new_root, new_path):
wd = self.watches.pop(prev_path, None)
if wd is not None:
self.watches[new_path] = wd
self.watched_dirs[wd] = (new_root, new_path)
def _process_deleted_files(self, dir_path):
if dir_path not in self.pending_delete_events:
return
root, files, hdl = self.pending_delete_events.pop(dir_path)
for fname in files:
file_path = os.path.join(dir_path, fname)
self._clear_metadata(root, file_path)
self._notify_filelist_changed(
"delete_file", root, file_path)
def _remove_stale_cookie(self, cookie):
pending_evt = self.pending_move_events.pop(cookie, None)
if pending_evt is None:
# Event already processed
return
prev_root, prev_path, hdl, is_dir = pending_evt
logging.debug("Inotify stale cookie removed: "
f"{prev_root}, {prev_path}")
item_type = "file"
if is_dir:
item_type = "dir"
self.remove_watch(prev_path)
self._notify_filelist_changed(
f"delete_{item_type}", prev_root, prev_path)
def _clear_metadata(self, root, path, is_dir=False):
if root == "gcodes":
rel_path = self.file_manager.get_relative_path(root, path)
if is_dir:
self.gcode_metadata.remove_directory_metadata(rel_path)
else:
self.gcode_metadata.remove_file_metadata(rel_path)
def _scan_directory(self, root, dir_path, moved_path=None):
# Walk through a directory. Create or reset watches as necessary
if moved_path is None:
self.add_watch(root, dir_path)
else:
self._reset_watch(moved_path, root, dir_path)
st = os.stat(dir_path)
visited_dirs = {(st.st_dev, st.st_ino)}
for dpath, dnames, files in os.walk(dir_path, followlinks=True):
scan_dirs = []
for dname in dnames:
full_path = os.path.join(dpath, dname)
st = os.stat(full_path)
key = (st.st_dev, st.st_ino)
if key not in visited_dirs:
# Don't watch "thumbs" directories in the gcodes root
if not (root == "gcodes" and dname == "thumbs"):
if moved_path is not None:
rel_path = os.path.relpath(
full_path, start=dir_path)
prev_path = os.path.join(moved_path, rel_path)
self._reset_watch(prev_path, root, full_path)
else:
self.add_watch(root, full_path)
visited_dirs.add(key)
scan_dirs.append(dname)
dnames[:] = scan_dirs
if root != "gcodes":
# No need check for metadata in other roots
continue
for name in files:
fpath = os.path.join(dpath, name)
ext = os.path.splitext(name)[-1].lower()
if name[0] == "." or ext not in VALID_GCODE_EXTS:
continue
if ext == ".ufp":
self.ioloop.spawn_callback(self._process_ufp, fpath)
else:
self._parse_gcode_metadata(fpath)
if self.debug_enabled:
debug_msg = f"Inotify Watches After Scan: {dir_path}"
for wdir, watch in self.watches.items():
wroot, wpath = self.watched_dirs[watch]
match = wdir == wpath
debug_msg += f"\nRoot: {wroot}, Directory: {wdir}, " \
f"Watch: {watch}, Dir Match: {match}"
logging.debug(debug_msg)
def _parse_gcode_metadata(self, file_path):
rel_path = self.file_manager.get_relative_path("gcodes", file_path)
if not rel_path:
logging.info(
f"File at path '{file_path}' is not in the gcode path"
", metadata extraction aborted")
return
path_info = self.file_manager.get_path_info(file_path)
self.gcode_metadata.parse_metadata(
rel_path, path_info['size'], path_info['modified'], notify=True)
async def _process_ufp(self, file_path):
with ThreadPoolExecutor(max_workers=1) as tpe:
await self.ioloop.run_in_executor(
tpe, self.file_manager.process_ufp_from_refresh,
file_path)
def _handle_inotify_read(self, fd, events):
if events & IOLoop.ERROR:
logging.info("INotify Read Error")
return
for evt in self.inotify.read(timeout=0):
if evt.mask & iFlags.IGNORED:
continue
if evt.wd not in self.watched_dirs:
flags = " ".join([str(f) for f in iFlags.from_mask(evt.mask)])
logging.info(
f"Error, inotify watch descriptor {evt.wd} "
f"not currently tracked: name: {evt.name}, "
f"flags: {flags}")
continue
root, watch_path = self.watched_dirs[evt.wd]
child_path = watch_path
if evt.name:
child_path = os.path.join(watch_path, evt.name)
if evt.mask & iFlags.ISDIR:
self._process_dir_event(evt, root, child_path)
else:
self._process_file_event(evt, root, child_path)
def _process_dir_event(self, evt, root, child_path):
if root == "gcodes" and evt.name == "thumbs":
# ignore changes to the thumbs directory
return
if evt.mask & iFlags.CREATE:
logging.debug(f"Inotify directory create: {root}, {evt.name}")
self._scan_directory(root, child_path)
self._notify_filelist_changed(
"create_dir", root, child_path)
elif evt.mask & iFlags.DELETE:
logging.debug(f"Inotify directory delete: {root}, {evt.name}")
self.remove_watch(child_path, need_low_level_rm=False)
pending_evt = self.pending_delete_events.pop(child_path, None)
if pending_evt is not None:
delete_hdl = pending_evt[2]
self.ioloop.remove_timeout(delete_hdl)
self._clear_metadata(root, child_path, True)
self._notify_filelist_changed(
"delete_dir", root, child_path)
elif evt.mask & iFlags.MOVED_FROM:
logging.debug(f"Inotify directory move from: {root}, {evt.name}")
hdl = self.ioloop.call_later(
INOTIFY_MOVE_TIME, self._remove_stale_cookie, evt.cookie)
self.pending_move_events[evt.cookie] = (
root, child_path, hdl, True)
self._clear_metadata(root, child_path, True)
elif evt.mask & iFlags.MOVED_TO:
logging.debug(f"Inotify directory move to: {root}, {evt.name}")
pending_evt = self.pending_move_events.pop(evt.cookie, None)
if pending_evt is not None:
# Moved from a currently watched directory
prev_root, prev_path, hdl, is_dir = pending_evt
if not is_dir:
logging.debug(
f"Cookie matched to a file: {pending_evt}")
return
self.ioloop.remove_timeout(hdl)
self._scan_directory(root, child_path, prev_path)
self._notify_filelist_changed(
"move_dir", root, child_path,
prev_root, prev_path)
else:
# Moved from an unwatched directory, for our
# purposes this is the same as creating a
# directory
self._scan_directory(root, child_path)
self._notify_filelist_changed(
"create_dir", root, child_path)
def _process_file_event(self, evt, root, child_path):
ext = os.path.splitext(evt.name)[-1]
if root == "gcodes" and ext not in VALID_GCODE_EXTS:
# Don't notify files with invalid gcode extensions
return
if evt.mask & iFlags.CREATE:
logging.debug(f"Inotify file create: {root}, {evt.name}")
self.pending_create_events[child_path] = root
elif evt.mask & iFlags.DELETE:
logging.debug(f"Inotify file delete: {root}, {evt.name}")
if root == "gcodes" and ext == ".ufp":
# Don't notify deleted ufp files
return
dir_path, fname = os.path.split(child_path)
files = set()
if dir_path in self.pending_delete_events:
root, files, delete_hdl = self.pending_delete_events[dir_path]
self.ioloop.remove_timeout(delete_hdl)
files.add(fname)
delete_hdl = self.ioloop.call_later(
INOTIFY_MOVE_TIME, self._process_deleted_files, dir_path)
self.pending_delete_events[dir_path] = (root, files, delete_hdl)
elif evt.mask & iFlags.MOVED_FROM:
logging.debug(f"Inotify file move from: {root}, {evt.name}")
hdl = self.ioloop.call_later(
INOTIFY_DELETE_TIME, self._remove_stale_cookie, evt.cookie)
self.pending_move_events[evt.cookie] = (
root, child_path, hdl, False)
self._clear_metadata(root, child_path)
elif evt.mask & iFlags.MOVED_TO:
logging.debug(f"Inotify file move to: {root}, {evt.name}")
if root == "gcodes":
if os.path.splitext(child_path)[-1] == ".ufp":
self.ioloop.spawn_callback(
self._process_ufp, child_path)
return
self._parse_gcode_metadata(child_path)
pending_evt = self.pending_move_events.pop(evt.cookie, None)
if pending_evt is not None:
# Moved from a currently watched directory
prev_root, prev_path, hdl, is_dir = pending_evt
if is_dir:
logging.debug(
f"Cookie matched to directory: {pending_evt}")
return
self._notify_filelist_changed(
"move_file", root, child_path,
prev_root, prev_path)
else:
self._notify_filelist_changed(
"create_file", root, child_path)
elif evt.mask & iFlags.MODIFY:
if child_path not in self.pending_create_events:
self.pending_modify_events[child_path] = root
elif evt.mask & iFlags.CLOSE_WRITE:
logging.debug(f"Inotify writable file closed: {child_path}")
# Only process files that have been created or modified
if child_path in self.pending_create_events:
del self.pending_create_events[child_path]
action = "create_file"
elif child_path in self.pending_modify_events:
del self.pending_modify_events[child_path]
action = "modify_file"
else:
# Some other event, ignore it
return
if root == "gcodes":
if os.path.splitext(child_path)[-1] == ".ufp":
self.ioloop.spawn_callback(
self._process_ufp, child_path)
return
self._parse_gcode_metadata(child_path)
self._notify_filelist_changed(action, root, child_path)
def _notify_filelist_changed(self, action, root, full_path,
source_root=None, source_path=None):
rel_path = self.file_manager.get_relative_path(root, full_path)
file_info = {'size': 0, 'modified': 0}
if os.path.exists(full_path):
file_info = self.file_manager.get_path_info(full_path)
file_info['path'] = rel_path
file_info['root'] = root
result = {'action': action, 'item': file_info}
if source_path is not None and source_root is not None:
src_rel_path = self.file_manager.get_relative_path(
source_root, source_path)
result['source_item'] = {'path': src_rel_path, 'root': source_root}
self.server.send_event("file_manager:filelist_changed", result)
def close(self):
self.ioloop.remove_handler(self.inotify.fileno())
for watch in self.watches.values():
try:
self.inotify.rm_watch(watch)
except OSError:
pass
METADATA_PRUNE_TIME = 600000
METADATA_NAMESPACE = "gcode_metadata"
METADATA_VERSION = 3
@ -654,6 +1002,8 @@ class MetadataStorage:
self.gc_path = gc_path
self.prune_cb = PeriodicCallback(
self.prune_metadata, METADATA_PRUNE_TIME)
if self.gc_path:
self.prune_cb.start()
def update_gcode_path(self, path):
if path == self.gc_path:
@ -675,8 +1025,8 @@ class MetadataStorage:
def prune_metadata(self):
for fname in list(self.mddb.keys()):
fpath = os.path.join(self.gc_path, fname)
if not os.path.exists(fpath):
self.remove_file(fname)
if not os.path.isfile(fpath):
self.remove_file_metadata(fname)
logging.info(f"Pruned file: {fname}")
continue
@ -684,7 +1034,12 @@ class MetadataStorage:
mdata = self.mddb.get(fname, {'size': "", 'modified': 0})
return mdata['size'] == fsize and mdata['modified'] == modified
def remove_file(self, fname):
def remove_directory_metadata(self, dir_name):
for fname in list(self.mddb.keys()):
if fname.startswith(dir_name):
self.remove_file_metadata(fname)
def remove_file_metadata(self, fname):
metadata = self.mddb.pop(fname, None)
if metadata is None:
return
@ -696,6 +1051,8 @@ class MetadataStorage:
if path is None:
continue
thumb_path = os.path.join(fdir, path)
if not os.path.isfile(thumb_path):
continue
try:
os.remove(thumb_path)
except Exception: