file_manager: allow extract_metadata to unzip ufp files
This guarantees that .ufps will not be unzipped in parallel and offloads a potential blocking operation to another process. Signed-off-by: Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
parent
eaf35ded0c
commit
c40c202220
|
@ -6,11 +6,9 @@
|
|||
import os
|
||||
import sys
|
||||
import shutil
|
||||
import zipfile
|
||||
import logging
|
||||
import json
|
||||
import tempfile
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from tornado.ioloop import IOLoop
|
||||
from tornado.locks import Event
|
||||
from inotify_simple import INotify
|
||||
|
@ -24,9 +22,6 @@ WATCH_FLAGS = iFlags.CREATE | iFlags.DELETE | iFlags.MODIFY \
|
|||
| iFlags.MOVED_TO | iFlags.MOVED_FROM | iFlags.ONLYDIR \
|
||||
| iFlags.CLOSE_WRITE
|
||||
|
||||
UFP_MODEL_PATH = "/3D/model.gcode"
|
||||
UFP_THUMB_PATH = "/Metadata/thumbnail.png"
|
||||
|
||||
class FileManager:
|
||||
def __init__(self, config):
|
||||
self.server = config.get_server()
|
||||
|
@ -393,14 +388,9 @@ class FileManager:
|
|||
start_print = False
|
||||
# Don't start if another print is currently in progress
|
||||
start_print = start_print and not print_ongoing
|
||||
ioloop = IOLoop.current()
|
||||
with ThreadPoolExecutor(max_workers=1) as tpe:
|
||||
await ioloop.run_in_executor(
|
||||
tpe, self._process_uploaded_file, upload_info)
|
||||
# Fetch Metadata
|
||||
finfo = self.get_path_info(upload_info['dest_path'])
|
||||
finfo = self._process_uploaded_file(upload_info)
|
||||
evt = self.gcode_metadata.parse_metadata(
|
||||
upload_info['filename'], finfo['size'], finfo['modified'])
|
||||
upload_info['filename'], finfo)
|
||||
await evt.wait()
|
||||
if start_print:
|
||||
# Make a Klippy Request to "Start Print"
|
||||
|
@ -416,10 +406,7 @@ class FileManager:
|
|||
}
|
||||
|
||||
async def _finish_standard_upload(self, upload_info):
|
||||
ioloop = IOLoop.current()
|
||||
with ThreadPoolExecutor(max_workers=1) as tpe:
|
||||
await ioloop.run_in_executor(
|
||||
tpe, self._process_uploaded_file, upload_info)
|
||||
self._process_uploaded_file(upload_info)
|
||||
return {'result': upload_info['filename']}
|
||||
|
||||
def _process_uploaded_file(self, upload_info):
|
||||
|
@ -428,45 +415,17 @@ class FileManager:
|
|||
os.makedirs(os.path.dirname(
|
||||
upload_info['dest_path']), exist_ok=True)
|
||||
if upload_info['unzip_ufp']:
|
||||
self._unzip_ufp(upload_info['tmp_file_path'],
|
||||
upload_info['dest_path'])
|
||||
tmp_path = upload_info['tmp_file_path']
|
||||
finfo = self.get_path_info(tmp_path)
|
||||
finfo['ufp_path'] = tmp_path
|
||||
else:
|
||||
shutil.move(upload_info['tmp_file_path'],
|
||||
upload_info['dest_path'])
|
||||
finfo = self.get_path_info(upload_info['dest_path'])
|
||||
except Exception:
|
||||
logging.exception("Upload Write Error")
|
||||
raise self.server.error("Unable to save file", 500)
|
||||
|
||||
# UFP Extraction Implementation inspired by GitHub user @cdkeito
|
||||
def _unzip_ufp(self, ufp_path, dest_path):
|
||||
thumb_name = os.path.splitext(
|
||||
os.path.basename(dest_path))[0] + ".png"
|
||||
dest_thumb_dir = os.path.join(os.path.dirname(dest_path), ".thumbs")
|
||||
dest_thumb_path = os.path.join(dest_thumb_dir, thumb_name)
|
||||
try:
|
||||
with tempfile.TemporaryDirectory() as tmp_dir_name:
|
||||
tmp_thumb_path = ""
|
||||
with zipfile.ZipFile(ufp_path) as zf:
|
||||
tmp_model_path = zf.extract(
|
||||
UFP_MODEL_PATH, path=tmp_dir_name)
|
||||
if UFP_THUMB_PATH in zf.namelist():
|
||||
tmp_thumb_path = zf.extract(
|
||||
UFP_THUMB_PATH, path=tmp_dir_name)
|
||||
shutil.move(tmp_model_path, dest_path)
|
||||
if tmp_thumb_path:
|
||||
if not os.path.exists(dest_thumb_dir):
|
||||
os.mkdir(dest_thumb_dir)
|
||||
shutil.move(tmp_thumb_path, dest_thumb_path)
|
||||
finally:
|
||||
try:
|
||||
os.remove(ufp_path)
|
||||
except Exception:
|
||||
logging.exception(f"Error removing ufp file: {ufp_path}")
|
||||
|
||||
def process_ufp_from_refresh(self, ufp_path):
|
||||
dest_path = os.path.splitext(ufp_path)[0] + ".gcode"
|
||||
self._unzip_ufp(ufp_path, dest_path)
|
||||
return dest_path
|
||||
return finfo
|
||||
|
||||
def get_file_list(self, root, list_format=False):
|
||||
# Use os.walk find files in sd path and subdirs
|
||||
|
@ -723,9 +682,6 @@ class INotifyHandler:
|
|||
ext = os.path.splitext(name)[-1].lower()
|
||||
if name[0] == "." or ext not in VALID_GCODE_EXTS:
|
||||
continue
|
||||
if ext == ".ufp":
|
||||
self.ioloop.spawn_callback(self._process_ufp, fpath)
|
||||
else:
|
||||
self._parse_gcode_metadata(fpath)
|
||||
if self.debug_enabled:
|
||||
debug_msg = f"Inotify Watches After Scan: {dir_path}"
|
||||
|
@ -744,14 +700,11 @@ class INotifyHandler:
|
|||
", metadata extraction aborted")
|
||||
return
|
||||
path_info = self.file_manager.get_path_info(file_path)
|
||||
self.gcode_metadata.parse_metadata(
|
||||
rel_path, path_info['size'], path_info['modified'], notify=True)
|
||||
|
||||
async def _process_ufp(self, file_path):
|
||||
with ThreadPoolExecutor(max_workers=1) as tpe:
|
||||
await self.ioloop.run_in_executor(
|
||||
tpe, self.file_manager.process_ufp_from_refresh,
|
||||
file_path)
|
||||
ext = os.path.splitext(file_path)[-1].lower()
|
||||
if ext == ".ufp":
|
||||
rel_path = os.path.splitext(rel_path)[0] + ".gcode"
|
||||
path_info['ufp_path'] = file_path
|
||||
self.gcode_metadata.parse_metadata(rel_path, path_info, notify=True)
|
||||
|
||||
def _handle_inotify_read(self, fd, events):
|
||||
if events & IOLoop.ERROR:
|
||||
|
@ -826,7 +779,7 @@ class INotifyHandler:
|
|||
"create_dir", root, child_path)
|
||||
|
||||
def _process_file_event(self, evt, root, child_path):
|
||||
ext = os.path.splitext(evt.name)[-1]
|
||||
ext = os.path.splitext(evt.name)[-1].lower()
|
||||
if root == "gcodes" and ext not in VALID_GCODE_EXTS:
|
||||
# Don't notify files with invalid gcode extensions
|
||||
return
|
||||
|
@ -857,10 +810,6 @@ class INotifyHandler:
|
|||
elif evt.mask & iFlags.MOVED_TO:
|
||||
logging.debug(f"Inotify file move to: {root}, {evt.name}")
|
||||
if root == "gcodes":
|
||||
if os.path.splitext(child_path)[-1] == ".ufp":
|
||||
self.ioloop.spawn_callback(
|
||||
self._process_ufp, child_path)
|
||||
return
|
||||
self._parse_gcode_metadata(child_path)
|
||||
pending_evt = self.pending_move_events.pop(evt.cookie, None)
|
||||
if pending_evt is not None:
|
||||
|
@ -892,11 +841,11 @@ class INotifyHandler:
|
|||
# Some other event, ignore it
|
||||
return
|
||||
if root == "gcodes":
|
||||
if os.path.splitext(child_path)[-1] == ".ufp":
|
||||
self.ioloop.spawn_callback(
|
||||
self._process_ufp, child_path)
|
||||
return
|
||||
self._parse_gcode_metadata(child_path)
|
||||
if ext == ".ufp":
|
||||
# Don't notify ufp creation in the gcodes directory,
|
||||
# it will be removed after it has been unzipped
|
||||
return
|
||||
self._notify_filelist_changed(action, root, child_path)
|
||||
|
||||
def _notify_filelist_changed(self, action, root, full_path,
|
||||
|
@ -966,9 +915,15 @@ class MetadataStorage:
|
|||
def __getitem__(self, key):
|
||||
return self.mddb[key]
|
||||
|
||||
def _has_valid_data(self, fname, fsize, modified):
|
||||
def _has_valid_data(self, fname, path_info):
|
||||
if path_info.get('ufp_path', None) is not None:
|
||||
# UFP files always need processing
|
||||
return False
|
||||
mdata = self.mddb.get(fname, {'size': "", 'modified': 0})
|
||||
return mdata['size'] == fsize and mdata['modified'] == modified
|
||||
for field in ['size', 'modified']:
|
||||
if mdata[field] != path_info.get(field, None):
|
||||
return False
|
||||
return True
|
||||
|
||||
def remove_directory_metadata(self, dir_name):
|
||||
for fname in list(self.mddb.keys()):
|
||||
|
@ -994,14 +949,14 @@ class MetadataStorage:
|
|||
except Exception:
|
||||
logging.debug(f"Error removing thumb at {thumb_path}")
|
||||
|
||||
def parse_metadata(self, fname, fsize, modified, notify=False):
|
||||
def parse_metadata(self, fname, path_info, notify=False):
|
||||
evt = Event()
|
||||
if fname in self.pending_requests or \
|
||||
self._has_valid_data(fname, fsize, modified):
|
||||
self._has_valid_data(fname, path_info):
|
||||
# request already pending or not necessary
|
||||
evt.set()
|
||||
return evt
|
||||
self.pending_requests[fname] = (fsize, modified, notify, evt)
|
||||
self.pending_requests[fname] = (path_info, notify, evt)
|
||||
if self.busy:
|
||||
return evt
|
||||
self.busy = True
|
||||
|
@ -1010,24 +965,26 @@ class MetadataStorage:
|
|||
|
||||
async def _process_metadata_update(self):
|
||||
while self.pending_requests:
|
||||
fname, (fsize, modified, notify, evt) = \
|
||||
fname, (path_info, notify, evt) = \
|
||||
self.pending_requests.popitem()
|
||||
if self._has_valid_data(fname, fsize, modified):
|
||||
if self._has_valid_data(fname, path_info):
|
||||
evt.set()
|
||||
continue
|
||||
ufp_path = path_info.get('ufp_path', None)
|
||||
retries = 3
|
||||
while retries:
|
||||
try:
|
||||
await self._run_extract_metadata(fname, notify)
|
||||
await self._run_extract_metadata(fname, ufp_path, notify)
|
||||
except Exception:
|
||||
logging.exception("Error running extract_metadata.py")
|
||||
retries -= 1
|
||||
else:
|
||||
break
|
||||
else:
|
||||
if ufp_path is None:
|
||||
self.mddb[fname] = {
|
||||
'size': fsize,
|
||||
'modified': modified,
|
||||
'size': path_info.get('size', 0),
|
||||
'modified': path_info.get('modified', 0),
|
||||
'print_start_time': None,
|
||||
'job_id': None
|
||||
}
|
||||
|
@ -1036,15 +993,20 @@ class MetadataStorage:
|
|||
evt.set()
|
||||
self.busy = False
|
||||
|
||||
async def _run_extract_metadata(self, filename, notify):
|
||||
async def _run_extract_metadata(self, filename, ufp_path, notify):
|
||||
# Escape single quotes in the file name so that it may be
|
||||
# properly loaded
|
||||
filename = filename.replace("\"", "\\\"")
|
||||
cmd = " ".join([sys.executable, METADATA_SCRIPT, "-p",
|
||||
self.gc_path, "-f", f"\"{filename}\""])
|
||||
timeout = 10.
|
||||
if ufp_path is not None and os.path.isfile(ufp_path):
|
||||
timeout = 300.
|
||||
ufp_path.replace("\"", "\\\"")
|
||||
cmd += f" -u \"{ufp_path}\""
|
||||
shell_command = self.server.lookup_component('shell_command')
|
||||
scmd = shell_command.build_shell_command(cmd, log_stderr=True)
|
||||
result = await scmd.run_with_response(timeout=10.)
|
||||
result = await scmd.run_with_response(timeout=timeout)
|
||||
try:
|
||||
decoded_resp = json.loads(result.strip())
|
||||
except Exception:
|
||||
|
|
Loading…
Reference in New Issue