file_manager: support uploading to symlinks

Previously Moonraker disabled writing to symbolic links, as
doing so would overwrite the link.  Moonraker now resolves
the link, overwrites the file, and manually emits a websocket
notification.

Signed-off-by: Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2023-02-20 20:37:56 -05:00
parent 0f7b781f57
commit 7845390dd1
No known key found for this signature in database
GPG Key ID: 5A1EB336DFB4C71B
2 changed files with 16 additions and 7 deletions

View File

@ -794,8 +794,6 @@ class FileManager:
if unzip_ufp:
filename = os.path.splitext(filename)[0] + ".gcode"
dest_path = os.path.splitext(dest_path)[0] + ".gcode"
if os.path.islink(dest_path):
raise self.server.error(f"Cannot overwrite symlink: {dest_path}")
if os.path.isfile(dest_path) and not os.access(dest_path, os.W_OK):
raise self.server.error(f"File is read-only: {dest_path}")
return {
@ -806,7 +804,8 @@ class FileManager:
'tmp_file_path': upload_args['tmp_file_path'],
'start_print': start_print,
'unzip_ufp': unzip_ufp,
'ext': f_ext
'ext': f_ext,
"is_link": os.path.islink(dest_path)
}
async def _finish_gcode_upload(
@ -843,7 +842,8 @@ class FileManager:
queued = True
self.fs_observer.on_item_create("gcodes", upload_info["dest_path"])
result = dict(self._sched_changed_event(
"create_file", "gcodes", upload_info["dest_path"]
"create_file", "gcodes", upload_info["dest_path"],
immediate=upload_info["is_link"]
))
result.update({"print_started": started, "print_queued": queued})
return result
@ -855,7 +855,9 @@ class FileManager:
dest_path: str = upload_info["dest_path"]
root: str = upload_info["root"]
self.fs_observer.on_item_create(root, dest_path)
return self._sched_changed_event("create_file", root, dest_path)
return self._sched_changed_event(
"create_file", root, dest_path, immediate=upload_info["is_link"]
)
async def _process_uploaded_file(self,
upload_info: Dict[str, Any]
@ -877,8 +879,11 @@ class FileManager:
finfo = self.get_path_info(tmp_path, upload_info['root'])
finfo['ufp_path'] = tmp_path
else:
shutil.move(upload_info['tmp_file_path'],
upload_info['dest_path'])
dest_path = upload_info['dest_path']
if upload_info["is_link"]:
dest_path = os.path.realpath(dest_path)
shutil.move(
upload_info['tmp_file_path'], dest_path)
finfo = self.get_path_info(upload_info['dest_path'],
upload_info['root'])
except Exception:

View File

@ -1026,6 +1026,8 @@ def process_objects(file_path: str, slicer: BaseSlicer, name: str) -> bool:
except Exception as e:
log_to_stderr(f"Object processing failed: {e}")
return False
if os.path.islink(file_path):
file_path = os.path.realpath(file_path)
shutil.move(tmp_file, file_path)
return True
@ -1095,6 +1097,8 @@ def extract_ufp(ufp_path: str, dest_path: str) -> None:
if UFP_THUMB_PATH in zf.namelist():
tmp_thumb_path = zf.extract(
UFP_THUMB_PATH, path=tmp_dir_name)
if os.path.islink(dest_path):
dest_path = os.path.realpath(dest_path)
shutil.move(tmp_model_path, dest_path)
if tmp_thumb_path:
if not os.path.exists(dest_thumb_dir):