file_manager: process metadata changes when no observer is configured

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2023-02-20 12:46:11 -05:00
parent 347c9f4d2b
commit 0f7b781f57
No known key found for this signature in database
GPG Key ID: 5A1EB336DFB4C71B
1 changed files with 194 additions and 62 deletions

View File

@ -93,7 +93,6 @@ class FileManager:
)
obs_class = BaseFileSystemObserver
logging.info(f"Using File System Observer: {observer}")
self.no_observe = observer == "none"
self.fs_observer = obs_class(
config, self, self.gcode_metadata, self.sync_lock
)
@ -431,6 +430,7 @@ class FileManager:
os.mkdir(dir_path)
except Exception as e:
raise self.server.error(str(e))
self.fs_observer.on_item_create(root, dir_path, is_dir=True)
elif method == 'DELETE' and root in self.full_access_roots:
# Remove a directory
action = "delete_dir"
@ -456,6 +456,7 @@ class FileManager:
os.rmdir(dir_path)
except Exception as e:
raise self.server.error(str(e))
self.fs_observer.on_item_delete(root, dir_path, is_dir=True)
else:
raise self.server.error("Operation Not Supported", 405)
return self._sched_changed_event(action, root, dir_path)
@ -545,10 +546,18 @@ class FileManager:
try:
full_dest = await self.event_loop.run_in_thread(
op_func, source_path, dest_path)
if dest_root == "gcodes":
if dest_root == "gcodes" and self.fs_observer.has_fast_observe:
await self.sync_lock.wait_inotify_event(full_dest)
except Exception as e:
raise self.server.error(str(e)) from e
if action.startswith("move"):
ret = self.fs_observer.on_item_move(
source_root, dest_root, source_path, full_dest
)
else:
ret = self.fs_observer.on_item_copy(dest_root, full_dest)
if ret is not None:
await ret
return self._sched_changed_event(
action, dest_root, full_dest, src_info[0], src_info[1]
)
@ -592,6 +601,7 @@ class FileManager:
await self.event_loop.run_in_thread(
self._zip_files, items, dest_path, store_only
)
self.fs_observer.on_item_create(dest_root, dest_path)
ret = self._sched_changed_event("create_file", dest_root, str(dest_path))
return {
"destination": ret["item"],
@ -831,6 +841,7 @@ class FileManager:
await job_queue.queue_job(
upload_info['filename'], check_exists=False)
queued = True
self.fs_observer.on_item_create("gcodes", upload_info["dest_path"])
result = dict(self._sched_changed_event(
"create_file", "gcodes", upload_info["dest_path"]
))
@ -843,6 +854,7 @@ class FileManager:
await self._process_uploaded_file(upload_info)
dest_path: str = upload_info["dest_path"]
root: str = upload_info["root"]
self.fs_observer.on_item_create(root, dest_path)
return self._sched_changed_event("create_file", root, dest_path)
async def _process_uploaded_file(self,
@ -988,6 +1000,7 @@ class FileManager:
raise
self.sync_lock.setup("delete_file", full_path)
os.remove(full_path)
self.fs_observer.on_item_delete(root, full_path)
return self._sched_changed_event("delete_file", root, full_path)
def _sched_changed_event(
@ -1009,7 +1022,7 @@ class FileManager:
if source_path is not None and source_root is not None:
src_rel_path = self.get_relative_path(source_root, source_path)
notify_info['source_item'] = {'path': src_rel_path, 'root': source_root}
immediate |= self.no_observe
immediate |= not self.fs_observer.has_fast_observe
delay = .005 if immediate else 1.
key = f"{action}-{root}-{rel_path}"
handle = self.event_loop.delay_callback(
@ -1200,6 +1213,10 @@ class BaseFileSystemObserver:
self.gcode_metadata = gcode_metadata
self.sync_lock = sync_lock
@property
def has_fast_observe(self) -> bool:
return False
def initialize(self) -> None:
pass
@ -1209,6 +1226,152 @@ class BaseFileSystemObserver:
fm = self.file_manager
fm._sched_changed_event("root_update", root, root_path, immediate=True)
def try_move_metadata(
self,
prev_root: str,
new_root: str,
prev_path: str,
new_path: str,
is_dir: bool = False
) -> Union[bool, Awaitable]:
if new_root == "gcodes":
if prev_root == "gcodes":
# moved within the gcodes root, move metadata
fm = self.file_manager
gcm = self.gcode_metadata
prev_rel_path = fm.get_relative_path("gcodes", prev_path)
new_rel_path = fm.get_relative_path("gcodes", new_path)
if is_dir:
gcm.move_directory_metadata(prev_rel_path, new_rel_path)
else:
return gcm.move_file_metadata(prev_rel_path, new_rel_path)
else:
# move from a non-gcodes root to gcodes root needs a rescan
self.clear_metadata(prev_root, prev_path, is_dir)
return False
elif prev_root == "gcodes":
# moved out of the gcodes root, remove metadata
self.clear_metadata(prev_root, prev_path, is_dir)
return True
def clear_metadata(
self, root: str, path: str, is_dir: bool = False
) -> None:
if root == "gcodes":
rel_path = self.file_manager.get_relative_path(root, str(path))
if is_dir:
self.gcode_metadata.remove_directory_metadata(rel_path)
else:
self.gcode_metadata.remove_file_metadata(rel_path)
def parse_gcode_metadata(self, file_path: str) -> asyncio.Event:
rel_path = self.file_manager.get_relative_path("gcodes", file_path)
ext = os.path.splitext(rel_path)[-1].lower()
try:
path_info = self.file_manager.get_path_info(file_path, "gcodes")
except Exception:
path_info = {}
if (
ext not in VALID_GCODE_EXTS or
path_info.get('size', 0) == 0
):
evt = asyncio.Event()
evt.set()
return evt
if ext == ".ufp":
rel_path = os.path.splitext(rel_path)[0] + ".gcode"
path_info['ufp_path'] = file_path
return self.gcode_metadata.parse_metadata(rel_path, path_info)
def _scan_directory_metadata(
self, start_path: pathlib.Path
) -> Optional[Awaitable]:
# Use os.walk find files in sd path and subdirs
mevts: List[Coroutine] = []
st = start_path.stat()
visited_dirs = {(st.st_dev, st.st_ino)}
for parent, dirs, files in os.walk(start_path, followlinks=True):
scan_dirs: List[str] = []
# Filter out directories that have already been visted. This
# prevents infinite recrusion "followlinks" is set to True
parent_dir = pathlib.Path(parent)
for dname in dirs:
dir_path = parent_dir.joinpath(dname)
if not dir_path.exists():
continue
st = dir_path.stat()
key = (st.st_dev, st.st_ino)
if key not in visited_dirs:
visited_dirs.add(key)
scan_dirs.append(dname)
dirs[:] = scan_dirs
for fname in files:
file_path = parent_dir.joinpath(fname)
if (
not file_path.is_file() or
file_path.suffix not in VALID_GCODE_EXTS
):
continue
mevt = self.parse_gcode_metadata(str(file_path))
mevts.append(mevt.wait())
if mevts:
return asyncio.gather(*mevts)
return None
def on_item_copy(self, root: str, item_path: StrOrPath) -> Optional[Awaitable]:
if self.has_fast_observe:
return None
if isinstance(item_path, str):
item_path = pathlib.Path(item_path)
if root != "gcodes":
return None
if item_path.is_file() and item_path.suffix in VALID_GCODE_EXTS:
ret = self.parse_gcode_metadata(str(item_path))
return ret.wait()
elif item_path.is_dir():
return self._scan_directory_metadata(item_path)
return None
def on_item_move(
self,
src_root: str,
dest_root: str,
src_path: StrOrPath,
dest_path: StrOrPath
) -> Optional[Awaitable]:
if self.has_fast_observe:
return None
if isinstance(src_path, str):
src_path = pathlib.Path(src_path)
if isinstance(dest_path, str):
dest_path = pathlib.Path(dest_path)
is_dir = dest_path.is_dir()
ret = self.try_move_metadata(
src_root, dest_root, str(src_path), str(dest_path), is_dir
)
if not isinstance(ret, bool):
return ret
elif ret is False:
# Need metadata scan
if is_dir:
return self._scan_directory_metadata(dest_path)
elif dest_path.is_file() and dest_path.suffix in VALID_GCODE_EXTS:
mevt = self.parse_gcode_metadata(str(dest_path))
return mevt.wait()
return None
def on_item_create(
self, root: str, item_path: StrOrPath, is_dir: bool = False
) -> None:
pass
def on_item_delete(
self, root: str, item_path: StrOrPath, is_dir: bool = False
) -> None:
if self.has_fast_observe:
return
self.clear_metadata(root, str(item_path), is_dir)
def close(self) -> None:
pass
@ -1563,6 +1726,34 @@ class InotifyObserver(BaseFileSystemObserver):
self.pending_coroutines: List[Coroutine] = []
self._gc_notify_task: Optional[asyncio.Task] = None
@property
def has_fast_observe(self) -> bool:
return True
# Override and pass the callbacks from the request handlers. Inotify
# detects events quickly and takes any required actions
def on_item_create(
self, root: str, item_path: StrOrPath, is_dir: bool = False
) -> None:
pass
def on_item_delete(
self, root: str, item_path: StrOrPath, is_dir: bool = False
) -> None:
pass
def on_item_move(
self,
src_root: str,
dest_root: str,
src_path: StrOrPath,
dest_path: StrOrPath
) -> Optional[Awaitable]:
return None
def on_item_copy(self, root: str, item_path: StrOrPath) -> Optional[Awaitable]:
return None
def add_root_watch(self, root: str, root_path: str) -> None:
# remove all exisiting watches on root
if root in self.watched_roots:
@ -1647,46 +1838,6 @@ class InotifyObserver(BaseFileSystemObserver):
except Exception:
logging.exception(f"Error removing watch: '{node.get_path()}'")
def clear_metadata(self,
root: str,
path: str,
is_dir: bool = False
) -> None:
if root == "gcodes":
rel_path = self.file_manager.get_relative_path(root, path)
if is_dir:
self.gcode_metadata.remove_directory_metadata(rel_path)
else:
self.gcode_metadata.remove_file_metadata(rel_path)
def try_move_metadata(
self,
prev_root: str,
new_root: str,
prev_path: str,
new_path: str,
is_dir: bool = False
) -> Union[bool, Awaitable]:
if new_root == "gcodes":
if prev_root == "gcodes":
# moved within the gcodes root, move metadata
fm = self.file_manager
gcm = self.gcode_metadata
prev_rel_path = fm.get_relative_path("gcodes", prev_path)
new_rel_path = fm.get_relative_path("gcodes", new_path)
if is_dir:
gcm.move_directory_metadata(prev_rel_path, new_rel_path)
else:
return gcm.move_file_metadata(prev_rel_path, new_rel_path)
else:
# move from a non-gcodes root to gcodes root needs a rescan
self.clear_metadata(prev_root, prev_path, is_dir)
return False
elif prev_root == "gcodes":
# moved out of the gcodes root, remove metadata
self.clear_metadata(prev_root, prev_path, is_dir)
return True
def log_nodes(self) -> None:
if self.server.is_verbose_enabled():
debug_msg = f"Inotify Watches After Scan:"
@ -1697,25 +1848,6 @@ class InotifyObserver(BaseFileSystemObserver):
f"Watch: {wdesc}"
logging.debug(debug_msg)
def parse_gcode_metadata(self, file_path: str) -> asyncio.Event:
rel_path = self.file_manager.get_relative_path("gcodes", file_path)
ext = os.path.splitext(rel_path)[-1].lower()
try:
path_info = self.file_manager.get_path_info(file_path, "gcodes")
except Exception:
path_info = {}
if (
ext not in VALID_GCODE_EXTS or
path_info.get('size', 0) == 0
):
evt = asyncio.Event()
evt.set()
return evt
if ext == ".ufp":
rel_path = os.path.splitext(rel_path)[0] + ".gcode"
path_info['ufp_path'] = file_path
return self.gcode_metadata.parse_metadata(rel_path, path_info)
def _handle_move_timeout(self, cookie: int, is_dir: bool):
if cookie not in self.pending_moves:
return