file_manager: add request notifcation fallback

Schedule fallback websocket notifications in the event that
inotify is unable to watch a file system.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2023-02-17 20:21:43 -05:00
parent 51e307dbd6
commit 7330c2c123
No known key found for this signature in database
GPG Key ID: 5A1EB336DFB4C71B
1 changed files with 86 additions and 57 deletions

View File

@ -82,9 +82,9 @@ class FileManager:
self.inotify_handler = INotifyHandler( self.inotify_handler = INotifyHandler(
config, self, self.gcode_metadata, self.sync_lock config, self, self.gcode_metadata, self.sync_lock
) )
self.scheduled_notifications: Dict[str, asyncio.TimerHandle] = {}
self.fixed_path_args: Dict[str, Any] = {} self.fixed_path_args: Dict[str, Any] = {}
self.queue_gcodes: bool = config.getboolean('queue_gcode_uploads', self.queue_gcodes: bool = config.getboolean('queue_gcode_uploads', False)
False)
# Register file management endpoints # Register file management endpoints
self.server.register_endpoint( self.server.register_endpoint(
@ -402,29 +402,25 @@ class FileManager:
) -> Dict[str, Any]: ) -> Dict[str, Any]:
directory = web_request.get_str('path', "gcodes") directory = web_request.get_str('path', "gcodes")
root, dir_path = self._convert_request_path(directory) root, dir_path = self._convert_request_path(directory)
action = web_request.get_action() method = web_request.get_action()
if action == 'GET': if method == 'GET':
is_extended = web_request.get_boolean('extended', False) is_extended = web_request.get_boolean('extended', False)
# Get list of files and subdirectories for this target # Get list of files and subdirectories for this target
dir_info = self._list_directory(dir_path, root, is_extended) dir_info = self._list_directory(dir_path, root, is_extended)
return dir_info return dir_info
async with self.sync_lock: async with self.sync_lock:
self.check_reserved_path(dir_path, True) self.check_reserved_path(dir_path, True)
rel_dir = self.get_relative_path(root, dir_path) action = "create_dir"
result = { if method == 'POST' and root in self.full_access_roots:
'item': {'path': rel_dir, 'root': root},
'action': "create_dir"
}
if action == 'POST' and root in self.full_access_roots:
# Create a new directory # Create a new directory
self.sync_lock.setup("create_dir", dir_path) self.sync_lock.setup("create_dir", dir_path)
try: try:
os.mkdir(dir_path) os.mkdir(dir_path)
except Exception as e: except Exception as e:
raise self.server.error(str(e)) raise self.server.error(str(e))
elif action == 'DELETE' and root in self.full_access_roots: elif method == 'DELETE' and root in self.full_access_roots:
# Remove a directory # Remove a directory
result['action'] = "delete_dir" action = "delete_dir"
if directory.strip("/") == root: if directory.strip("/") == root:
raise self.server.error( raise self.server.error(
"Cannot delete root directory") "Cannot delete root directory")
@ -449,7 +445,7 @@ class FileManager:
raise self.server.error(str(e)) raise self.server.error(str(e))
else: else:
raise self.server.error("Operation Not Supported", 405) raise self.server.error("Operation Not Supported", 405)
return result return self._sched_changed_event(action, root, dir_path)
def _handle_operation_check(self, requested_path: str) -> bool: def _handle_operation_check(self, requested_path: str) -> bool:
if not self.get_relative_path("gcodes", requested_path): if not self.get_relative_path("gcodes", requested_path):
@ -504,12 +500,12 @@ class FileManager:
self.check_reserved_path(source_path, False) self.check_reserved_path(source_path, False)
self.check_reserved_path(dest_path, True) self.check_reserved_path(dest_path, True)
async with self.sync_lock: async with self.sync_lock:
result: Dict[str, Any] = {'item': {'root': dest_root}}
if not os.path.exists(source_path): if not os.path.exists(source_path):
raise self.server.error(f"File {source_path} does not exist") raise self.server.error(f"File {source_path} does not exist")
# make sure the destination is not in use # make sure the destination is not in use
if os.path.exists(dest_path): if os.path.exists(dest_path):
self._handle_operation_check(dest_path) self._handle_operation_check(dest_path)
src_info: Tuple[Optional[str], ...] = (None, None)
if ep == "/server/files/move": if ep == "/server/files/move":
if source_root not in self.full_access_roots: if source_root not in self.full_access_roots:
raise self.server.error( raise self.server.error(
@ -517,26 +513,22 @@ class FileManager:
# if moving the file, make sure the source is not in use # if moving the file, make sure the source is not in use
self._handle_operation_check(source_path) self._handle_operation_check(source_path)
op_func: Callable[..., str] = shutil.move op_func: Callable[..., str] = shutil.move
result['source_item'] = { action = "move_dir" if os.path.isdir(source_path) else "move_file"
'path': self.get_relative_path(source_root, source_path), src_info = (source_root, source_path)
'root': source_root
}
result['action'] = "move_dir" if os.path.isdir(source_path) \
else "move_file"
elif ep == "/server/files/copy": elif ep == "/server/files/copy":
if os.path.isdir(source_path): if os.path.isdir(source_path):
result['action'] = "create_dir" action = "create_dir"
op_func = shutil.copytree op_func = shutil.copytree
else: else:
result['action'] = "create_file" action = "create_file"
source_base = os.path.basename(source_path) source_base = os.path.basename(source_path)
if ( if (
os.path.isfile(dest_path) or os.path.isfile(dest_path) or
os.path.isfile(os.path.join(dest_path, source_base)) os.path.isfile(os.path.join(dest_path, source_base))
): ):
result['action'] = "modify_file" action = "modify_file"
op_func = shutil.copy2 op_func = shutil.copy2
self.sync_lock.setup(result["action"], dest_path, move_copy=True) self.sync_lock.setup(action, dest_path, move_copy=True)
try: try:
full_dest = await self.event_loop.run_in_thread( full_dest = await self.event_loop.run_in_thread(
op_func, source_path, dest_path) op_func, source_path, dest_path)
@ -544,8 +536,9 @@ class FileManager:
await self.sync_lock.wait_inotify_event(full_dest) await self.sync_lock.wait_inotify_event(full_dest)
except Exception as e: except Exception as e:
raise self.server.error(str(e)) from e raise self.server.error(str(e)) from e
result['item']['path'] = self.get_relative_path(dest_root, full_dest) return self._sched_changed_event(
return result action, dest_root, full_dest, src_info[0], src_info[1]
)
async def _handle_zip_files( async def _handle_zip_files(
self, web_request: WebRequest self, web_request: WebRequest
@ -586,10 +579,9 @@ class FileManager:
await self.event_loop.run_in_thread( await self.event_loop.run_in_thread(
self._zip_files, items, dest_path, store_only self._zip_files, items, dest_path, store_only
) )
ret = self._sched_changed_event("create_file", dest_root, str(dest_path))
rel_dest = dest_path.relative_to(self.file_paths[dest_root])
return { return {
"destination": {"root": dest_root, "path": str(rel_dest)}, "destination": ret["item"],
"action": "zip_files" "action": "zip_files"
} }
@ -685,11 +677,18 @@ class FileManager:
} }
return flist return flist
def get_path_info(self, path: StrOrPath, root: str) -> Dict[str, Any]: def get_path_info(
self, path: StrOrPath, root: str, raise_error: bool = True
) -> Dict[str, Any]:
if isinstance(path, str): if isinstance(path, str):
path = pathlib.Path(path) path = pathlib.Path(path)
real_path = path.resolve() real_path = path.resolve()
try:
fstat = path.stat() fstat = path.stat()
except Exception:
if raise_error:
raise
return {"modified": 0, "size": 0, "permissions": ""}
if ".git" in real_path.parts: if ".git" in real_path.parts:
permissions = "" permissions = ""
else: else:
@ -819,27 +818,19 @@ class FileManager:
await job_queue.queue_job( await job_queue.queue_job(
upload_info['filename'], check_exists=False) upload_info['filename'], check_exists=False)
queued = True queued = True
return { result = dict(self._sched_changed_event(
'item': { "create_file", "gcodes", upload_info["dest_path"]
'path': upload_info['filename'], ))
'root': "gcodes" result.update({"print_started": started, "print_queued": queued})
}, return result
'print_started': started,
'print_queued': queued,
'action': "create_file"
}
async def _finish_standard_upload( async def _finish_standard_upload(
self, upload_info: Dict[str, Any] self, upload_info: Dict[str, Any]
) -> Dict[str, Any]: ) -> Dict[str, Any]:
await self._process_uploaded_file(upload_info) await self._process_uploaded_file(upload_info)
return { dest_path: str = upload_info["dest_path"]
'item': { root: str = upload_info["root"]
'path': upload_info['filename'], return self._sched_changed_event("create_file", root, dest_path)
'root': upload_info['root']
},
'action': "create_file"
}
async def _process_uploaded_file(self, async def _process_uploaded_file(self,
upload_info: Dict[str, Any] upload_info: Dict[str, Any]
@ -972,7 +963,6 @@ class FileManager:
async with self.sync_lock: async with self.sync_lock:
root, full_path = self._convert_request_path(path) root, full_path = self._convert_request_path(path)
self.check_reserved_path(full_path, True) self.check_reserved_path(full_path, True)
filename = self.get_relative_path(root, full_path)
if root not in self.full_access_roots: if root not in self.full_access_roots:
raise self.server.error( raise self.server.error(
f"Path not available for DELETE: {path}", 405) f"Path not available for DELETE: {path}", 405)
@ -985,11 +975,44 @@ class FileManager:
raise raise
self.sync_lock.setup("delete_file", full_path) self.sync_lock.setup("delete_file", full_path)
os.remove(full_path) os.remove(full_path)
return { return self._sched_changed_event("delete_file", root, full_path)
'item': {'path': filename, 'root': root},
'action': "delete_file"} def _sched_changed_event(
self,
action: str,
root: str,
full_path: str,
source_root: Optional[str] = None,
source_path: Optional[str] = None
) -> Dict[str, Any]:
rel_path = self.get_relative_path(root, full_path)
path_info = self.get_path_info(full_path, root, raise_error=False)
path_info.update({"path": rel_path, "root": root})
notify_info: Dict[str, Any] = {
"action": action,
"item": path_info
}
if source_path is not None and source_root is not None:
src_rel_path = self.get_relative_path(source_root, source_path)
notify_info['source_item'] = {'path': src_rel_path, 'root': source_root}
key = f"{action}-{root}-{rel_path}"
handle = self.event_loop.delay_callback(1., self._do_notify, key, notify_info)
self.scheduled_notifications[key] = handle
return notify_info
def _do_notify(self, key: str, notify_info: Dict[str, Any]) -> None:
self.scheduled_notifications.pop(key, None)
self.server.send_event("file_manager:filelist_changed", notify_info)
def cancel_notification(self, key: str) -> None:
handle = self.scheduled_notifications.pop(key, None)
if handle is not None:
handle.cancel()
def close(self) -> None: def close(self) -> None:
for hdl in self.scheduled_notifications.values():
hdl.cancel()
self.scheduled_notifications.clear()
self.inotify_handler.close() self.inotify_handler.close()
@ -1854,7 +1877,7 @@ class INotifyHandler:
) -> None: ) -> None:
rel_path = self.file_manager.get_relative_path(root, full_path) rel_path = self.file_manager.get_relative_path(root, full_path)
sync_fut = self.sync_lock.check_in_request(action, full_path) sync_fut = self.sync_lock.check_in_request(action, full_path)
file_info: Dict[str, Any] = {'size': 0, 'modified': 0} file_info: Dict[str, Any] = {'size': 0, 'modified': 0, "permissions": ""}
if os.path.exists(full_path): if os.path.exists(full_path):
try: try:
file_info = self.file_manager.get_path_info(full_path, root) file_info = self.file_manager.get_path_info(full_path, root)
@ -1891,19 +1914,25 @@ class INotifyHandler:
src_rel_path = self.file_manager.get_relative_path( src_rel_path = self.file_manager.get_relative_path(
source_root, source_path) source_root, source_path)
result['source_item'] = {'path': src_rel_path, 'root': source_root} result['source_item'] = {'path': src_rel_path, 'root': source_root}
key = f"{action}-{root}-{rel_path}"
if sync_fut is not None: if sync_fut is not None:
# Delay this notification so that it occurs after an item # Delay this notification so that it occurs after an item
logging.debug(f"Syncing notification: {full_path}") logging.debug(f"Syncing notification: {full_path}")
self.event_loop.register_callback( self.event_loop.register_callback(
self._sync_with_request, result, sync_fut self._sync_with_request, result, sync_fut, key
) )
else: else:
self.file_manager.cancel_notification(key)
self.server.send_event("file_manager:filelist_changed", result) self.server.send_event("file_manager:filelist_changed", result)
async def _sync_with_request( async def _sync_with_request(
self, result: Dict[str, Any], sync_fut: asyncio.Future self,
result: Dict[str, Any],
sync_fut: asyncio.Future,
notify_key: str
) -> None: ) -> None:
await sync_fut await sync_fut
self.file_manager.cancel_notification(notify_key)
await asyncio.sleep(.005) await asyncio.sleep(.005)
self.server.send_event("file_manager:filelist_changed", result) self.server.send_event("file_manager:filelist_changed", result)