file_manager: Add support for uploading and extracting ufp files

Credit to GitHub user cdkeito for creating a template from which this implementation was inspired.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Arksine 2020-10-30 08:55:25 -04:00
parent b63d192df7
commit 0d515e4938
1 changed files with 54 additions and 7 deletions

View File

@ -6,7 +6,8 @@
import os import os
import sys import sys
import shutil import shutil
import time import io
import zipfile
import logging import logging
import json import json
from tornado.ioloop import IOLoop from tornado.ioloop import IOLoop
@ -311,10 +312,15 @@ class FileManager:
raise self.server.error("Gcodes root not available") raise self.server.error("Gcodes root not available")
start_print = self._get_argument(request, 'print', "false") == "true" start_print = self._get_argument(request, 'print', "false") == "true"
upload = self._get_upload_info(request, base_path) upload = self._get_upload_info(request, base_path)
fparts = os.path.splitext(upload['full_path'])
is_ufp = fparts[-1].lower() == ".ufp"
# Verify that the operation can be done if attempting to upload a gcode # Verify that the operation can be done if attempting to upload a gcode
try: try:
check_path = upload['full_path']
if is_ufp:
check_path = fparts[0] + ".gcode"
print_ongoing = await self._handle_operation_check( print_ongoing = await self._handle_operation_check(
upload['full_path']) check_path)
except self.server.error as e: except self.server.error as e:
if e.status_code == 403: if e.status_code == 403:
raise self.server.error( raise self.server.error(
@ -325,7 +331,7 @@ class FileManager:
start_print = False start_print = False
# Don't start if another print is currently in progress # Don't start if another print is currently in progress
start_print = start_print and not print_ongoing start_print = start_print and not print_ongoing
self._write_file(upload) self._write_file(upload, is_ufp)
if start_print: if start_print:
# Make a Klippy Request to "Start Print" # Make a Klippy Request to "Start Print"
klippy_apis = self.server.lookup_plugin('klippy_apis') klippy_apis = self.server.lookup_plugin('klippy_apis')
@ -384,15 +390,56 @@ class FileManager:
'dir_path': dir_path, 'dir_path': dir_path,
'full_path': full_path} 'full_path': full_path}
def _write_file(self, upload): def _write_file(self, upload, unzip_ufp=False):
try: try:
if upload['dir_path']: if upload['dir_path']:
os.makedirs(os.path.dirname(upload['full_path']), exist_ok=True) os.makedirs(os.path.dirname(
with open(upload['full_path'], 'wb') as fh: upload['full_path']), exist_ok=True)
fh.write(upload['body']) if unzip_ufp:
self._unzip_ufp(upload)
else:
with open(upload['full_path'], 'wb') as fh:
fh.write(upload['body'])
except Exception: except Exception:
raise self.server.error("Unable to save file", 500) raise self.server.error("Unable to save file", 500)
# UFP Extraction Implementation inspired by by GitHub user @cdkeito
def _unzip_ufp(self, upload):
base_name = os.path.splitext(
os.path.basename(upload['filename']))[0]
working_dir = os.path.dirname(upload['full_path'])
thumb_dir = os.path.join(working_dir, "thumbs")
ufp_bytes = io.BytesIO(upload['body'])
gc_bytes = img_bytes = None
with zipfile.ZipFile(ufp_bytes) as zf:
gc_bytes = zf.read("/3D/model.gcode")
try:
img_bytes = zf.read("/Metadata/thumbnail.png")
except Exception:
img_bytes = None
if gc_bytes is not None:
gc_name = base_name + ".gcode"
gc_path = os.path.join(working_dir, gc_name)
with open(gc_path, "wb") as gc_file:
gc_file.write(gc_bytes)
# update upload file name to extracted gcode file
upload['filename'] = os.path.join(
os.path.dirname(upload['filename']), gc_name)
else:
raise self.server.error(
f"UFP file {upload['filename']} does not "
"contain a gcode file")
if img_bytes is not None:
thumb_name = base_name + ".png"
thumb_path = os.path.join(thumb_dir, thumb_name)
try:
if not os.path.exists(thumb_dir):
os.mkdir(thumb_dir)
with open(thumb_path, "wb") as thumb_file:
thumb_file.write(img_bytes)
except Exception:
logging.exception("Unable to write Image")
def get_file_list(self, format_list=False, base='gcodes'): def get_file_list(self, format_list=False, base='gcodes'):
try: try:
filelist = self._update_file_list(base) filelist = self._update_file_list(base)