diff --git a/moonraker/components/file_manager.py b/moonraker/components/file_manager.py index e557502..1380588 100644 --- a/moonraker/components/file_manager.py +++ b/moonraker/components/file_manager.py @@ -12,7 +12,7 @@ import tempfile import asyncio from concurrent.futures import ThreadPoolExecutor from tornado.ioloop import IOLoop -from tornado.locks import Event +from tornado.locks import Event, Lock, Condition from inotify_simple import INotify from inotify_simple import flags as iFlags @@ -33,6 +33,8 @@ class FileManager: self.gcode_metadata = MetadataStorage(self.server, gc_path, database) self.inotify_handler = INotifyHandler(config, self, self.gcode_metadata) + self.write_mutex = Lock() + self.notify_sync_lock = None self.fixed_path_args = {} # Register file management endpoints @@ -151,6 +153,12 @@ class FileManager: file_path = os.path.join(root_dir, filename) return os.path.exists(file_path) + def sync_inotify_event(self, path): + if self.notify_sync_lock is None or \ + not self.notify_sync_lock.check_need_sync(path): + return None + return self.notify_sync_lock + async def _handle_filelist_request(self, web_request): root = web_request.get_str('root', "gcodes") return self.get_file_list(root, list_format=True) @@ -173,33 +181,34 @@ class FileManager: # Get list of files and subdirectories for this target dir_info = self._list_directory(dir_path, is_extended) return dir_info - elif action == 'POST' and root in FULL_ACCESS_ROOTS: - # Create a new directory - try: - os.mkdir(dir_path) - except Exception as e: - raise self.server.error(str(e)) - elif action == 'DELETE' and root in FULL_ACCESS_ROOTS: - # Remove a directory - if directory.strip("/") == root: - raise self.server.error( - "Cannot delete root directory") - if not os.path.isdir(dir_path): - raise self.server.error( - f"Directory does not exist ({directory})") - force = web_request.get_boolean('force', False) - if force: - # Make sure that the directory does not contain a file - # loaded by the virtual_sdcard - await self._handle_operation_check(dir_path) - shutil.rmtree(dir_path) - else: + async with self.write_mutex: + if action == 'POST' and root in FULL_ACCESS_ROOTS: + # Create a new directory try: - os.rmdir(dir_path) + os.mkdir(dir_path) except Exception as e: raise self.server.error(str(e)) - else: - raise self.server.error("Operation Not Supported", 405) + elif action == 'DELETE' and root in FULL_ACCESS_ROOTS: + # Remove a directory + if directory.strip("/") == root: + raise self.server.error( + "Cannot delete root directory") + if not os.path.isdir(dir_path): + raise self.server.error( + f"Directory does not exist ({directory})") + force = web_request.get_boolean('force', False) + if force: + # Make sure that the directory does not contain a file + # loaded by the virtual_sdcard + await self._handle_operation_check(dir_path) + shutil.rmtree(dir_path) + else: + try: + os.rmdir(dir_path) + except Exception as e: + raise self.server.error(str(e)) + else: + raise self.server.error("Operation Not Supported", 405) return "ok" async def _handle_operation_check(self, requested_path): @@ -238,47 +247,44 @@ class FileManager: source = web_request.get_str("source") destination = web_request.get_str("dest") ep = web_request.get_endpoint() - if source is None: - raise self.server.error("File move/copy request issing source") - if destination is None: - raise self.server.error( - "File move/copy request missing destination") source_root, source_path = self._convert_request_path(source) dest_root, dest_path = self._convert_request_path(destination) if dest_root not in FULL_ACCESS_ROOTS: raise self.server.error( f"Destination path is read-only: {dest_root}") - if not os.path.exists(source_path): - raise self.server.error(f"File {source_path} does not exist") - # make sure the destination is not in use - if os.path.exists(dest_path): - await self._handle_operation_check(dest_path) - if ep == "/server/files/move": - if source_root not in FULL_ACCESS_ROOTS: - raise self.server.error( - f"Source path is read-only, cannot move: {source_root}") - # if moving the file, make sure the source is not in use - await self._handle_operation_check(source_path) - try: - shutil.move(source_path, dest_path) - except Exception as e: - raise self.server.error(str(e)) - elif ep == "/server/files/copy": + async with self.write_mutex: + if not os.path.exists(source_path): + raise self.server.error(f"File {source_path} does not exist") + # make sure the destination is not in use + if os.path.exists(dest_path): + await self._handle_operation_check(dest_path) + if ep == "/server/files/move": + if source_root not in FULL_ACCESS_ROOTS: + raise self.server.error( + f"Source path is read-only, cannot move: {source_root}") + # if moving the file, make sure the source is not in use + await self._handle_operation_check(source_path) + op_func = shutil.move + elif ep == "/server/files/copy": + if os.path.isdir(source_path): + op_func = shutil.copytree + else: + op_func = shutil.copy2 ioloop = IOLoop.current() - with ThreadPoolExecutor(max_workers=1) as tpe: - await ioloop.run_in_executor( - tpe, self._do_copy, source_path, dest_path) + self.notify_sync_lock = NotifySyncLock(dest_path) + try: + with ThreadPoolExecutor(max_workers=1) as tpe: + full_dest = await ioloop.run_in_executor( + tpe, op_func, source_path, dest_path) + except Exception as e: + self.notify_sync_lock.cancel() + self.notify_sync_lock = None + raise self.server.error(str(e)) + self.notify_sync_lock.update_dest(full_dest) + await self.notify_sync_lock.wait(600.) + self.notify_sync_lock = None return "ok" - def _do_copy(self, source_path, dest_path): - try: - if os.path.isdir(source_path): - shutil.copytree(source_path, dest_path) - else: - shutil.copy2(source_path, dest_path) - except Exception as e: - raise self.server.error(str(e)) - def _list_directory(self, path, is_extended=False): if not os.path.isdir(path): raise self.server.error( @@ -321,21 +327,22 @@ class FileManager: async def finalize_upload(self, form_args): # lookup root file path - try: - upload_info = self._parse_upload_args(form_args) - root = upload_info['root'] - if root == "gcodes": - result = await self._finish_gcode_upload(upload_info) - elif root in FULL_ACCESS_ROOTS: - result = await self._finish_standard_upload(upload_info) - else: - raise self.server.error(f"Invalid root request: {root}") - except Exception: + async with self.write_mutex: try: - os.remove(form_args['tmp_file_path']) + upload_info = self._parse_upload_args(form_args) + root = upload_info['root'] + if root == "gcodes": + result = await self._finish_gcode_upload(upload_info) + elif root in FULL_ACCESS_ROOTS: + result = await self._finish_standard_upload(upload_info) + else: + raise self.server.error(f"Invalid root request: {root}") except Exception: - pass - raise + try: + os.remove(form_args['tmp_file_path']) + except Exception: + pass + raise return result def _parse_upload_args(self, upload_args): @@ -395,6 +402,7 @@ class FileManager: start_print = False # Don't start if another print is currently in progress start_print = start_print and not print_ongoing + self.notify_sync_lock = NotifySyncLock(upload_info['dest_path']) finfo = self._process_uploaded_file(upload_info) await self.gcode_metadata.parse_metadata( upload_info['filename'], finfo).wait() @@ -406,6 +414,8 @@ class FileManager: except self.server.error: # Attempt to start print failed start_print = False + await self.notify_sync_lock.wait(300.) + self.notify_sync_lock = None return { 'result': upload_info['filename'], 'print_started': start_print @@ -523,26 +533,27 @@ class FileManager: return await self.delete_file(file_path) async def delete_file(self, path): - parts = path.lstrip("/").split("/", 1) - if len(parts) != 2: - raise self.server.error( - f"Path not available for DELETE: {path}", 405) - root = parts[0] - filename = parts[1] - if root not in self.file_paths or root not in FULL_ACCESS_ROOTS: - raise self.server.error( - f"Path not available for DELETE: {path}", 405) - root_path = self.file_paths[root] - full_path = os.path.join(root_path, filename) - if not os.path.isfile(full_path): - raise self.server.error(f"Invalid file path: {path}") - if root == "gcodes": - try: - await self._handle_operation_check(full_path) - except self.server.error as e: - if e.status_code == 403: - raise - os.remove(full_path) + async with self.write_mutex: + parts = path.lstrip("/").split("/", 1) + if len(parts) != 2: + raise self.server.error( + f"Path not available for DELETE: {path}", 405) + root = parts[0] + filename = parts[1] + if root not in self.file_paths or root not in FULL_ACCESS_ROOTS: + raise self.server.error( + f"Path not available for DELETE: {path}", 405) + root_path = self.file_paths[root] + full_path = os.path.join(root_path, filename) + if not os.path.isfile(full_path): + raise self.server.error(f"Invalid file path: {path}") + if root == "gcodes": + try: + await self._handle_operation_check(full_path) + except self.server.error as e: + if e.status_code == 403: + raise + os.remove(full_path) return filename def close(self): @@ -779,6 +790,63 @@ class InotifyRootNode(InotifyNode): def get_root(self): return self.root_name +class NotifySyncLock: + def __init__(self, dest_path): + self.wait_fut = None + self.sync_condition = Condition() + self.dest_path = dest_path + self.notified_paths = set() + self.finished = False + + def update_dest(self, dest_path): + self.dest_path = dest_path + + def check_need_sync(self, path): + return self.dest_path in [path, os.path.dirname(path)] \ + and not self.finished + + async def wait(self, timeout=None): + if self.finished or self.wait_fut is not None: + # Can only wait once + return + if self.dest_path not in self.notified_paths: + self.wait_fut = asyncio.Future() + if timeout is None: + await self.wait_fut + else: + try: + await asyncio.wait_for(self.wait_fut, timeout) + except asyncio.TimeoutError: + pass + self.sync_condition.notify_all() + self.finished = True + + async def sync(self, path, timeout=None): + if not self.check_need_sync(path): + return + self.notified_paths.add(path) + if self.wait_fut is not None and self.dest_path == path: + self.wait_fut.set_result(None) + # Transfer control to waiter + if timeout is not None: + timeout = IOLoop.time() + timeout + try: + await self.sync_condition.wait(timeout) + except Exception: + pass + else: + # Sleep an additional 5ms to give HTTP requests a chance to + # return prior to a notification + await asyncio.sleep(.005) + + def cancel(self): + if self.finished: + return + if self.wait_fut is not None and not self.wait_fut.done(): + self.wait_fut.set_result(None) + self.sync_condition.notify_all() + self.finished = True + class INotifyHandler: def __init__(self, config, file_manager, gcode_metadata): self.server = config.get_server() @@ -1051,6 +1119,17 @@ class INotifyHandler: src_rel_path = self.file_manager.get_relative_path( source_root, source_path) result['source_item'] = {'path': src_rel_path, 'root': source_root} + sync_lock = self.file_manager.sync_inotify_event(full_path) + if sync_lock is not None: + # Delay this notification so that it occurs after an item + logging.debug(f"Syncing notification: {full_path}") + IOLoop.current().spawn_callback( + self._delay_notification, result, sync_lock.sync(full_path)) + else: + self.server.send_event("file_manager:filelist_changed", result) + + async def _delay_notification(self, result, sync_fut): + await sync_fut self.server.send_event("file_manager:filelist_changed", result) def close(self):