file_manager: add support for streaming file uploads

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Arksine 2021-03-04 20:32:18 -05:00
parent f1edaa1f61
commit 4c914d7b4d
1 changed files with 110 additions and 107 deletions

View File

@ -6,10 +6,11 @@
import os
import sys
import shutil
import io
import zipfile
import logging
import json
import tempfile
from concurrent.futures import ThreadPoolExecutor
from tornado.ioloop import IOLoop, PeriodicCallback
from tornado.locks import Event
@ -324,31 +325,76 @@ class FileManager:
path_info = {'modified': modified, 'size': size}
return path_info
async def process_file_upload(self, request):
def gen_temp_upload_path(self):
ioloop = IOLoop.current()
return os.path.join(
tempfile.gettempdir(),
f"moonraker.upload-{int(ioloop.time())}.mru")
async def finalize_upload(self, form_args):
# lookup root file path
root = self._get_argument(request, 'root', "gcodes")
if root == "gcodes":
result = await self._do_gcode_upload(request)
elif root in FULL_ACCESS_ROOTS:
result = self._do_standard_upload(request, root)
else:
raise self.server.error(f"Invalid root request: {root}")
try:
upload_info = self._parse_upload_args(form_args)
root = upload_info['root']
if root == "gcodes":
result = await self._finish_gcode_upload(upload_info)
elif root in FULL_ACCESS_ROOTS:
result = await self._finish_standard_upload(upload_info)
else:
raise self.server.error(f"Invalid root request: {root}")
except Exception:
try:
os.remove(form_args['tmp_file_path'])
except Exception:
pass
raise
return result
async def _do_gcode_upload(self, request):
start_print = print_ongoing = False
root_path = self.file_paths.get("gcodes", "")
if not root_path:
raise self.server.error("Gcodes root not available")
start_print = self._get_argument(request, 'print', "false") == "true"
upload = self._get_upload_info(request, root_path)
fparts = os.path.splitext(upload['full_path'])
is_ufp = fparts[-1].lower() == ".ufp"
def _parse_upload_args(self, upload_args):
if 'filename' not in upload_args:
raise self.server.error(
"No file name specifed in upload form")
# check relative path
root = upload_args.get('root', "gcodes").lower()
if root not in self.file_paths:
raise self.server.error(f"Root {root} not available")
root_path = self.file_paths[root]
dir_path = upload_args.get('path', "")
if os.path.isfile(root_path):
filename = os.path.basename(root_path)
dest_path = root_path
dir_path = ""
else:
filename = upload_args['filename'].strip().lstrip("/")
if dir_path:
filename = os.path.join(dir_path, filename)
dest_path = os.path.abspath(os.path.join(root_path, filename))
# Validate the path. Don't allow uploads to a parent of the root
if not dest_path.startswith(root_path):
raise self.server.error(
f"Cannot write to path: {dest_path}")
start_print = upload_args.get('print', "false") == "true"
f_ext = os.path.splitext(dest_path)[-1].lower()
unzip_ufp = f_ext == ".ufp" and root == "gcodes"
if unzip_ufp:
filename = os.path.splitext(filename)[0] + ".gcode"
dest_path = os.path.splitext(dest_path)[0] + ".gcode"
return {
'root': root,
'filename': filename,
'dir_path': dir_path,
'dest_path': dest_path,
'tmp_file_path': upload_args['tmp_file_path'],
'start_print': start_print,
'unzip_ufp': unzip_ufp
}
async def _finish_gcode_upload(self, upload_info):
print_ongoing = False
start_print = upload_info['start_print']
# Verify that the operation can be done if attempting to upload a gcode
try:
check_path = upload['full_path']
if is_ufp:
check_path = fparts[0] + ".gcode"
check_path = upload_info['dest_path']
print_ongoing = await self._handle_operation_check(
check_path)
except self.server.error as e:
@ -361,112 +407,73 @@ class FileManager:
start_print = False
# Don't start if another print is currently in progress
start_print = start_print and not print_ongoing
self._write_file(upload, is_ufp)
ioloop = IOLoop.current()
with ThreadPoolExecutor(max_workers=1) as tpe:
await ioloop.run_in_executor(
tpe, self._process_uploaded_file, upload_info)
# Fetch Metadata
finfo = self._get_path_info(upload['full_path'])
finfo = self._get_path_info(upload_info['dest_path'])
evt = self.gcode_metadata.parse_metadata(
upload['filename'], finfo['size'], finfo['modified'])
upload_info['filename'], finfo['size'], finfo['modified'])
await evt.wait()
if start_print:
# Make a Klippy Request to "Start Print"
klippy_apis = self.server.lookup_plugin('klippy_apis')
try:
await klippy_apis.start_print(upload['filename'])
await klippy_apis.start_print(upload_info['filename'])
except self.server.error:
# Attempt to start print failed
start_print = False
self.notify_filelist_changed(
'upload_file', upload['filename'], "gcodes")
return {'result': upload['filename'], 'print_started': start_print}
def _do_standard_upload(self, request, root):
path = self.file_paths.get(root, None)
if path is None:
raise self.server.error(f"Unknown root path: {root}")
upload = self._get_upload_info(request, path)
self._write_file(upload)
self.notify_filelist_changed('upload_file', upload['filename'], root)
return {'result': upload['filename']}
def _get_argument(self, request, name, default=None):
args = request.arguments.get(name, None)
if args is not None:
return args[0].decode().strip()
return default
def _get_upload_info(self, request, root_path):
# check relative path
dir_path = self._get_argument(request, 'path', "")
# fetch the upload from the request
if len(request.files) != 1:
raise self.server.error(
"Bad Request, can only process a single file upload")
f_list = list(request.files.values())[0]
if len(f_list) != 1:
raise self.server.error(
"Bad Request, can only process a single file upload")
upload = f_list[0]
if os.path.isfile(root_path):
filename = os.path.basename(root_path)
full_path = root_path
dir_path = ""
else:
filename = upload['filename'].strip().lstrip("/")
if dir_path:
filename = os.path.join(dir_path, filename)
full_path = os.path.abspath(os.path.join(root_path, filename))
# Validate the path. Don't allow uploads to a parent of the root
if not full_path.startswith(root_path):
raise self.server.error(
f"Cannot write to path: {full_path}")
'upload_file', upload_info['filename'], "gcodes")
return {
'filename': filename,
'body': upload['body'],
'dir_path': dir_path,
'full_path': full_path}
'result': upload_info['filename'],
'print_started': start_print
}
def _write_file(self, upload, unzip_ufp=False):
async def _finish_standard_upload(self, upload_info):
ioloop = IOLoop.current()
with ThreadPoolExecutor(max_workers=1) as tpe:
await ioloop.run_in_executor(
tpe, self._process_uploaded_file, upload_info)
self.notify_filelist_changed(
'upload_file', upload_info['filename'], upload_info['root'])
return {'result': upload_info['filename']}
def _process_uploaded_file(self, upload_info):
try:
if upload['dir_path']:
if upload_info['dir_path']:
os.makedirs(os.path.dirname(
upload['full_path']), exist_ok=True)
if unzip_ufp:
self._unzip_ufp(upload)
upload_info['dest_path']), exist_ok=True)
if upload_info['unzip_ufp']:
self._unzip_ufp(upload_info['tmp_file_path'],
upload_info['dest_path'])
else:
with open(upload['full_path'], 'wb') as fh:
fh.write(upload['body'])
shutil.move(upload_info['tmp_file_path'],
upload_info['dest_path'])
except Exception:
raise self.server.error("Unable to save file", 500)
# UFP Extraction Implementation inspired by by GitHub user @cdkeito
def _unzip_ufp(self, upload):
base_name = os.path.splitext(
os.path.basename(upload['filename']))[0]
working_dir = os.path.dirname(upload['full_path'])
thumb_dir = os.path.join(working_dir, "thumbs")
ufp_bytes = io.BytesIO(upload['body'])
# UFP Extraction Implementation inspired by GitHub user @cdkeito
def _unzip_ufp(self, ufp_path, dest_path):
gc_bytes = img_bytes = None
with zipfile.ZipFile(ufp_bytes) as zf:
with zipfile.ZipFile(ufp_path) as zf:
gc_bytes = zf.read("/3D/model.gcode")
try:
img_bytes = zf.read("/Metadata/thumbnail.png")
except Exception:
img_bytes = None
if gc_bytes is not None:
gc_name = base_name + ".gcode"
gc_path = os.path.join(working_dir, gc_name)
with open(gc_path, "wb") as gc_file:
with open(dest_path, "wb") as gc_file:
gc_file.write(gc_bytes)
# update upload file name to extracted gcode file
upload['full_path'] = gc_path
upload['filename'] = os.path.join(
os.path.dirname(upload['filename']), gc_name)
else:
raise self.server.error(
f"UFP file {upload['filename']} does not "
f"UFP file {dest_path} does not "
"contain a gcode file")
if img_bytes is not None:
thumb_name = base_name + ".png"
thumb_name = os.path.splitext(
os.path.basename(dest_path))[0] + ".png"
thumb_dir = os.path.join(os.path.dirname(dest_path), "thumbs")
thumb_path = os.path.join(thumb_dir, thumb_name)
try:
if not os.path.exists(thumb_dir):
@ -475,19 +482,15 @@ class FileManager:
thumb_file.write(img_bytes)
except Exception:
logging.exception("Unable to write Image")
try:
os.remove(ufp_path)
except Exception:
logging.exception(f"Error removing ufp file: {ufp_path}")
def _process_ufp_from_refresh(self, ufp_path):
filename = os.path.split(ufp_path)[-1]
ul = {
'filename': filename,
'full_path': ufp_path
}
with open(ufp_path, 'rb') as ufp:
body = ufp.read()
ul['body'] = body
self._unzip_ufp(ul)
os.remove(ufp_path)
return ul['full_path']
dest_path = os.path.splitext(ufp_path)[0] + ".gcode"
self._unzip_ufp(ufp_path, dest_path)
return dest_path
def get_file_list(self, root, list_format=False, notify=False):
# Use os.walk find files in sd path and subdirs