file_manager: implement zip endpoint

Provides an API for front ends to archive a list of files and/or
folders into a single zipped file.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2023-01-07 08:02:20 -05:00
parent 53eda78b11
commit 6021b39234
No known key found for this signature in database
GPG Key ID: 5A1EB336DFB4C71B
1 changed files with 100 additions and 0 deletions

View File

@ -13,6 +13,8 @@ import logging
import json
import tempfile
import asyncio
import zipfile
import time
from copy import deepcopy
from inotify_simple import INotify
from inotify_simple import flags as iFlags
@ -97,6 +99,8 @@ class FileManager:
"/server/files/move", ['POST'], self._handle_file_move_copy)
self.server.register_endpoint(
"/server/files/copy", ['POST'], self._handle_file_move_copy)
self.server.register_endpoint(
"/server/files/zip", ['POST'], self._handle_zip_files)
self.server.register_endpoint(
"/server/files/delete_file", ['DELETE'], self._handle_file_delete,
transports=["websocket"])
@ -518,6 +522,102 @@ class FileManager:
result['item']['path'] = self.get_relative_path(dest_root, full_dest)
return result
async def _handle_zip_files(
self, web_request: WebRequest
) -> Dict[str, Any]:
async with self.write_mutex:
store_only = web_request.get_boolean("store_only", False)
suffix = time.strftime("%Y%m%d-%H%M%S", time.localtime())
dest: str = web_request.get_str(
"dest", f"config/collection-{suffix}.zip"
)
dest_root, dest_str_path = self._convert_request_path(dest)
if dest_root not in self.full_access_roots:
raise self.server.error(
f"Destination Root '{dest_root}' is read-only"
)
dest_path = pathlib.Path(dest_str_path)
self.check_reserved_path(dest_path, True)
if dest_path.is_dir():
raise self.server.error(
f"Cannot create archive at '{dest_path}'. Path exists "
"as a directory."
)
elif not dest_path.parent.exists():
raise self.server.error(
f"Cannot create archive at '{dest_path}'. Parent "
"directory does not exist."
)
items: Union[str, List[str]] = web_request.get("items")
if isinstance(items, str):
items = [
item.strip() for item in items.split(",") if item.strip()
]
if not items:
raise self.server.error(
"At least one file or directory must be specified"
)
await self.event_loop.run_in_thread(
self._zip_files, items, dest_path, store_only
)
rel_dest = dest_path.relative_to(self.file_paths[dest_root])
return {
"destination": {"root": dest_root, "path": str(rel_dest)},
"action": "zip_files"
}
def _zip_files(
self,
item_list: List[str],
destination: StrOrPath,
store_only: bool = False
) -> None:
if isinstance(destination, str):
destination = pathlib.Path(destination).expanduser().resolve()
tmpdir = pathlib.Path(tempfile.gettempdir())
temp_dest = tmpdir.joinpath(destination.name)
processed: Set[Tuple[int, int]] = set()
cptype = zipfile.ZIP_STORED if store_only else zipfile.ZIP_DEFLATED
with zipfile.ZipFile(str(temp_dest), "w", compression=cptype) as zf:
for item in item_list:
root, str_path = self._convert_request_path(item)
root_path = pathlib.Path(self.file_paths[root])
item_path = pathlib.Path(str_path)
self.check_reserved_path(item_path, False)
if not item_path.exists():
raise self.server.error(
f"No file/directory exits at '{item}'"
)
if item_path.is_file():
st = item_path.stat()
ident = (st.st_dev, st.st_ino)
if ident in processed:
continue
processed.add(ident)
rel_path = item_path.relative_to(root_path.parent)
zf.write(str(item_path), arcname=str(rel_path))
continue
elif not item_path.is_dir():
raise self.server.error(
f"Item at path '{item}' is not a valid file or "
"directory"
)
for child_path in item_path.iterdir():
if child_path.is_file():
if self.check_reserved_path(child_path, False, False):
continue
st = child_path.stat()
ident = (st.st_dev, st.st_ino)
if ident in processed:
continue
processed.add(ident)
rel_path = child_path.relative_to(root_path.parent)
try:
zf.write(str(child_path), arcname=str(rel_path))
except PermissionError:
continue
shutil.move(str(temp_dest), str(destination))
def _list_directory(self,
path: str,
root: str,