From 6021b39234bb4dcdb1815c6948fcf778f5024b4e Mon Sep 17 00:00:00 2001 From: Eric Callahan Date: Sat, 7 Jan 2023 08:02:20 -0500 Subject: [PATCH] file_manager: implement zip endpoint Provides an API for front ends to archive a list of files and/or folders into a single zipped file. Signed-off-by: Eric Callahan --- .../components/file_manager/file_manager.py | 100 ++++++++++++++++++ 1 file changed, 100 insertions(+) diff --git a/moonraker/components/file_manager/file_manager.py b/moonraker/components/file_manager/file_manager.py index 71b375b..a47666a 100644 --- a/moonraker/components/file_manager/file_manager.py +++ b/moonraker/components/file_manager/file_manager.py @@ -13,6 +13,8 @@ import logging import json import tempfile import asyncio +import zipfile +import time from copy import deepcopy from inotify_simple import INotify from inotify_simple import flags as iFlags @@ -97,6 +99,8 @@ class FileManager: "/server/files/move", ['POST'], self._handle_file_move_copy) self.server.register_endpoint( "/server/files/copy", ['POST'], self._handle_file_move_copy) + self.server.register_endpoint( + "/server/files/zip", ['POST'], self._handle_zip_files) self.server.register_endpoint( "/server/files/delete_file", ['DELETE'], self._handle_file_delete, transports=["websocket"]) @@ -518,6 +522,102 @@ class FileManager: result['item']['path'] = self.get_relative_path(dest_root, full_dest) return result + async def _handle_zip_files( + self, web_request: WebRequest + ) -> Dict[str, Any]: + async with self.write_mutex: + store_only = web_request.get_boolean("store_only", False) + suffix = time.strftime("%Y%m%d-%H%M%S", time.localtime()) + dest: str = web_request.get_str( + "dest", f"config/collection-{suffix}.zip" + ) + dest_root, dest_str_path = self._convert_request_path(dest) + if dest_root not in self.full_access_roots: + raise self.server.error( + f"Destination Root '{dest_root}' is read-only" + ) + dest_path = pathlib.Path(dest_str_path) + self.check_reserved_path(dest_path, True) + if dest_path.is_dir(): + raise self.server.error( + f"Cannot create archive at '{dest_path}'. Path exists " + "as a directory." + ) + elif not dest_path.parent.exists(): + raise self.server.error( + f"Cannot create archive at '{dest_path}'. Parent " + "directory does not exist." + ) + items: Union[str, List[str]] = web_request.get("items") + if isinstance(items, str): + items = [ + item.strip() for item in items.split(",") if item.strip() + ] + if not items: + raise self.server.error( + "At least one file or directory must be specified" + ) + await self.event_loop.run_in_thread( + self._zip_files, items, dest_path, store_only + ) + rel_dest = dest_path.relative_to(self.file_paths[dest_root]) + return { + "destination": {"root": dest_root, "path": str(rel_dest)}, + "action": "zip_files" + } + + def _zip_files( + self, + item_list: List[str], + destination: StrOrPath, + store_only: bool = False + ) -> None: + if isinstance(destination, str): + destination = pathlib.Path(destination).expanduser().resolve() + tmpdir = pathlib.Path(tempfile.gettempdir()) + temp_dest = tmpdir.joinpath(destination.name) + processed: Set[Tuple[int, int]] = set() + cptype = zipfile.ZIP_STORED if store_only else zipfile.ZIP_DEFLATED + with zipfile.ZipFile(str(temp_dest), "w", compression=cptype) as zf: + for item in item_list: + root, str_path = self._convert_request_path(item) + root_path = pathlib.Path(self.file_paths[root]) + item_path = pathlib.Path(str_path) + self.check_reserved_path(item_path, False) + if not item_path.exists(): + raise self.server.error( + f"No file/directory exits at '{item}'" + ) + if item_path.is_file(): + st = item_path.stat() + ident = (st.st_dev, st.st_ino) + if ident in processed: + continue + processed.add(ident) + rel_path = item_path.relative_to(root_path.parent) + zf.write(str(item_path), arcname=str(rel_path)) + continue + elif not item_path.is_dir(): + raise self.server.error( + f"Item at path '{item}' is not a valid file or " + "directory" + ) + for child_path in item_path.iterdir(): + if child_path.is_file(): + if self.check_reserved_path(child_path, False, False): + continue + st = child_path.stat() + ident = (st.st_dev, st.st_ino) + if ident in processed: + continue + processed.add(ident) + rel_path = child_path.relative_to(root_path.parent) + try: + zf.write(str(child_path), arcname=str(rel_path)) + except PermissionError: + continue + shutil.move(str(temp_dest), str(destination)) + def _list_directory(self, path: str, root: str,