diff --git a/moonraker/app.py b/moonraker/app.py index c64532c..caa4fce 100644 --- a/moonraker/app.py +++ b/moonraker/app.py @@ -693,15 +693,12 @@ class FileRequestHandler(AuthorizedFileHandler): async def delete(self, path: str) -> None: path = self.request.path.lstrip("/").split("/", 2)[-1] path = url_unescape(path, plus=False) + file_manager: FileManager file_manager = self.server.lookup_component('file_manager') try: filename = await file_manager.delete_file(path) except self.server.error as e: - if e.status_code == 403: - raise tornado.web.HTTPError( - 403, "File is loaded, DELETE not permitted") - else: - raise tornado.web.HTTPError(e.status_code, str(e)) + raise tornado.web.HTTPError(e.status_code, str(e)) self.finish({'result': filename}) async def get(self, path: str, include_body: bool = True) -> None: @@ -713,6 +710,12 @@ class FileRequestHandler(AuthorizedFileHandler): self.root, absolute_path) if self.absolute_path is None: return + file_manager: FileManager + file_manager = self.server.lookup_component('file_manager') + try: + file_manager.check_reserved_path(self.absolute_path, False) + except self.server.error as e: + raise tornado.web.HTTPError(e.status_code, str(e)) self.modified = self.get_modified_time() self.set_headers() diff --git a/moonraker/components/file_manager/file_manager.py b/moonraker/components/file_manager/file_manager.py index 6f3d230..4146444 100644 --- a/moonraker/components/file_manager/file_manager.py +++ b/moonraker/components/file_manager/file_manager.py @@ -59,15 +59,16 @@ class FileManager: def __init__(self, config: ConfigHelper) -> None: self.server = config.get_server() self.event_loop = self.server.get_event_loop() - self.reserved_paths: Dict[str, pathlib.Path] = {} + self.reserved_paths: Dict[str, Tuple[pathlib.Path, bool]] = {} self.full_access_roots: Set[str] = set() self.file_paths: Dict[str, str] = {} - self.add_reserved_path("moonraker", MOONRAKER_PATH) - db: DBComp = self.server.load_component(config, "database") - db_path = db.get_database_path() - self.add_reserved_path("database", db_path) app_args = self.server.get_app_args() self.datapath = pathlib.Path(app_args["data_path"]) + self.add_reserved_path("moonraker", MOONRAKER_PATH, False) + db: DBComp = self.server.load_component(config, "database") + db_path = db.get_database_path() + self.add_reserved_path("database", db_path, False) + self.add_reserved_path("certs", self.datapath.joinpath("certs"), False) self.gcode_metadata = MetadataStorage(config, db) self.inotify_handler = INotifyHandler(config, self, self.gcode_metadata) @@ -144,6 +145,7 @@ class FileManager: # Register path for example configs klipper_path = paths.get('klipper_path', None) if klipper_path is not None: + self.reserved_paths.pop("klipper", None) self.add_reserved_path("klipper", klipper_path) example_cfg_path = os.path.join(klipper_path, "config") self.register_directory("config_examples", example_cfg_path) @@ -205,8 +207,6 @@ class FileManager: return False permissions = os.R_OK if full_access: - if not self._check_root_safe(root, path): - return False permissions |= os.W_OK self.full_access_roots.add(root) if not os.access(path, permissions): @@ -229,70 +229,38 @@ class FileManager: "root_update", root, path) return True - def _paths_overlap(self, - path_one: StrOrPath, - path_two: StrOrPath - ) -> bool: - if isinstance(path_one, str): - path_one = pathlib.Path(path_one) - path_one = path_one.expanduser().resolve() - if isinstance(path_two, str): - path_two = pathlib.Path(path_two) - path_two = path_two.expanduser().resolve() - return ( - path_one == path_two or - path_one in path_two.parents or - path_two in path_one.parents - ) - - def _check_root_safe(self, new_root: str, new_path: StrOrPath) -> bool: - # Make sure that registered full access paths - # do no overlap one another, nor a reserved path - if isinstance(new_path, str): - new_path = pathlib.Path(new_path) - new_path = new_path.expanduser().resolve() - for reg_root, reg_path in self.file_paths.items(): - exp_reg_path = pathlib.Path(reg_path).expanduser().resolve() + def check_reserved_path( + self, + req_path: StrOrPath, + need_write: bool, + raise_error: bool = True + ) -> bool: + if isinstance(req_path, str): + req_path = pathlib.Path(req_path) + req_path = req_path.expanduser().resolve() + for name, (res_path, can_read) in self.reserved_paths.items(): if ( - reg_root not in self.full_access_roots or - (reg_root == new_root and new_path == exp_reg_path) + (res_path == req_path or res_path in req_path.parents) and + (need_write or not can_read) ): - continue - if self._paths_overlap(new_path, exp_reg_path): - self.server.add_warning( - f"Failed to register '{new_root}': '{new_path}', path " - f"overlaps registered root '{reg_root}': '{exp_reg_path}'") - return False - for res_name, res_path in self.reserved_paths.items(): - if self._paths_overlap(new_path, res_path): - self.server.add_warning( - f"Failed to register '{new_root}': '{new_path}', path " - f"overlaps reserved path '{res_name}': '{res_path}'") - return False - return True + if not raise_error: + return True + raise self.server.error( + f"Access to file {req_path.name} forbidden by reserved " + f"path '{name}'", 403 + ) + return False - def add_reserved_path(self, name: str, res_path: StrOrPath) -> bool: + def add_reserved_path( + self, name: str, res_path: StrOrPath, read_access: bool = True + ) -> bool: + if name in self.reserved_paths: + return False if isinstance(res_path, str): res_path = pathlib.Path(res_path) res_path = res_path.expanduser().resolve() - if ( - name in self.reserved_paths and - res_path == self.reserved_paths[name] - ): - return True - self.reserved_paths[name] = res_path - check_passed = True - for reg_root, reg_path in list(self.file_paths.items()): - if reg_root not in self.full_access_roots: - continue - exp_reg_path = pathlib.Path(reg_path).expanduser().resolve() - if self._paths_overlap(res_path, exp_reg_path): - self.server.add_warning( - f"Full access root '{reg_root}' overlaps reserved path " - f"'{name}', removing access") - self.file_paths.pop(reg_root, None) - check_passed = False - return check_passed + self.reserved_paths[name] = (res_path, read_access) + return True def get_directory(self, root: str = "gcodes") -> str: return self.file_paths.get(root, "") @@ -367,6 +335,7 @@ class FileManager: dir_info = self._list_directory(dir_path, root, is_extended) return dir_info async with self.write_mutex: + self.check_reserved_path(dir_path, True) result = { 'item': {'path': directory, 'root': root}, 'action': "create_dir"} @@ -460,6 +429,8 @@ class FileManager: if dest_root not in self.full_access_roots: raise self.server.error( f"Destination path is read-only: {dest_root}") + self.check_reserved_path(source_path, False) + self.check_reserved_path(dest_path, True) async with self.write_mutex: result: Dict[str, Any] = {'item': {'root': dest_root}} if not os.path.exists(source_path): @@ -540,16 +511,19 @@ class FileManager: } return flist - def get_path_info(self, path: str, root: str) -> Dict[str, Any]: - fstat = os.stat(path) - real_path = os.path.realpath(path) + def get_path_info(self, path: StrOrPath, root: str) -> Dict[str, Any]: + if isinstance(path, str): + path = pathlib.Path(path) + real_path = path.resolve() + fstat = path.stat() permissions = "rw" if ( - (os.path.islink(path) and os.path.isfile(real_path)) or - not os.access(real_path, os.R_OK | os.W_OK) or - root not in self.full_access_roots + root not in self.full_access_roots or + (path.is_symlink() and path.is_file()) ): permissions = "r" + if self.check_reserved_path(real_path, permissions == "rw", False): + permissions = "" return { 'modified': fstat.st_mtime, 'size': fstat.st_size, @@ -569,6 +543,7 @@ class FileManager: async with self.write_mutex: try: upload_info = self._parse_upload_args(form_args) + self.check_reserved_path(upload_info["dest_path"], True) root = upload_info['root'] if root == "gcodes" and upload_info['ext'] in VALID_GCODE_EXTS: result = await self._finish_gcode_upload(upload_info) @@ -820,6 +795,7 @@ class FileManager: async def delete_file(self, path: str) -> Dict[str, Any]: async with self.write_mutex: root, full_path = self._convert_request_path(path) + self.check_reserved_path(full_path, True) filename = self.get_relative_path(root, full_path) if root not in self.full_access_roots: raise self.server.error( diff --git a/moonraker/components/secrets.py b/moonraker/components/secrets.py index f06afdc..bf79af1 100644 --- a/moonraker/components/secrets.py +++ b/moonraker/components/secrets.py @@ -16,6 +16,7 @@ from typing import ( ) if TYPE_CHECKING: from confighelper import ConfigHelper + from .file_manager.file_manager import FileManager class Secrets: def __init__(self, config: ConfigHelper) -> None: @@ -30,6 +31,8 @@ class Secrets: fpath = pathlib.Path(path).expanduser().resolve() self.type = "invalid" self.values: Dict[str, Any] = {} + fm: FileManager = server.lookup_component("file_manager") + fm.add_reserved_path("secrets", fpath, False) if fpath.is_file(): self.secrets_file = fpath data = self.secrets_file.read_text() diff --git a/moonraker/components/update_manager/app_deploy.py b/moonraker/components/update_manager/app_deploy.py index 5a7b4a6..4231b89 100644 --- a/moonraker/components/update_manager/app_deploy.py +++ b/moonraker/components/update_manager/app_deploy.py @@ -25,6 +25,7 @@ if TYPE_CHECKING: from confighelper import ConfigHelper from .update_manager import CommandHelper from ..machine import Machine + from ..file_manager.file_manager import FileManager SUPPORTED_CHANNELS = { "zip": ["stable", "beta"], @@ -60,6 +61,12 @@ class AppDeploy(BaseDeploy): self.type = "zip" self.path = pathlib.Path( config.get('path')).expanduser().resolve() + if ( + self.name not in ["moonraker", "klipper"] + and not self.path.joinpath(".writeable").is_file() + ): + fm: FileManager = self.server.lookup_component("file_manager") + fm.add_reserved_path(f"update_manager {self.name}", self.path) executable = config.get('env', None) if self.channel not in SUPPORTED_CHANNELS[self.type]: raise config.error( diff --git a/moonraker/components/update_manager/base_config.py b/moonraker/components/update_manager/base_config.py index 9d05195..4bcccbc 100644 --- a/moonraker/components/update_manager/base_config.py +++ b/moonraker/components/update_manager/base_config.py @@ -8,6 +8,7 @@ from __future__ import annotations import os import sys import copy +from utils import MOONRAKER_PATH from typing import ( TYPE_CHECKING, Dict @@ -17,8 +18,6 @@ if TYPE_CHECKING: from confighelper import ConfigHelper from components.database import MoonrakerDatabase -MOONRAKER_PATH = os.path.normpath(os.path.join( - os.path.dirname(__file__), "../../..")) KLIPPER_DEFAULT_PATH = os.path.expanduser("~/klipper") KLIPPER_DEFAULT_EXEC = os.path.expanduser("~/klippy-env/bin/python") diff --git a/moonraker/components/update_manager/base_deploy.py b/moonraker/components/update_manager/base_deploy.py index 1611125..ca8553d 100644 --- a/moonraker/components/update_manager/base_deploy.py +++ b/moonraker/components/update_manager/base_deploy.py @@ -23,7 +23,7 @@ class BaseDeploy: cfg_hash: Optional[str] = None ) -> None: if name is None: - name = config.get_name().split()[-1] + name = config.get_name().split(maxsplit=1)[-1] self.name = name if prefix: prefix = f"{prefix} {self.name}: " diff --git a/moonraker/components/update_manager/update_manager.py b/moonraker/components/update_manager/update_manager.py index 0c8179d..32c0817 100644 --- a/moonraker/components/update_manager/update_manager.py +++ b/moonraker/components/update_manager/update_manager.py @@ -46,6 +46,7 @@ if TYPE_CHECKING: from components.dbus_manager import DbusManager from components.machine import Machine from components.http_client import HttpClient + from components.file_manager.file_manager import FileManager from eventloop import FlexTimer from dbus_next import Variant from dbus_next.aio import ProxyInterface @@ -1133,6 +1134,8 @@ class WebClientDeploy(BaseDeploy): self.repo = config.get('repo').strip().strip("/") self.owner = self.repo.split("/", 1)[0] self.path = pathlib.Path(config.get("path")).expanduser().resolve() + fm: FileManager = self.server.lookup_component("file_manager") + fm.add_reserved_path(f"update_manager {self.name}", self.path) self.type = config.get('type') def_channel = "stable" if self.type == "web_beta":