metadata: read metadata after object processing

Signed-off-by: Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2022-04-21 10:28:45 -04:00
parent 6f2ce72b4c
commit 11beaa7076
No known key found for this signature in database
GPG Key ID: 7027245FBBDDF59A
1 changed file with 36 additions and 38 deletions

View File

@ -26,18 +26,12 @@ from typing import (
Optional, Optional,
Dict, Dict,
List, List,
Tuple,
Type, Type,
) )
if TYPE_CHECKING: if TYPE_CHECKING:
pass pass
HAS_OBJECT_PROCESSING = True
try:
from preprocess_cancellation import preprocessor
except ImportError:
HAS_OBJECT_PROCESSING = False
UFP_MODEL_PATH = "/3D/model.gcode" UFP_MODEL_PATH = "/3D/model.gcode"
UFP_THUMB_PATH = "/Metadata/thumbnail.png" UFP_THUMB_PATH = "/Metadata/thumbnail.png"
@ -804,7 +798,12 @@ SUPPORTED_DATA = [
'filament_weight_total', 'filament_weight_total',
'thumbnails'] 'thumbnails']
def process_objects(file_path: str) -> None: def process_objects(file_path: str) -> bool:
try:
from preprocess_cancellation import preprocessor
except ImportError:
log_to_stderr("Module 'preprocess-cancellation' failed to load")
return False
fname = os.path.basename(file_path) fname = os.path.basename(file_path)
log_to_stderr(f"Performing Object Processing on file: {fname}") log_to_stderr(f"Performing Object Processing on file: {fname}")
with tempfile.TemporaryDirectory() as tmp_dir_name: with tempfile.TemporaryDirectory() as tmp_dir_name:
@ -813,32 +812,24 @@ def process_objects(file_path: str) -> None:
with open(tmp_file, 'w') as out_file: with open(tmp_file, 'w') as out_file:
preprocessor(in_file, out_file) preprocessor(in_file, out_file)
shutil.move(tmp_file, file_path) shutil.move(tmp_file, file_path)
return True
def get_slicer(file_path: str) -> Tuple[BaseSlicer, Dict[str, str]]:
def extract_metadata(file_path: str,
check_objects: bool
) -> Dict[str, Any]:
metadata: Dict[str, Any] = {}
slicers = [s(file_path) for s in SUPPORTED_SLICERS]
header_data = footer_data = "" header_data = footer_data = ""
slicer: Optional[BaseSlicer] = None slicer: Optional[BaseSlicer] = None
size = os.path.getsize(file_path) size = os.path.getsize(file_path)
metadata['size'] = size
metadata['modified'] = os.path.getmtime(file_path)
metadata['uuid'] = str(uuid.uuid4())
with open(file_path, 'r') as f: with open(file_path, 'r') as f:
# read the default size, which should be enough to # read the default size, which should be enough to
# identify the slicer # identify the slicer
header_data = f.read(READ_SIZE) header_data = f.read(READ_SIZE)
for s in slicers: for impl in SUPPORTED_SLICERS:
ident = s.check_identity(header_data) slicer = impl(file_path)
ident = slicer.check_identity(header_data)
if ident is not None: if ident is not None:
slicer = s
metadata.update(ident)
break break
if slicer is None: else:
slicer = UnknownSlicer(file_path) slicer = UnknownSlicer(file_path)
metadata['slicer'] = "Unknown" ident = slicer.check_identity(header_data)
if size > READ_SIZE * 2: if size > READ_SIZE * 2:
f.seek(size - READ_SIZE) f.seek(size - READ_SIZE)
footer_data = f.read() footer_data = f.read()
@ -848,18 +839,27 @@ def extract_metadata(file_path: str,
else: else:
footer_data = header_data footer_data = header_data
slicer.set_data(header_data, footer_data, size) slicer.set_data(header_data, footer_data, size)
need_proc = check_objects and slicer.has_objects() if ident is None:
for key in SUPPORTED_DATA: ident = {"slicer": "unknown"}
func = getattr(slicer, "parse_" + key) return slicer, ident
result = func()
if result is not None: def extract_metadata(
metadata[key] = result file_path: str, check_objects: bool
if need_proc: ) -> Dict[str, Any]:
process_objects(file_path) metadata: Dict[str, Any] = {}
# After processing the file has changed, update size and slicer, ident = get_slicer(file_path)
# modified fields if check_objects and slicer.has_objects():
metadata['size'] = os.path.getsize(file_path) if process_objects(file_path):
metadata['modified'] = os.path.getmtime(file_path) slicer, ident = get_slicer(file_path)
metadata['size'] = os.path.getsize(file_path)
metadata['modified'] = os.path.getmtime(file_path)
metadata['uuid'] = str(uuid.uuid4())
metadata.update(ident)
for key in SUPPORTED_DATA:
func = getattr(slicer, "parse_" + key)
result = func()
if result is not None:
metadata[key] = result
return metadata return metadata
def extract_ufp(ufp_path: str, dest_path: str) -> None: def extract_ufp(ufp_path: str, dest_path: str) -> None:
@ -940,9 +940,7 @@ if __name__ == "__main__":
"-o", "--check-objects", dest='check_objects', action='store_true', "-o", "--check-objects", dest='check_objects', action='store_true',
help="process gcode file for exclude opbject functionality") help="process gcode file for exclude opbject functionality")
args = parser.parse_args() args = parser.parse_args()
if not HAS_OBJECT_PROCESSING: check_objects = args.check_objects
log_to_stderr("Module 'preprocess-cancellation' failed to load")
check_objects = args.check_objects and HAS_OBJECT_PROCESSING
enabled_msg = "enabled" if check_objects else "disabled" enabled_msg = "enabled" if check_objects else "disabled"
log_to_stderr(f"Object Processing is {enabled_msg}") log_to_stderr(f"Object Processing is {enabled_msg}")
main(args.path, args.filename, args.ufp, check_objects) main(args.path, args.filename, args.ufp, check_objects)