machine: disable file write access when validation fails

Prevent users from uploading files before validation is complete, as
this can populate one of the subfolders resulting in a failure when
attempting to symlink the original path.

When validating the config symlink the database first.  This should
allow Moonraker to correctly move the database should an error
be encountered when validating the other config options.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2022-10-15 05:51:01 -04:00
parent fb679aa056
commit 500e8f3b68
No known key found for this signature in database
GPG Key ID: 5A1EB336DFB4C71B
3 changed files with 29 additions and 12 deletions

View File

@ -878,6 +878,8 @@ class FileUploadHandler(AuthorizedRequestHandler):
def prepare(self) -> None:
super(FileUploadHandler, self).prepare()
fm: FileManager = self.server.lookup_component("file_manager")
fm.check_write_enabled()
if self.request.method == "POST":
assert isinstance(self.request.connection, HTTP1Connection)
self.request.connection.set_max_body_size(self.max_upload_size)

View File

@ -193,6 +193,16 @@ class FileManager:
self.register_directory(folder_name, str(new_path), full_access)
return new_path
def disable_write_access(self):
self.full_access_roots.clear()
def check_write_enabled(self):
if not self.full_access_roots:
raise self.server.error(
"Write access is currently disabled. Check notifications "
"for warnings."
)
def register_directory(self,
root: str,
path: Optional[str],
@ -548,12 +558,12 @@ class FileManager:
upload_info = self._parse_upload_args(form_args)
self.check_reserved_path(upload_info["dest_path"], True)
root = upload_info['root']
if root not in self.full_access_roots:
raise self.server.error(f"Invalid root request: {root}")
if root == "gcodes" and upload_info['ext'] in VALID_GCODE_EXTS:
result = await self._finish_gcode_upload(upload_info)
elif root in self.full_access_roots:
result = await self._finish_standard_upload(upload_info)
else:
raise self.server.error(f"Invalid root request: {root}")
result = await self._finish_standard_upload(upload_info)
except Exception:
try:
os.remove(form_args['tmp_file_path'])

View File

@ -1207,6 +1207,7 @@ class InstallValidator:
if INSTALL_VERSION <= install_ver and not self.force_validation:
logging.debug("Installation version in database up to date")
return False
fm: FileManager = self.server.lookup_component("file_manager")
need_restart: bool = False
has_error: bool = False
try:
@ -1219,11 +1220,13 @@ class InstallValidator:
except ValidationError as ve:
has_error = True
self.server.add_warning(str(ve))
fm.disable_write_access()
except Exception as e:
has_error = True
msg = f"Failed to validate {name}: {e}"
logging.exception(msg)
self.server.add_warning(msg, log=False)
fm.disable_write_access()
else:
await db.insert_item(
"moonraker", "validate_install.install_version", INSTALL_VERSION
@ -1469,8 +1472,18 @@ class InstallValidator:
await cfg_source.write_config(cfg_bkp_path)
# Create symbolic links for configured folders
server_cfg = self.config["server"]
fm_cfg = self.config["file_manager"]
db_cfg = self.config["database"]
# symlink database path first
db_path = db_cfg.get("database_path", None)
default_db = pathlib.Path("~/.moonraker_database").expanduser()
if db_path is None and default_db.exists():
self._link_data_subfolder("database", default_db)
elif db_path is not None:
self._link_data_subfolder("database", db_path)
cfg_source.remove_option("database", "database_path")
fm_cfg = self.config["file_manager"]
cfg_path = fm_cfg.get("config_path", None)
if cfg_path is None:
cfg_path = server_cfg.get("config_path", None)
@ -1494,14 +1507,6 @@ class InstallValidator:
self._link_data_subfolder("gcodes", gc_path)
db.delete_item("moonraker", "file_manager.gcode_path")
db_path = db_cfg.get("database_path", None)
default_db = pathlib.Path("~/.moonraker_database").expanduser()
if db_path is None and default_db.exists():
self._link_data_subfolder("database", default_db)
elif db_path is not None:
self._link_data_subfolder("database", db_path)
cfg_source.remove_option("database", "database_path")
# Link individual files
secrets_path = self.config["secrets"].get("secrets_path", None)
if secrets_path is not None: