diff --git a/moonraker/components/file_manager/file_manager.py b/moonraker/components/file_manager/file_manager.py index d197c09..d809e53 100644 --- a/moonraker/components/file_manager/file_manager.py +++ b/moonraker/components/file_manager/file_manager.py @@ -44,7 +44,6 @@ if TYPE_CHECKING: _T = TypeVar("_T") VALID_GCODE_EXTS = ['.gcode', '.g', '.gco', '.ufp', '.nc'] -FULL_ACCESS_ROOTS = ["gcodes", "config"] METADATA_SCRIPT = os.path.abspath(os.path.join( os.path.dirname(__file__), "metadata.py")) WATCH_FLAGS = iFlags.CREATE | iFlags.DELETE | iFlags.MODIFY \ @@ -55,6 +54,7 @@ class FileManager: def __init__(self, config: ConfigHelper) -> None: self.server = config.get_server() self.event_loop = self.server.get_event_loop() + self.full_access_roots: Set[str] = set() self.file_paths: Dict[str, str] = {} db: DBComp = self.server.load_component(config, "database") gc_path: str = db.get_item( @@ -93,7 +93,8 @@ class FileManager: # Register Klippy Configuration Path config_path = config.get('config_path', None) if config_path is not None: - ret = self.register_directory('config', config_path) + ret = self.register_directory('config', config_path, + full_access=True) if not ret: raise config.error( "Option 'config_path' is not a valid directory") @@ -108,7 +109,7 @@ class FileManager: # If gcode path is in the database, register it if gc_path: - self.register_directory('gcodes', gc_path) + self.register_directory('gcodes', gc_path, full_access=True) def _update_fixed_paths(self) -> None: kinfo = self.server.get_klippy_info() @@ -138,7 +139,11 @@ class FileManager: self.server.register_static_file_handler( "klippy.log", log_path, force=True) - def register_directory(self, root: str, path: Optional[str]) -> bool: + def register_directory(self, + root: str, + path: Optional[str], + full_access: bool = False + ) -> bool: if path is None: return False path = os.path.abspath(os.path.expanduser(path)) @@ -150,8 +155,9 @@ class FileManager: "that the path exists and is not the file system root.") return False permissions = os.R_OK - if root in FULL_ACCESS_ROOTS: + if full_access: permissions |= os.W_OK + self.full_access_roots.add(root) if not os.access(path, permissions): logging.info( f"\nMoonraker does not have permission to access path " @@ -166,7 +172,7 @@ class FileManager: moon_db["file_manager.gcode_path"] = path # scan for metadata changes self.gcode_metadata.update_gcode_path(path) - if root in FULL_ACCESS_ROOTS: + if full_access: # Refresh the file list and add watches self.inotify_handler.add_root_watch(root, path) else: @@ -235,13 +241,13 @@ class FileManager: result = { 'item': {'path': directory, 'root': root}, 'action': "create_dir"} - if action == 'POST' and root in FULL_ACCESS_ROOTS: + if action == 'POST' and root in self.full_access_roots: # Create a new directory try: os.mkdir(dir_path) except Exception as e: raise self.server.error(str(e)) - elif action == 'DELETE' and root in FULL_ACCESS_ROOTS: + elif action == 'DELETE' and root in self.full_access_roots: # Remove a directory result['action'] = "delete_dir" if directory.strip("/") == root: @@ -322,7 +328,7 @@ class FileManager: ep = web_request.get_endpoint() source_root, source_path = self._convert_request_path(source) dest_root, dest_path = self._convert_request_path(destination) - if dest_root not in FULL_ACCESS_ROOTS: + if dest_root not in self.full_access_roots: raise self.server.error( f"Destination path is read-only: {dest_root}") async with self.write_mutex: @@ -333,7 +339,7 @@ class FileManager: if os.path.exists(dest_path): await self._handle_operation_check(dest_path) if ep == "/server/files/move": - if source_root not in FULL_ACCESS_ROOTS: + if source_root not in self.full_access_roots: raise self.server.error( f"Source path is read-only, cannot move: {source_root}") # if moving the file, make sure the source is not in use @@ -406,7 +412,7 @@ class FileManager: if ( (os.path.islink(path) and os.path.isfile(real_path)) or not os.access(real_path, os.R_OK | os.W_OK) or - root not in FULL_ACCESS_ROOTS + root not in self.full_access_roots ): permissions = "r" return { @@ -431,7 +437,7 @@ class FileManager: root = upload_info['root'] if root == "gcodes" and upload_info['ext'] in VALID_GCODE_EXTS: result = await self._finish_gcode_upload(upload_info) - elif root in FULL_ACCESS_ROOTS: + elif root in self.full_access_roots: result = await self._finish_standard_upload(upload_info) else: raise self.server.error(f"Invalid root request: {root}") @@ -677,7 +683,10 @@ class FileManager: f"Path not available for DELETE: {path}", 405) root = parts[0] filename = parts[1] - if root not in self.file_paths or root not in FULL_ACCESS_ROOTS: + if ( + root not in self.file_paths or + root not in self.full_access_roots + ): raise self.server.error( f"Path not available for DELETE: {path}", 405) root_path = self.file_paths[root] @@ -1027,8 +1036,6 @@ class INotifyHandler: def add_root_watch(self, root: str, root_path: str) -> None: - if root not in FULL_ACCESS_ROOTS: - return # remove all exisiting watches on root if root in self.watched_roots: old_root = self.watched_roots.pop(root) diff --git a/moonraker/moonraker.py b/moonraker/moonraker.py index a4b8744..d9b90fc 100755 --- a/moonraker/moonraker.py +++ b/moonraker/moonraker.py @@ -441,7 +441,8 @@ class Server: if vsd_path is not None: file_manager: FileManager = self.lookup_component( 'file_manager') - file_manager.register_directory('gcodes', vsd_path) + file_manager.register_directory('gcodes', vsd_path, + full_access=True) else: logging.info( "Configuration for [virtual_sdcard] not found,"