diff --git a/moonraker/components/file_manager/metadata.py b/moonraker/components/file_manager/metadata.py index 13100b3..cb1a208 100644 --- a/moonraker/components/file_manager/metadata.py +++ b/moonraker/components/file_manager/metadata.py @@ -26,18 +26,12 @@ from typing import ( Optional, Dict, List, + Tuple, Type, ) if TYPE_CHECKING: pass -HAS_OBJECT_PROCESSING = True -try: - from preprocess_cancellation import preprocessor -except ImportError: - HAS_OBJECT_PROCESSING = False - - UFP_MODEL_PATH = "/3D/model.gcode" UFP_THUMB_PATH = "/Metadata/thumbnail.png" @@ -804,7 +798,12 @@ SUPPORTED_DATA = [ 'filament_weight_total', 'thumbnails'] -def process_objects(file_path: str) -> None: +def process_objects(file_path: str) -> bool: + try: + from preprocess_cancellation import preprocessor + except ImportError: + log_to_stderr("Module 'preprocess-cancellation' failed to load") + return False fname = os.path.basename(file_path) log_to_stderr(f"Performing Object Processing on file: {fname}") with tempfile.TemporaryDirectory() as tmp_dir_name: @@ -813,32 +812,24 @@ def process_objects(file_path: str) -> None: with open(tmp_file, 'w') as out_file: preprocessor(in_file, out_file) shutil.move(tmp_file, file_path) + return True - -def extract_metadata(file_path: str, - check_objects: bool - ) -> Dict[str, Any]: - metadata: Dict[str, Any] = {} - slicers = [s(file_path) for s in SUPPORTED_SLICERS] +def get_slicer(file_path: str) -> Tuple[BaseSlicer, Dict[str, str]]: header_data = footer_data = "" slicer: Optional[BaseSlicer] = None size = os.path.getsize(file_path) - metadata['size'] = size - metadata['modified'] = os.path.getmtime(file_path) - metadata['uuid'] = str(uuid.uuid4()) with open(file_path, 'r') as f: # read the default size, which should be enough to # identify the slicer header_data = f.read(READ_SIZE) - for s in slicers: - ident = s.check_identity(header_data) + for impl in SUPPORTED_SLICERS: + slicer = impl(file_path) + ident = slicer.check_identity(header_data) if ident is not None: - slicer = s - metadata.update(ident) break - if slicer is None: + else: slicer = UnknownSlicer(file_path) - metadata['slicer'] = "Unknown" + ident = slicer.check_identity(header_data) if size > READ_SIZE * 2: f.seek(size - READ_SIZE) footer_data = f.read() @@ -848,18 +839,27 @@ def extract_metadata(file_path: str, else: footer_data = header_data slicer.set_data(header_data, footer_data, size) - need_proc = check_objects and slicer.has_objects() - for key in SUPPORTED_DATA: - func = getattr(slicer, "parse_" + key) - result = func() - if result is not None: - metadata[key] = result - if need_proc: - process_objects(file_path) - # After processing the file has changed, update size and - # modified fields - metadata['size'] = os.path.getsize(file_path) - metadata['modified'] = os.path.getmtime(file_path) + if ident is None: + ident = {"slicer": "unknown"} + return slicer, ident + +def extract_metadata( + file_path: str, check_objects: bool +) -> Dict[str, Any]: + metadata: Dict[str, Any] = {} + slicer, ident = get_slicer(file_path) + if check_objects and slicer.has_objects(): + if process_objects(file_path): + slicer, ident = get_slicer(file_path) + metadata['size'] = os.path.getsize(file_path) + metadata['modified'] = os.path.getmtime(file_path) + metadata['uuid'] = str(uuid.uuid4()) + metadata.update(ident) + for key in SUPPORTED_DATA: + func = getattr(slicer, "parse_" + key) + result = func() + if result is not None: + metadata[key] = result return metadata def extract_ufp(ufp_path: str, dest_path: str) -> None: @@ -940,9 +940,7 @@ if __name__ == "__main__": "-o", "--check-objects", dest='check_objects', action='store_true', help="process gcode file for exclude opbject functionality") args = parser.parse_args() - if not HAS_OBJECT_PROCESSING: - log_to_stderr("Module 'preprocess-cancellation' failed to load") - check_objects = args.check_objects and HAS_OBJECT_PROCESSING + check_objects = args.check_objects enabled_msg = "enabled" if check_objects else "disabled" log_to_stderr(f"Object Processing is {enabled_msg}") main(args.path, args.filename, args.ufp, check_objects)