file_manager: refactor upload logic
Break gcode and config file uploads into their own functions. Add a 'primary_config' argument to config file uploads. If set to true, the upload will overwrite printer.cfg. Signed-off-by: Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
parent
345229c928
commit
e0fb261c12
|
@ -294,23 +294,71 @@ class FileManager:
|
|||
return dict(new_list)
|
||||
|
||||
async def process_file_upload(self, request):
|
||||
start_print = print_ongoing = False
|
||||
dir_path = ""
|
||||
# lookup root file path
|
||||
root_args = request.arguments.get('root', ['gcodes'])
|
||||
root = root_args[0].strip()
|
||||
base_path = self.file_paths.get(root, None)
|
||||
if base_path is None:
|
||||
raise self.server.error(400, "Unknown root path")
|
||||
# check relative path
|
||||
path_args = request.arguments.get('path', [])
|
||||
if path_args:
|
||||
dir_path = path_args[0].decode().lstrip("/")
|
||||
# check if print should be started after a "gcodes" upload
|
||||
root = self._get_argument(request, 'root', "gcodes")
|
||||
if root == "gcodes":
|
||||
print_args = request.arguments.get('print', [])
|
||||
if print_args:
|
||||
start_print = print_args[0].decode().lower() == "true"
|
||||
result = await self._do_gcode_upload(request)
|
||||
elif root == "config":
|
||||
result = self._do_config_upload(request)
|
||||
else:
|
||||
raise self.server.error(400, "Unknown root path")
|
||||
return result
|
||||
|
||||
async def _do_gcode_upload(self, request):
|
||||
start_print = print_ongoing = False
|
||||
base_path = self.file_paths.get("gcodes", "")
|
||||
if not base_path:
|
||||
raise self.server.error(400, "Gcodes root not available")
|
||||
start_print = self._get_argument(request, 'print', "false") == "true"
|
||||
upload = self._get_upload_info(request, base_path)
|
||||
# Verify that the operation can be done if attempting to upload a gcode
|
||||
try:
|
||||
print_ongoing = await self._handle_operation_check(
|
||||
upload['full_path'])
|
||||
except self.server.error as e:
|
||||
if e.status_code == 403:
|
||||
raise self.server.error(
|
||||
403, "File is loaded, upload not permitted")
|
||||
else:
|
||||
# Couldn't reach Klippy, so it should be safe
|
||||
# to permit the upload but not start
|
||||
start_print = False
|
||||
# Don't start if another print is currently in progress
|
||||
start_print = start_print and not print_ongoing
|
||||
self._write_file(upload)
|
||||
if start_print:
|
||||
# Make a Klippy Request to "Start Print"
|
||||
gcode_apis = self.server.lookup_plugin('gcode_apis')
|
||||
try:
|
||||
await gcode_apis.gcode_start_print(
|
||||
request.path, 'POST', {'filename': upload['filename']})
|
||||
except self.server.error:
|
||||
# Attempt to start print failed
|
||||
start_print = False
|
||||
self.server.notify_filelist_changed(upload['filename'], 'added')
|
||||
return {'result': upload['filename'], 'print_started': start_print}
|
||||
|
||||
def _do_config_upload(self, request):
|
||||
req_arg = self._get_argument(request, 'primary_config', "false")
|
||||
is_main_config = req_arg.lower() == "true"
|
||||
cfg_base = "printer.cfg" if is_main_config else "config"
|
||||
cfg_path = self.file_paths.get(cfg_base, None)
|
||||
if cfg_path is None:
|
||||
raise self.server.error(
|
||||
"Printer configuration location on disk not set")
|
||||
upload = self._get_upload_info(request, cfg_path)
|
||||
self._write_file(upload)
|
||||
return {'result': upload['filename']}
|
||||
|
||||
def _get_argument(self, request, name, default=None):
|
||||
args = request.arguments.get(name, None)
|
||||
if args is not None:
|
||||
return args[0].decode().strip()
|
||||
return default
|
||||
|
||||
def _get_upload_info(self, request, base_path):
|
||||
# check relative path
|
||||
dir_path = self._get_argument(request, 'path', "")
|
||||
# fetch the upload from the request
|
||||
if len(request.files) != 1:
|
||||
raise self.server.error(
|
||||
|
@ -321,9 +369,7 @@ class FileManager:
|
|||
400, "Bad Request, can only process a single file upload")
|
||||
upload = f_list[0]
|
||||
if os.path.isfile(base_path):
|
||||
# If the root path points to a file, write directly to it. This
|
||||
# is the case for printer.cfg
|
||||
filename = root
|
||||
filename = os.path.basename(base_path)
|
||||
full_path = base_path
|
||||
dir_path = ""
|
||||
else:
|
||||
|
@ -331,46 +377,24 @@ class FileManager:
|
|||
if dir_path:
|
||||
filename = os.path.join(dir_path, filename)
|
||||
full_path = os.path.normpath(os.path.join(base_path, filename))
|
||||
# Validate the path. Don't allow uploads to a parent of the root
|
||||
if not full_path.startswith(base_path):
|
||||
raise self.server.error(
|
||||
"Cannot write to path: %s" % (full_path))
|
||||
# Verify that the operation can be done if attempting to upload a gcode
|
||||
if root == 'gcodes':
|
||||
try:
|
||||
print_ongoing = await self._handle_operation_check(full_path)
|
||||
except self.server.error as e:
|
||||
if e.status_code == 403:
|
||||
raise self.server.error(
|
||||
403, "File is loaded, upload not permitted")
|
||||
else:
|
||||
# Couldn't reach Klippy, so it should be safe
|
||||
# to permit the upload but not start
|
||||
start_print = False
|
||||
# Validate the path. Don't allow uploads to a parent of the root
|
||||
if not full_path.startswith(base_path):
|
||||
raise self.server.error(
|
||||
"Cannot write to path: %s" % (full_path))
|
||||
return {
|
||||
'filename': filename,
|
||||
'body': upload['body'],
|
||||
'dir_path': dir_path,
|
||||
'full_path': full_path}
|
||||
|
||||
# Don't start if another print is currently in progress
|
||||
start_print = start_print and not print_ongoing
|
||||
def _write_file(self, upload):
|
||||
try:
|
||||
if dir_path:
|
||||
os.makedirs(os.path.dirname(full_path), exist_ok=True)
|
||||
with open(full_path, 'wb') as fh:
|
||||
if upload['dir_path']:
|
||||
os.makedirs(os.path.dirname(upload['full_path']), exist_ok=True)
|
||||
with open(upload['full_path'], 'wb') as fh:
|
||||
fh.write(upload['body'])
|
||||
except Exception:
|
||||
raise self.server.error(500, "Unable to save file")
|
||||
if start_print:
|
||||
# Make a Klippy Request to "Start Print"
|
||||
gcode_apis = self.server.lookup_plugin('gcode_apis')
|
||||
try:
|
||||
await gcode_apis.gcode_start_print(
|
||||
request.path, 'POST', {'filename': filename})
|
||||
except self.server.error:
|
||||
# Attempt to start print failed
|
||||
start_print = False
|
||||
if root == 'gcodes':
|
||||
self.server.notify_filelist_changed(filename, 'added')
|
||||
return {'result': filename, 'print_started': start_print}
|
||||
else:
|
||||
return {'result': filename}
|
||||
|
||||
def get_file_list(self, format_list=False, base='gcodes'):
|
||||
try:
|
||||
|
|
Loading…
Reference in New Issue