From e0fb261c12fba68af51248dab0005a4fe30954e5 Mon Sep 17 00:00:00 2001 From: Arksine Date: Sun, 26 Jul 2020 08:59:13 -0400 Subject: [PATCH] file_manager: refactor upload logic Break gcode and config file uploads into their own functions. Add a 'primary_config' argument to config file uploads. If set to true, the upload will overwrite printer.cfg. Signed-off-by: Eric Callahan --- moonraker/plugins/file_manager.py | 130 ++++++++++++++++++------------ 1 file changed, 77 insertions(+), 53 deletions(-) diff --git a/moonraker/plugins/file_manager.py b/moonraker/plugins/file_manager.py index aa33ef8..2c97c53 100644 --- a/moonraker/plugins/file_manager.py +++ b/moonraker/plugins/file_manager.py @@ -294,23 +294,71 @@ class FileManager: return dict(new_list) async def process_file_upload(self, request): - start_print = print_ongoing = False - dir_path = "" # lookup root file path - root_args = request.arguments.get('root', ['gcodes']) - root = root_args[0].strip() - base_path = self.file_paths.get(root, None) - if base_path is None: - raise self.server.error(400, "Unknown root path") - # check relative path - path_args = request.arguments.get('path', []) - if path_args: - dir_path = path_args[0].decode().lstrip("/") - # check if print should be started after a "gcodes" upload + root = self._get_argument(request, 'root', "gcodes") if root == "gcodes": - print_args = request.arguments.get('print', []) - if print_args: - start_print = print_args[0].decode().lower() == "true" + result = await self._do_gcode_upload(request) + elif root == "config": + result = self._do_config_upload(request) + else: + raise self.server.error(400, "Unknown root path") + return result + + async def _do_gcode_upload(self, request): + start_print = print_ongoing = False + base_path = self.file_paths.get("gcodes", "") + if not base_path: + raise self.server.error(400, "Gcodes root not available") + start_print = self._get_argument(request, 'print', "false") == "true" + upload = self._get_upload_info(request, base_path) + # Verify that the operation can be done if attempting to upload a gcode + try: + print_ongoing = await self._handle_operation_check( + upload['full_path']) + except self.server.error as e: + if e.status_code == 403: + raise self.server.error( + 403, "File is loaded, upload not permitted") + else: + # Couldn't reach Klippy, so it should be safe + # to permit the upload but not start + start_print = False + # Don't start if another print is currently in progress + start_print = start_print and not print_ongoing + self._write_file(upload) + if start_print: + # Make a Klippy Request to "Start Print" + gcode_apis = self.server.lookup_plugin('gcode_apis') + try: + await gcode_apis.gcode_start_print( + request.path, 'POST', {'filename': upload['filename']}) + except self.server.error: + # Attempt to start print failed + start_print = False + self.server.notify_filelist_changed(upload['filename'], 'added') + return {'result': upload['filename'], 'print_started': start_print} + + def _do_config_upload(self, request): + req_arg = self._get_argument(request, 'primary_config', "false") + is_main_config = req_arg.lower() == "true" + cfg_base = "printer.cfg" if is_main_config else "config" + cfg_path = self.file_paths.get(cfg_base, None) + if cfg_path is None: + raise self.server.error( + "Printer configuration location on disk not set") + upload = self._get_upload_info(request, cfg_path) + self._write_file(upload) + return {'result': upload['filename']} + + def _get_argument(self, request, name, default=None): + args = request.arguments.get(name, None) + if args is not None: + return args[0].decode().strip() + return default + + def _get_upload_info(self, request, base_path): + # check relative path + dir_path = self._get_argument(request, 'path', "") # fetch the upload from the request if len(request.files) != 1: raise self.server.error( @@ -321,9 +369,7 @@ class FileManager: 400, "Bad Request, can only process a single file upload") upload = f_list[0] if os.path.isfile(base_path): - # If the root path points to a file, write directly to it. This - # is the case for printer.cfg - filename = root + filename = os.path.basename(base_path) full_path = base_path dir_path = "" else: @@ -331,46 +377,24 @@ class FileManager: if dir_path: filename = os.path.join(dir_path, filename) full_path = os.path.normpath(os.path.join(base_path, filename)) - # Validate the path. Don't allow uploads to a parent of the root - if not full_path.startswith(base_path): - raise self.server.error( - "Cannot write to path: %s" % (full_path)) - # Verify that the operation can be done if attempting to upload a gcode - if root == 'gcodes': - try: - print_ongoing = await self._handle_operation_check(full_path) - except self.server.error as e: - if e.status_code == 403: - raise self.server.error( - 403, "File is loaded, upload not permitted") - else: - # Couldn't reach Klippy, so it should be safe - # to permit the upload but not start - start_print = False + # Validate the path. Don't allow uploads to a parent of the root + if not full_path.startswith(base_path): + raise self.server.error( + "Cannot write to path: %s" % (full_path)) + return { + 'filename': filename, + 'body': upload['body'], + 'dir_path': dir_path, + 'full_path': full_path} - # Don't start if another print is currently in progress - start_print = start_print and not print_ongoing + def _write_file(self, upload): try: - if dir_path: - os.makedirs(os.path.dirname(full_path), exist_ok=True) - with open(full_path, 'wb') as fh: + if upload['dir_path']: + os.makedirs(os.path.dirname(upload['full_path']), exist_ok=True) + with open(upload['full_path'], 'wb') as fh: fh.write(upload['body']) except Exception: raise self.server.error(500, "Unable to save file") - if start_print: - # Make a Klippy Request to "Start Print" - gcode_apis = self.server.lookup_plugin('gcode_apis') - try: - await gcode_apis.gcode_start_print( - request.path, 'POST', {'filename': filename}) - except self.server.error: - # Attempt to start print failed - start_print = False - if root == 'gcodes': - self.server.notify_filelist_changed(filename, 'added') - return {'result': filename, 'print_started': start_print} - else: - return {'result': filename} def get_file_list(self, format_list=False, base='gcodes'): try: