From 4c914d7b4dd4f80a6db44ff607284474bbedde58 Mon Sep 17 00:00:00 2001 From: Arksine Date: Thu, 4 Mar 2021 20:32:18 -0500 Subject: [PATCH] file_manager: add support for streaming file uploads Signed-off-by: Eric Callahan --- moonraker/plugins/file_manager.py | 217 +++++++++++++++--------------- 1 file changed, 110 insertions(+), 107 deletions(-) diff --git a/moonraker/plugins/file_manager.py b/moonraker/plugins/file_manager.py index c30c790..4d65c99 100644 --- a/moonraker/plugins/file_manager.py +++ b/moonraker/plugins/file_manager.py @@ -6,10 +6,11 @@ import os import sys import shutil -import io import zipfile import logging import json +import tempfile +from concurrent.futures import ThreadPoolExecutor from tornado.ioloop import IOLoop, PeriodicCallback from tornado.locks import Event @@ -324,31 +325,76 @@ class FileManager: path_info = {'modified': modified, 'size': size} return path_info - async def process_file_upload(self, request): + def gen_temp_upload_path(self): + ioloop = IOLoop.current() + return os.path.join( + tempfile.gettempdir(), + f"moonraker.upload-{int(ioloop.time())}.mru") + + async def finalize_upload(self, form_args): # lookup root file path - root = self._get_argument(request, 'root', "gcodes") - if root == "gcodes": - result = await self._do_gcode_upload(request) - elif root in FULL_ACCESS_ROOTS: - result = self._do_standard_upload(request, root) - else: - raise self.server.error(f"Invalid root request: {root}") + try: + upload_info = self._parse_upload_args(form_args) + root = upload_info['root'] + if root == "gcodes": + result = await self._finish_gcode_upload(upload_info) + elif root in FULL_ACCESS_ROOTS: + result = await self._finish_standard_upload(upload_info) + else: + raise self.server.error(f"Invalid root request: {root}") + except Exception: + try: + os.remove(form_args['tmp_file_path']) + except Exception: + pass + raise return result - async def _do_gcode_upload(self, request): - start_print = print_ongoing = False - root_path = self.file_paths.get("gcodes", "") - if not root_path: - raise self.server.error("Gcodes root not available") - start_print = self._get_argument(request, 'print', "false") == "true" - upload = self._get_upload_info(request, root_path) - fparts = os.path.splitext(upload['full_path']) - is_ufp = fparts[-1].lower() == ".ufp" + def _parse_upload_args(self, upload_args): + if 'filename' not in upload_args: + raise self.server.error( + "No file name specifed in upload form") + # check relative path + root = upload_args.get('root', "gcodes").lower() + if root not in self.file_paths: + raise self.server.error(f"Root {root} not available") + root_path = self.file_paths[root] + dir_path = upload_args.get('path', "") + if os.path.isfile(root_path): + filename = os.path.basename(root_path) + dest_path = root_path + dir_path = "" + else: + filename = upload_args['filename'].strip().lstrip("/") + if dir_path: + filename = os.path.join(dir_path, filename) + dest_path = os.path.abspath(os.path.join(root_path, filename)) + # Validate the path. Don't allow uploads to a parent of the root + if not dest_path.startswith(root_path): + raise self.server.error( + f"Cannot write to path: {dest_path}") + start_print = upload_args.get('print', "false") == "true" + f_ext = os.path.splitext(dest_path)[-1].lower() + unzip_ufp = f_ext == ".ufp" and root == "gcodes" + if unzip_ufp: + filename = os.path.splitext(filename)[0] + ".gcode" + dest_path = os.path.splitext(dest_path)[0] + ".gcode" + return { + 'root': root, + 'filename': filename, + 'dir_path': dir_path, + 'dest_path': dest_path, + 'tmp_file_path': upload_args['tmp_file_path'], + 'start_print': start_print, + 'unzip_ufp': unzip_ufp + } + + async def _finish_gcode_upload(self, upload_info): + print_ongoing = False + start_print = upload_info['start_print'] # Verify that the operation can be done if attempting to upload a gcode try: - check_path = upload['full_path'] - if is_ufp: - check_path = fparts[0] + ".gcode" + check_path = upload_info['dest_path'] print_ongoing = await self._handle_operation_check( check_path) except self.server.error as e: @@ -361,112 +407,73 @@ class FileManager: start_print = False # Don't start if another print is currently in progress start_print = start_print and not print_ongoing - self._write_file(upload, is_ufp) + ioloop = IOLoop.current() + with ThreadPoolExecutor(max_workers=1) as tpe: + await ioloop.run_in_executor( + tpe, self._process_uploaded_file, upload_info) # Fetch Metadata - finfo = self._get_path_info(upload['full_path']) + finfo = self._get_path_info(upload_info['dest_path']) evt = self.gcode_metadata.parse_metadata( - upload['filename'], finfo['size'], finfo['modified']) + upload_info['filename'], finfo['size'], finfo['modified']) await evt.wait() if start_print: # Make a Klippy Request to "Start Print" klippy_apis = self.server.lookup_plugin('klippy_apis') try: - await klippy_apis.start_print(upload['filename']) + await klippy_apis.start_print(upload_info['filename']) except self.server.error: # Attempt to start print failed start_print = False self.notify_filelist_changed( - 'upload_file', upload['filename'], "gcodes") - return {'result': upload['filename'], 'print_started': start_print} - - def _do_standard_upload(self, request, root): - path = self.file_paths.get(root, None) - if path is None: - raise self.server.error(f"Unknown root path: {root}") - upload = self._get_upload_info(request, path) - self._write_file(upload) - self.notify_filelist_changed('upload_file', upload['filename'], root) - return {'result': upload['filename']} - - def _get_argument(self, request, name, default=None): - args = request.arguments.get(name, None) - if args is not None: - return args[0].decode().strip() - return default - - def _get_upload_info(self, request, root_path): - # check relative path - dir_path = self._get_argument(request, 'path', "") - # fetch the upload from the request - if len(request.files) != 1: - raise self.server.error( - "Bad Request, can only process a single file upload") - f_list = list(request.files.values())[0] - if len(f_list) != 1: - raise self.server.error( - "Bad Request, can only process a single file upload") - upload = f_list[0] - if os.path.isfile(root_path): - filename = os.path.basename(root_path) - full_path = root_path - dir_path = "" - else: - filename = upload['filename'].strip().lstrip("/") - if dir_path: - filename = os.path.join(dir_path, filename) - full_path = os.path.abspath(os.path.join(root_path, filename)) - # Validate the path. Don't allow uploads to a parent of the root - if not full_path.startswith(root_path): - raise self.server.error( - f"Cannot write to path: {full_path}") + 'upload_file', upload_info['filename'], "gcodes") return { - 'filename': filename, - 'body': upload['body'], - 'dir_path': dir_path, - 'full_path': full_path} + 'result': upload_info['filename'], + 'print_started': start_print + } - def _write_file(self, upload, unzip_ufp=False): + async def _finish_standard_upload(self, upload_info): + ioloop = IOLoop.current() + with ThreadPoolExecutor(max_workers=1) as tpe: + await ioloop.run_in_executor( + tpe, self._process_uploaded_file, upload_info) + self.notify_filelist_changed( + 'upload_file', upload_info['filename'], upload_info['root']) + return {'result': upload_info['filename']} + + def _process_uploaded_file(self, upload_info): try: - if upload['dir_path']: + if upload_info['dir_path']: os.makedirs(os.path.dirname( - upload['full_path']), exist_ok=True) - if unzip_ufp: - self._unzip_ufp(upload) + upload_info['dest_path']), exist_ok=True) + if upload_info['unzip_ufp']: + self._unzip_ufp(upload_info['tmp_file_path'], + upload_info['dest_path']) else: - with open(upload['full_path'], 'wb') as fh: - fh.write(upload['body']) + shutil.move(upload_info['tmp_file_path'], + upload_info['dest_path']) except Exception: raise self.server.error("Unable to save file", 500) - # UFP Extraction Implementation inspired by by GitHub user @cdkeito - def _unzip_ufp(self, upload): - base_name = os.path.splitext( - os.path.basename(upload['filename']))[0] - working_dir = os.path.dirname(upload['full_path']) - thumb_dir = os.path.join(working_dir, "thumbs") - ufp_bytes = io.BytesIO(upload['body']) + # UFP Extraction Implementation inspired by GitHub user @cdkeito + def _unzip_ufp(self, ufp_path, dest_path): gc_bytes = img_bytes = None - with zipfile.ZipFile(ufp_bytes) as zf: + with zipfile.ZipFile(ufp_path) as zf: gc_bytes = zf.read("/3D/model.gcode") try: img_bytes = zf.read("/Metadata/thumbnail.png") except Exception: img_bytes = None if gc_bytes is not None: - gc_name = base_name + ".gcode" - gc_path = os.path.join(working_dir, gc_name) - with open(gc_path, "wb") as gc_file: + with open(dest_path, "wb") as gc_file: gc_file.write(gc_bytes) - # update upload file name to extracted gcode file - upload['full_path'] = gc_path - upload['filename'] = os.path.join( - os.path.dirname(upload['filename']), gc_name) else: raise self.server.error( - f"UFP file {upload['filename']} does not " + f"UFP file {dest_path} does not " "contain a gcode file") if img_bytes is not None: - thumb_name = base_name + ".png" + thumb_name = os.path.splitext( + os.path.basename(dest_path))[0] + ".png" + thumb_dir = os.path.join(os.path.dirname(dest_path), "thumbs") thumb_path = os.path.join(thumb_dir, thumb_name) try: if not os.path.exists(thumb_dir): @@ -475,19 +482,15 @@ class FileManager: thumb_file.write(img_bytes) except Exception: logging.exception("Unable to write Image") + try: + os.remove(ufp_path) + except Exception: + logging.exception(f"Error removing ufp file: {ufp_path}") def _process_ufp_from_refresh(self, ufp_path): - filename = os.path.split(ufp_path)[-1] - ul = { - 'filename': filename, - 'full_path': ufp_path - } - with open(ufp_path, 'rb') as ufp: - body = ufp.read() - ul['body'] = body - self._unzip_ufp(ul) - os.remove(ufp_path) - return ul['full_path'] + dest_path = os.path.splitext(ufp_path)[0] + ".gcode" + self._unzip_ufp(ufp_path, dest_path) + return dest_path def get_file_list(self, root, list_format=False, notify=False): # Use os.walk find files in sd path and subdirs