diff --git a/moonraker/components/file_manager.py b/moonraker/components/file_manager.py index 1d88ee0..8caadf4 100644 --- a/moonraker/components/file_manager.py +++ b/moonraker/components/file_manager.py @@ -3,6 +3,8 @@ # Copyright (C) 2020 Eric Callahan # # This file may be distributed under the terms of the GNU GPLv3 license. + +from __future__ import annotations import os import sys import shutil @@ -16,6 +18,34 @@ from tornado.locks import Event, Lock, Condition from inotify_simple import INotify from inotify_simple import flags as iFlags +# Annotation imports +from typing import ( + TYPE_CHECKING, + Any, + Tuple, + Optional, + Union, + Dict, + List, + Set, + Coroutine, + Callable, + TypeVar, + cast, +) +if TYPE_CHECKING: + from inotify_simple import Event as InotifyEvent + from moonraker import Server + from confighelper import ConfigHelper + from websockets import WebRequest + from . import database + from . import klippy_apis + from . import shell_command + DBComp = database.MoonrakerDatabase + APIComp = klippy_apis.KlippyAPI + SCMDComp = shell_command.ShellCommandFactory + _T = TypeVar("_T") + VALID_GCODE_EXTS = ['.gcode', '.g', '.gco', '.ufp', '.nc'] FULL_ACCESS_ROOTS = ["gcodes", "config"] METADATA_SCRIPT = os.path.abspath(os.path.join( @@ -25,17 +55,18 @@ WATCH_FLAGS = iFlags.CREATE | iFlags.DELETE | iFlags.MODIFY \ | iFlags.CLOSE_WRITE class FileManager: - def __init__(self, config): + def __init__(self, config: ConfigHelper) -> None: self.server = config.get_server() - self.file_paths = {} - database = self.server.load_component(config, "database") - gc_path = database.get_item("moonraker", "file_manager.gcode_path", "") - self.gcode_metadata = MetadataStorage(self.server, gc_path, database) + self.file_paths: Dict[str, str] = {} + db: DBComp = self.server.load_component(config, "database") + gc_path: str = db.get_item( + "moonraker", "file_manager.gcode_path", "") + self.gcode_metadata = MetadataStorage(self.server, gc_path, db) self.inotify_handler = INotifyHandler(config, self, self.gcode_metadata) self.write_mutex = Lock() - self.notify_sync_lock = None - self.fixed_path_args = {} + self.notify_sync_lock: Optional[NotifySyncLock] = None + self.fixed_path_args: Dict[str, Any] = {} # Register file management endpoints self.server.register_endpoint( @@ -73,11 +104,12 @@ class FileManager: if gc_path: self.register_directory('gcodes', gc_path) - def _update_fixed_paths(self): + def _update_fixed_paths(self) -> None: kinfo = self.server.get_klippy_info() - paths = {k: kinfo.get(k) for k in - ['klipper_path', 'python_path', - 'log_file', 'config_file']} + paths: Dict[str, Any] = \ + {k: kinfo.get(k) for k in + ['klipper_path', 'python_path', + 'log_file', 'config_file']} if paths == self.fixed_path_args: # No change in fixed paths return @@ -96,11 +128,11 @@ class FileManager: # Register log path log_file = paths.get('log_file') if log_file is not None: - log_path = os.path.abspath(os.path.expanduser(log_file)) + log_path: str = os.path.abspath(os.path.expanduser(log_file)) self.server.register_static_file_handler( "klippy.log", log_path, force=True) - def register_directory(self, root, path): + def register_directory(self, root: str, path: Optional[str]) -> bool: if path is None: return False path = os.path.abspath(os.path.expanduser(path)) @@ -123,9 +155,9 @@ class FileManager: self.file_paths[root] = path self.server.register_static_file_handler(root, path) if root == "gcodes": - database = self.server.lookup_component( - "database").wrap_namespace("moonraker") - database["file_manager.gcode_path"] = path + db: DBComp = self.server.lookup_component("database") + moon_db = db.wrap_namespace("moonraker") + moon_db["file_manager.gcode_path"] = path # scan for metadata changes self.gcode_metadata.update_gcode_path(path) if root in FULL_ACCESS_ROOTS: @@ -137,38 +169,44 @@ class FileManager: "root_update", root, path) return True - def get_sd_directory(self): + def get_sd_directory(self) -> str: return self.file_paths.get('gcodes', "") - def get_registered_dirs(self): + def get_registered_dirs(self) -> List[str]: return list(self.file_paths.keys()) - def get_fixed_path_args(self): + def get_fixed_path_args(self) -> Dict[str, Any]: return dict(self.fixed_path_args) - def get_relative_path(self, root, full_path): + def get_relative_path(self, root: str, full_path: str) -> str: root_dir = self.file_paths.get(root, None) if root_dir is None or not full_path.startswith(root_dir): return "" return os.path.relpath(full_path, start=root_dir) - def check_file_exists(self, root, filename): + def check_file_exists(self, root: str, filename: str) -> bool: root_dir = self.file_paths.get(root, "") file_path = os.path.join(root_dir, filename) return os.path.exists(file_path) - def sync_inotify_event(self, path): + def sync_inotify_event(self, path: str) -> Optional[NotifySyncLock]: if self.notify_sync_lock is None or \ not self.notify_sync_lock.check_need_sync(path): return None return self.notify_sync_lock - async def _handle_filelist_request(self, web_request): + async def _handle_filelist_request(self, + web_request: WebRequest + ) -> List[Dict[str, Any]]: root = web_request.get_str('root', "gcodes") - return self.get_file_list(root, list_format=True) + flist = self.get_file_list(root, list_format=True) + return cast(List[Dict[str, Any]], flist) - async def _handle_metadata_request(self, web_request): - requested_file = web_request.get_str('filename') + async def _handle_metadata_request(self, + web_request: WebRequest + ) -> Dict[str, Any]: + requested_file: str = web_request.get_str('filename') + metadata: Optional[Dict[str, Any]] metadata = self.gcode_metadata.get(requested_file, None) if metadata is None: raise self.server.error( @@ -176,7 +214,9 @@ class FileManager: metadata['filename'] = requested_file return metadata - async def _handle_directory_request(self, web_request): + async def _handle_directory_request(self, + web_request: WebRequest + ) -> Dict[str, Any]: directory = web_request.get_str('path', "gcodes") root, dir_path = self._convert_request_path(directory) action = web_request.get_action() @@ -230,16 +270,17 @@ class FileManager: raise self.server.error("Operation Not Supported", 405) return result - async def _handle_operation_check(self, requested_path): + async def _handle_operation_check(self, requested_path: str) -> bool: if not self.get_relative_path("gcodes", requested_path): # Path not in the gcodes path return True # Get virtual_sdcard status - klippy_apis = self.server.lookup_component('klippy_apis') - result = await klippy_apis.query_objects({'print_stats': None}) + kapis: APIComp = self.server.lookup_component('klippy_apis') + result: Dict[str, Any] + result = await kapis.query_objects({'print_stats': None}) pstats = result.get('print_stats', {}) - loaded_file = pstats.get('filename', "") - state = pstats.get('state', "") + loaded_file: str = pstats.get('filename', "") + state: str = pstats.get('state', "") gc_path = self.file_paths.get('gcodes', "") full_path = os.path.join(gc_path, loaded_file) if loaded_file and state != "complete": @@ -252,7 +293,7 @@ class FileManager: ongoing = state in ["printing", "paused"] return ongoing - def _convert_request_path(self, request_path): + def _convert_request_path(self, request_path: str) -> Tuple[str, str]: # Parse the root, relative path, and disk path from a remote request parts = request_path.strip("/").split("/", 1) if not parts: @@ -265,9 +306,11 @@ class FileManager: disk_path = os.path.join(disk_path, parts[1]) return root, disk_path - async def _handle_file_move_copy(self, web_request): - source = web_request.get_str("source") - destination = web_request.get_str("dest") + async def _handle_file_move_copy(self, + web_request: WebRequest + ) -> Dict[str, Any]: + source: str = web_request.get_str("source") + destination: str = web_request.get_str("dest") ep = web_request.get_endpoint() source_root, source_path = self._convert_request_path(source) dest_root, dest_path = self._convert_request_path(destination) @@ -275,7 +318,7 @@ class FileManager: raise self.server.error( f"Destination path is read-only: {dest_root}") async with self.write_mutex: - result = {'item': {'root': dest_root}} + result: Dict[str, Any] = {'item': {'root': dest_root}} if not os.path.exists(source_path): raise self.server.error(f"File {source_path} does not exist") # make sure the destination is not in use @@ -287,7 +330,7 @@ class FileManager: f"Source path is read-only, cannot move: {source_root}") # if moving the file, make sure the source is not in use await self._handle_operation_check(source_path) - op_func = shutil.move + op_func: Callable[..., str] = shutil.move result['source_item'] = { 'path': source, 'root': source_root @@ -317,11 +360,14 @@ class FileManager: result['item']['path'] = self.get_relative_path(dest_root, full_dest) return result - def _list_directory(self, path, is_extended=False): + def _list_directory(self, + path: str, + is_extended: bool = False + ) -> Dict[str, Any]: if not os.path.isdir(path): raise self.server.error( f"Directory does not exist ({path})") - flist = {'dirs': [], 'files': []} + flist: Dict[str, Any] = {'dirs': [], 'files': []} for fname in os.listdir(path): full_path = os.path.join(path, fname) if not os.path.exists(full_path): @@ -338,26 +384,29 @@ class FileManager: if gc_path is not None and full_path.startswith(gc_path) and \ ext in VALID_GCODE_EXTS and is_extended: rel_path = os.path.relpath(full_path, start=gc_path) - metadata = self.gcode_metadata.get(rel_path, {}) + metadata: Dict[str, Any] = self.gcode_metadata.get( + rel_path, {}) path_info.update(metadata) flist['files'].append(path_info) usage = shutil.disk_usage(path) flist['disk_usage'] = usage._asdict() return flist - def get_path_info(self, path): + def get_path_info(self, path: str) -> Dict[str, Any]: modified = os.path.getmtime(path) size = os.path.getsize(path) path_info = {'modified': modified, 'size': size} return path_info - def gen_temp_upload_path(self): + def gen_temp_upload_path(self) -> str: ioloop = IOLoop.current() return os.path.join( tempfile.gettempdir(), f"moonraker.upload-{int(ioloop.time())}.mru") - async def finalize_upload(self, form_args): + async def finalize_upload(self, + form_args: Dict[str, Any] + ) -> Dict[str, Any]: # lookup root file path async with self.write_mutex: try: @@ -377,18 +426,20 @@ class FileManager: raise return result - def _parse_upload_args(self, upload_args): + def _parse_upload_args(self, + upload_args: Dict[str, Any] + ) -> Dict[str, Any]: if 'filename' not in upload_args: raise self.server.error( "No file name specifed in upload form") # check relative path - root = upload_args.get('root', "gcodes").lower() + root: str = upload_args.get('root', "gcodes").lower() if root not in self.file_paths: raise self.server.error(f"Root {root} not available") root_path = self.file_paths[root] - dir_path = upload_args.get('path', "") + dir_path: str = upload_args.get('path', "") if os.path.isfile(root_path): - filename = os.path.basename(root_path) + filename: str = os.path.basename(root_path) dest_path = root_path dir_path = "" else: @@ -400,7 +451,7 @@ class FileManager: if not dest_path.startswith(root_path): raise self.server.error( f"Cannot write to path: {dest_path}") - start_print = upload_args.get('print', "false") == "true" + start_print: bool = upload_args.get('print', "false") == "true" f_ext = os.path.splitext(dest_path)[-1].lower() unzip_ufp = f_ext == ".ufp" and root == "gcodes" if unzip_ufp: @@ -416,12 +467,14 @@ class FileManager: 'unzip_ufp': unzip_ufp } - async def _finish_gcode_upload(self, upload_info): - print_ongoing = False - start_print = upload_info['start_print'] + async def _finish_gcode_upload(self, + upload_info: Dict[str, Any] + ) -> Dict[str, Any]: + print_ongoing: bool = False + start_print: bool = upload_info['start_print'] # Verify that the operation can be done if attempting to upload a gcode try: - check_path = upload_info['dest_path'] + check_path: str = upload_info['dest_path'] print_ongoing = await self._handle_operation_check( check_path) except self.server.error as e: @@ -440,9 +493,9 @@ class FileManager: upload_info['filename'], finfo).wait() if start_print: # Make a Klippy Request to "Start Print" - klippy_apis = self.server.lookup_component('klippy_apis') + kapis: APIComp = self.server.lookup_component('klippy_apis') try: - await klippy_apis.start_print(upload_info['filename']) + await kapis.start_print(upload_info['filename']) except self.server.error: # Attempt to start print failed start_print = False @@ -457,7 +510,9 @@ class FileManager: 'action': "create_file" } - async def _finish_standard_upload(self, upload_info): + async def _finish_standard_upload(self, + upload_info: Dict[str, Any] + ) -> Dict[str, Any]: self._process_uploaded_file(upload_info) return { 'item': { @@ -467,7 +522,9 @@ class FileManager: 'action': "create_file" } - def _process_uploaded_file(self, upload_info): + def _process_uploaded_file(self, + upload_info: Dict[str, Any] + ) -> Dict[str, Any]: try: if upload_info['dir_path']: os.makedirs(os.path.dirname( @@ -485,9 +542,12 @@ class FileManager: raise self.server.error("Unable to save file", 500) return finfo - def get_file_list(self, root, list_format=False): + def get_file_list(self, + root: str, + list_format: bool = False + ) -> Union[Dict[str, Any], List[Dict[str, Any]]]: # Use os.walk find files in sd path and subdirs - filelist = {} + filelist: Dict[str, Any] = {} path = self.file_paths.get(root, None) if path is None or not os.path.isdir(path): msg = f"Failed to build file list, invalid path: {root}: {path}" @@ -497,7 +557,7 @@ class FileManager: st = os.stat(path) visited_dirs = {(st.st_dev, st.st_ino)} for dir_path, dir_names, files in os.walk(path, followlinks=True): - scan_dirs = [] + scan_dirs: List[str] = [] # Filter out directories that have already been visted. This # prevents infinite recrusion "followlinks" is set to True for dname in dir_names: @@ -521,15 +581,15 @@ class FileManager: finfo = self.get_path_info(full_path) filelist[fname] = finfo if list_format: - flist = [] + flist: List[Dict[str, Any]] = [] for fname in sorted(filelist, key=str.lower): - fdict = {'path': fname} + fdict: Dict[str, Any] = {'path': fname} fdict.update(filelist[fname]) flist.append(fdict) return flist return filelist - def get_file_metadata(self, filename): + def get_file_metadata(self, filename: str) -> Dict[str, Any]: if filename[0] == '/': filename = filename[1:] @@ -540,7 +600,10 @@ class FileManager: return self.gcode_metadata.get(filename, {}) - def list_dir(self, directory, simple_format=False): + def list_dir(self, + directory: str, + simple_format: bool = False + ) -> Union[Dict[str, Any], List[str]]: # List a directory relative to its root. if directory[0] == "/": directory = directory[1:] @@ -570,11 +633,13 @@ class FileManager: return simple_list return flist - async def _handle_file_delete(self, web_request): - file_path = web_request.get_str("path") + async def _handle_file_delete(self, + web_request: WebRequest + ) -> Dict[str, Any]: + file_path: str = web_request.get_str("path") return await self.delete_file(file_path) - async def delete_file(self, path): + async def delete_file(self, path: str) -> Dict[str, Any]: async with self.write_mutex: parts = path.lstrip("/").split("/", 1) if len(parts) != 2: @@ -599,7 +664,7 @@ class FileManager: 'item': {'path': filename, 'root': root}, 'action': "delete_file"} - def close(self): + def close(self) -> None: self.inotify_handler.close() @@ -607,17 +672,21 @@ INOTIFY_BUNDLE_TIME = .25 INOTIFY_MOVE_TIME = 1. class InotifyNode: - def __init__(self, ihdlr, parent, name): + def __init__(self, + ihdlr: INotifyHandler, + parent: InotifyNode, + name: str + ) -> None: self.ihdlr = ihdlr self.name = name self.parent_node = parent - self.child_nodes = {} + self.child_nodes: Dict[str, InotifyNode] = {} self.watch_desc = self.ihdlr.add_watch(self) - self.pending_node_events = {} - self.pending_deleted_children = set() - self.pending_file_events = {} + self.pending_node_events: Dict[str, object] = {} + self.pending_deleted_children: Set[Tuple[str, bool]] = set() + self.pending_file_events: Dict[str, str] = {} - async def _finish_create_node(self): + async def _finish_create_node(self) -> None: # Finish a node's creation. All children that were created # with this node (ie: a directory is copied) are bundled into # this notification. We also scan the node to extract metadata @@ -628,7 +697,7 @@ class InotifyNode: node_path = self.get_path() root = self.get_root() # Scan child nodes for unwatched directories and metadata - mevts = self.scan_node() + mevts: List[Event] = self.scan_node() if mevts: mfuts = [e.wait() for e in mevts] await asyncio.gather(*mfuts) @@ -636,7 +705,7 @@ class InotifyNode: self.ihdlr.notify_filelist_changed( "create_dir", root, node_path) - def _finish_delete_child(self): + def _finish_delete_child(self) -> None: # Items deleted in a child (node or file) are batched. # Individual files get notifications if their parent # node stil exists. Otherwise notififications are @@ -655,18 +724,19 @@ class InotifyNode: f"delete_{item_type}", root, item_path) self.pending_deleted_children.clear() - def scan_node(self, visited_dirs=set()): + def scan_node(self, + visited_dirs: Set[Tuple[int, int]] = set() + ) -> List[Event]: dir_path = self.get_path() st = os.stat(dir_path) if st in visited_dirs: return [] - metadata_events = [] + metadata_events: List[Event] = [] visited_dirs.add((st.st_dev, st.st_ino)) for fname in os.listdir(dir_path): if fname[0] == ".": continue item_path = os.path.join(dir_path, fname) - ext = os.path.splitext(fname)[-1].lower() if os.path.isdir(item_path): new_child = self.create_child_node(fname, False) metadata_events.extend(new_child.scan_node(visited_dirs)) @@ -675,7 +745,11 @@ class InotifyNode: metadata_events.append(mevt) return metadata_events - async def move_child_node(self, child_name, new_name, new_parent): + async def move_child_node(self, + child_name: str, + new_name: str, + new_parent: InotifyNode + ) -> None: child_node = self.pop_child_node(child_name) if child_node is None: logging.info(f"No child for node at path: {self.get_path()}") @@ -700,7 +774,7 @@ class InotifyNode: "move_dir", new_root, new_path, prev_root, prev_path) - def schedule_file_event(self, file_name, evt_name): + def schedule_file_event(self, file_name: str, evt_name: str) -> None: if file_name in self.pending_file_events: return pending_node = self.search_pending_event("create_node") @@ -708,7 +782,7 @@ class InotifyNode: pending_node.stop_event("create_node") self.pending_file_events[file_name] = evt_name - async def complete_file_write(self, file_name): + async def complete_file_write(self, file_name: str) -> None: evt_name = self.pending_file_events.pop(file_name, None) if evt_name is None: logging.info(f"Invalid file write event: {file_name}") @@ -729,17 +803,20 @@ class InotifyNode: await mevt.wait() self.ihdlr.notify_filelist_changed(evt_name, root, file_path) - def add_child_node(self, node): + def add_child_node(self, node: InotifyNode) -> None: self.child_nodes[node.name] = node node.parent_node = self - def get_child_node(self, name): + def get_child_node(self, name: str) -> Optional[InotifyNode]: return self.child_nodes.get(name, None) - def pop_child_node(self, name): + def pop_child_node(self, name: str) -> Optional[InotifyNode]: return self.child_nodes.pop(name, None) - def create_child_node(self, name, notify=True): + def create_child_node(self, + name: str, + notify: bool = True + ) -> InotifyNode: if name in self.child_nodes: return self.child_nodes[name] new_child = InotifyNode(self.ihdlr, self, name) @@ -753,7 +830,7 @@ class InotifyNode: pending_node.reset_event("create_node", INOTIFY_BUNDLE_TIME) return new_child - def schedule_child_delete(self, child_name, is_node): + def schedule_child_delete(self, child_name: str, is_node: bool) -> None: if is_node: child_node = self.child_nodes.pop(child_name, None) if child_node is None: @@ -764,19 +841,19 @@ class InotifyNode: self.pending_deleted_children.add((child_name, is_node)) self.add_event("delete_child", INOTIFY_BUNDLE_TIME) - def clear_watches(self): + def clear_watches(self) -> None: for cnode in self.child_nodes.values(): # Delete all of the children's children cnode.clear_watches() self.ihdlr.remove_watch(self.watch_desc) - def get_path(self): + def get_path(self) -> str: return os.path.join(self.parent_node.get_path(), self.name) - def get_root(self): + def get_root(self) -> str: return self.parent_node.get_root() - def add_event(self, evt_name, timeout): + def add_event(self, evt_name: str, timeout: float) -> None: if evt_name in self.pending_node_events: self.reset_event(evt_name, timeout) return @@ -784,7 +861,7 @@ class InotifyNode: hdl = IOLoop.current().call_later(timeout, callback) self.pending_node_events[evt_name] = hdl - def reset_event(self, evt_name, timeout): + def reset_event(self, evt_name: str, timeout: float) -> None: if evt_name in self.pending_node_events: ioloop = IOLoop.current() hdl = self.pending_node_events[evt_name] @@ -793,17 +870,17 @@ class InotifyNode: hdl = ioloop.call_later(timeout, callback) self.pending_node_events[evt_name] = hdl - def stop_event(self, evt_name): + def stop_event(self, evt_name: str) -> None: if evt_name in self.pending_node_events: hdl = self.pending_node_events[evt_name] IOLoop.current().remove_timeout(hdl) - def remove_event(self, evt_name): + def remove_event(self, evt_name: str) -> None: hdl = self.pending_node_events.pop(evt_name, None) if hdl is not None: IOLoop.current().remove_timeout(hdl) - def clear_events(self, include_children=True): + def clear_events(self, include_children: bool = True) -> None: if include_children: for child in self.child_nodes.values(): child.clear_events(include_children) @@ -813,40 +890,47 @@ class InotifyNode: self.pending_deleted_children.clear() self.pending_file_events.clear() - def search_pending_event(self, name): + def search_pending_event(self, name: str) -> Optional[InotifyNode]: if name in self.pending_node_events: return self - if self.parent_node is None: - return None return self.parent_node.search_pending_event(name) class InotifyRootNode(InotifyNode): - def __init__(self, ihdlr, root_name, root_path): + def __init__(self, + ihdlr: INotifyHandler, + root_name: str, + root_path: str + ) -> None: self.root_name = root_name - super().__init__(ihdlr, None, root_path) + super().__init__(ihdlr, self, root_path) - def get_path(self): + def get_path(self) -> str: return self.name - def get_root(self): + def get_root(self) -> str: return self.root_name + def search_pending_event(self, name) -> Optional[InotifyNode]: + if name in self.pending_node_events: + return self + return None + class NotifySyncLock: - def __init__(self, dest_path): - self.wait_fut = None + def __init__(self, dest_path: str) -> None: + self.wait_fut: Optional[asyncio.Future] = None self.sync_condition = Condition() self.dest_path = dest_path - self.notified_paths = set() - self.finished = False + self.notified_paths: Set[str] = set() + self.finished: bool = False - def update_dest(self, dest_path): + def update_dest(self, dest_path: str) -> None: self.dest_path = dest_path - def check_need_sync(self, path): + def check_need_sync(self, path: str) -> bool: return self.dest_path in [path, os.path.dirname(path)] \ and not self.finished - async def wait(self, timeout=None): + async def wait(self, timeout: Optional[float] = None) -> None: if self.finished or self.wait_fut is not None: # Can only wait once return @@ -862,7 +946,7 @@ class NotifySyncLock: self.sync_condition.notify_all() self.finished = True - async def sync(self, path, timeout=None): + async def sync(self, path, timeout: Optional[float] = None) -> None: if not self.check_need_sync(path): return self.notified_paths.add(path) @@ -870,7 +954,7 @@ class NotifySyncLock: self.wait_fut.set_result(None) # Transfer control to waiter if timeout is not None: - timeout = IOLoop.time() + timeout + timeout = IOLoop().time() + timeout try: await self.sync_condition.wait(timeout) except Exception: @@ -880,7 +964,7 @@ class NotifySyncLock: # return prior to a notification await asyncio.sleep(.005) - def cancel(self): + def cancel(self) -> None: if self.finished: return if self.wait_fut is not None and not self.wait_fut.done(): @@ -889,7 +973,11 @@ class NotifySyncLock: self.finished = True class INotifyHandler: - def __init__(self, config, file_manager, gcode_metadata): + def __init__(self, + config: ConfigHelper, + file_manager: FileManager, + gcode_metadata: MetadataStorage + ) -> None: self.server = config.get_server() self.debug_enabled = config['server'].getboolean( 'enable_debug_logging', False) @@ -900,14 +988,14 @@ class INotifyHandler: self.inotify.fileno(), self._handle_inotify_read, IOLoop.READ | IOLoop.ERROR) - self.event_loop_busy = False - self.pending_inotify_events = [] + self.event_loop_busy: bool = False + self.pending_inotify_events: List[InotifyEvent] = [] - self.watched_roots = {} - self.watched_nodes = {} - self.pending_moves = {} + self.watched_roots: Dict[str, InotifyRootNode] = {} + self.watched_nodes: Dict[int, InotifyNode] = {} + self.pending_moves: Dict[int, Tuple[InotifyNode, str, object]] = {} - def add_root_watch(self, root, root_path): + def add_root_watch(self, root: str, root_path: str) -> None: if root not in FULL_ACCESS_ROOTS: return # remove all exisiting watches on root @@ -922,31 +1010,42 @@ class INotifyHandler: IOLoop.current().spawn_callback( self._notify_root_updated, mevts, root, root_path) - async def _notify_root_updated(self, mevts, root, root_path): + async def _notify_root_updated(self, + mevts: List[Event], + root: str, + root_path: str + ) -> None: if mevts: mfuts = [e.wait() for e in mevts] await asyncio.gather(*mfuts) self.notify_filelist_changed("root_update", root, root_path) - def add_watch(self, node): + def add_watch(self, node: InotifyNode) -> int: dir_path = node.get_path() try: - watch = self.inotify.add_watch(dir_path, WATCH_FLAGS) + watch: int = self.inotify.add_watch(dir_path, WATCH_FLAGS) except OSError: logging.exception( f"Error adding watch, already exists: {dir_path}") self.watched_nodes[watch] = node return watch - def remove_watch(self, wdesc, need_low_level_rm=True): + def remove_watch(self, + wdesc: int, + need_low_level_rm: bool = True + ) -> None: node = self.watched_nodes.pop(wdesc, None) - if need_low_level_rm: + if need_low_level_rm and node is not None: try: self.inotify.rm_watch(wdesc) except Exception: logging.exception(f"Error removing watch: '{node.get_path()}'") - def clear_metadata(self, root, path, is_dir=False): + def clear_metadata(self, + root: str, + path: str, + is_dir: bool = False + ) -> None: if root == "gcodes": rel_path = self.file_manager.get_relative_path(root, path) if is_dir: @@ -954,8 +1053,13 @@ class INotifyHandler: else: self.gcode_metadata.remove_file_metadata(rel_path) - async def try_move_metadata(self, prev_root, new_root, prev_path, - new_path, is_dir=False): + async def try_move_metadata(self, + prev_root: str, + new_root: str, + prev_path: str, + new_path: str, + is_dir: bool = False + ) -> bool: if new_root == "gcodes": if prev_root == "gcodes": # moved within the gcodes root, move metadata @@ -970,7 +1074,7 @@ class INotifyHandler: self.gcode_metadata.move_file_metadata( prev_rel_path, new_rel_path) else: - # move from a non-gcodes root to gcodes return true + # move from a non-gcodes root to gcodes root needs a rescan self.clear_metadata(prev_root, prev_path, is_dir) return False elif prev_root == "gcodes": @@ -978,7 +1082,7 @@ class INotifyHandler: self.clear_metadata(prev_root, prev_path, is_dir) return True - def log_nodes(self): + def log_nodes(self) -> None: if self.debug_enabled: debug_msg = f"Inotify Watches After Scan:" for wdesc, node in self.watched_nodes.items(): @@ -988,7 +1092,7 @@ class INotifyHandler: f"Watch: {wdesc}" logging.debug(debug_msg) - def parse_gcode_metadata(self, file_path): + def parse_gcode_metadata(self, file_path: str) -> Event: rel_path = self.file_manager.get_relative_path("gcodes", file_path) path_info = self.file_manager.get_path_info(file_path) ext = os.path.splitext(file_path)[-1].lower() @@ -997,7 +1101,7 @@ class INotifyHandler: path_info['ufp_path'] = file_path return self.gcode_metadata.parse_metadata(rel_path, path_info) - def _handle_move_timeout(self, cookie, is_dir): + def _handle_move_timeout(self, cookie: int, is_dir: bool): if cookie not in self.pending_moves: return parent_node, name, hdl = self.pending_moves.pop(cookie) @@ -1016,16 +1120,21 @@ class INotifyHandler: action = "delete_dir" self.notify_filelist_changed(action, root, item_path) - def _schedule_pending_move(self, evt, parent_node, is_dir): + def _schedule_pending_move(self, + evt: InotifyEvent, + parent_node: InotifyNode, + is_dir: bool + ) -> None: hdl = IOLoop.current().call_later( INOTIFY_MOVE_TIME, self._handle_move_timeout, evt.cookie, is_dir) self.pending_moves[evt.cookie] = (parent_node, evt.name, hdl) - def _handle_inotify_read(self, fd, events): + def _handle_inotify_read(self, fd: int, events: int) -> None: if events & IOLoop.ERROR: logging.info("INotify Read Error") return + evt: InotifyEvent for evt in self.inotify.read(timeout=0): if evt.mask & iFlags.IGNORED: continue @@ -1041,7 +1150,7 @@ class INotifyHandler: self.event_loop_busy = True IOLoop.current().spawn_callback(self._process_inotify_events) - async def _process_inotify_events(self): + async def _process_inotify_events(self) -> None: while self.pending_inotify_events: evt = self.pending_inotify_events.pop(0) node = self.watched_nodes[evt.wd] @@ -1051,7 +1160,10 @@ class INotifyHandler: await self._process_file_event(evt, node) self.event_loop_busy = False - async def _process_dir_event(self, evt, node): + async def _process_dir_event(self, + evt: InotifyEvent, + node: InotifyNode + ) -> None: if evt.name and evt.name[0] == ".": # ignore changes to the hidden directories return @@ -1084,8 +1196,11 @@ class INotifyHandler: # directory node.create_child_node(evt.name) - async def _process_file_event(self, evt, node): - ext = os.path.splitext(evt.name)[-1].lower() + async def _process_file_event(self, + evt: InotifyEvent, + node: InotifyNode + ) -> None: + ext: str = os.path.splitext(evt.name)[-1].lower() root = node.get_root() node_path = node.get_path() if root == "gcodes" and ext not in VALID_GCODE_EXTS: @@ -1140,10 +1255,15 @@ class INotifyHandler: # Only process files that have been created or modified await node.complete_file_write(evt.name) - def notify_filelist_changed(self, action, root, full_path, - source_root=None, source_path=None): + def notify_filelist_changed(self, + action: str, + root: str, + full_path: str, + source_root: Optional[str] = None, + source_path: Optional[str] = None + ) -> None: rel_path = self.file_manager.get_relative_path(root, full_path) - file_info = {'size': 0, 'modified': 0} + file_info: Dict[str, Any] = {'size': 0, 'modified': 0} if os.path.exists(full_path): file_info = self.file_manager.get_path_info(full_path) file_info['path'] = rel_path @@ -1162,11 +1282,14 @@ class INotifyHandler: else: self.server.send_event("file_manager:filelist_changed", result) - async def _delay_notification(self, result, sync_fut): + async def _delay_notification(self, + result: Dict[str, Any], + sync_fut: Coroutine + ) -> None: await sync_fut self.server.send_event("file_manager:filelist_changed", result) - def close(self): + def close(self) -> None: IOLoop.current().remove_handler(self.inotify.fileno()) for watch in self.watched_nodes.keys(): try: @@ -1179,24 +1302,27 @@ METADATA_NAMESPACE = "gcode_metadata" METADATA_VERSION = 3 class MetadataStorage: - def __init__(self, server, gc_path, database): + def __init__(self, + server: Server, + gc_path: str, + db: DBComp + ) -> None: self.server = server self.gc_path = gc_path - database.register_local_namespace(METADATA_NAMESPACE) - self.mddb = database.wrap_namespace( + db.register_local_namespace(METADATA_NAMESPACE) + self.mddb = db.wrap_namespace( METADATA_NAMESPACE, parse_keys=False) - version = database.get_item( + version = db.get_item( "moonraker", "file_manager.metadata_version", 0) if version != METADATA_VERSION: # Clear existing metadata when version is bumped for fname in self.mddb.keys(): self.remove_file_metadata(fname) - database.insert_item( + db.insert_item( "moonraker", "file_manager.metadata_version", METADATA_VERSION) - self.pending_requests = {} - self.events = {} - self.busy = False + self.pending_requests: Dict[str, Tuple[Dict[str, Any], Event]] = {} + self.busy: bool = False if self.gc_path: # Check for removed gcode files while moonraker was shutdown for fname in list(self.mddb.keys()): @@ -1206,44 +1332,53 @@ class MetadataStorage: logging.info(f"Pruned file: {fname}") continue - def update_gcode_path(self, path): + def update_gcode_path(self, path: str) -> None: if path == self.gc_path: return self.mddb.clear() self.gc_path = path - def get(self, key, default=None): + def get(self, + key: str, + default: _T = None + ) -> Union[_T, Dict[str, Any]]: return self.mddb.get(key, default) - def __getitem__(self, key): + def __getitem__(self, key: str) -> Dict[str, Any]: return self.mddb[key] - def _has_valid_data(self, fname, path_info): + def _has_valid_data(self, + fname: str, + path_info: Dict[str, Any] + ) -> bool: if path_info.get('ufp_path', None) is not None: # UFP files always need processing return False + mdata: Dict[str, Any] mdata = self.mddb.get(fname, {'size': "", 'modified': 0}) for field in ['size', 'modified']: if mdata[field] != path_info.get(field, None): return False return True - def remove_directory_metadata(self, dir_name): + def remove_directory_metadata(self, dir_name: str) -> None: if dir_name[-1] != "/": dir_name += "/" for fname in list(self.mddb.keys()): if fname.startswith(dir_name): self.remove_file_metadata(fname) - def remove_file_metadata(self, fname): + def remove_file_metadata(self, fname: str) -> None: + metadata: Optional[Dict[str, Any]] metadata = self.mddb.pop(fname, None) if metadata is None: return # Delete associated thumbnails fdir = os.path.dirname(os.path.join(self.gc_path, fname)) if "thumbnails" in metadata: + thumb: Dict[str, Any] for thumb in metadata["thumbnails"]: - path = thumb.get("relative_path", None) + path: Optional[str] = thumb.get("relative_path", None) if path is None: continue thumb_path = os.path.join(fdir, path) @@ -1254,7 +1389,7 @@ class MetadataStorage: except Exception: logging.debug(f"Error removing thumb at {thumb_path}") - def move_directory_metadata(self, prev_dir, new_dir): + def move_directory_metadata(self, prev_dir: str, new_dir: str) -> None: if prev_dir[-1] != "/": prev_dir += "/" for prev_fname in list(self.mddb.keys()): @@ -1262,7 +1397,12 @@ class MetadataStorage: new_fname = os.path.join(new_dir, prev_fname[len(prev_dir):]) self.move_file_metadata(prev_fname, new_fname, False) - def move_file_metadata(self, prev_fname, new_fname, move_thumbs=True): + def move_file_metadata(self, + prev_fname: str, + new_fname: str, + move_thumbs: bool = True + ) -> None: + metadata: Optional[Dict[str, Any]] metadata = self.mddb.pop(prev_fname, None) if metadata is None: return @@ -1270,8 +1410,9 @@ class MetadataStorage: prev_dir = os.path.dirname(os.path.join(self.gc_path, prev_fname)) new_dir = os.path.dirname(os.path.join(self.gc_path, new_fname)) if "thumbnails" in metadata and move_thumbs: + thumb: Dict[str, Any] for thumb in metadata["thumbnails"]: - path = thumb.get("relative_path", None) + path: Optional[str] = thumb.get("relative_path", None) if path is None: continue thumb_path = os.path.join(prev_dir, path) @@ -1285,7 +1426,10 @@ class MetadataStorage: logging.debug(f"Error moving thumb from {thumb_path}" f" to {new_path}") - def parse_metadata(self, fname, path_info): + def parse_metadata(self, + fname: str, + path_info: Dict[str, Any] + ) -> Event: mevt = Event() ext = os.path.splitext(fname)[1] if fname in self.pending_requests or \ @@ -1301,14 +1445,14 @@ class MetadataStorage: IOLoop.current().spawn_callback(self._process_metadata_update) return mevt - async def _process_metadata_update(self): + async def _process_metadata_update(self) -> None: while self.pending_requests: fname, (path_info, mevt) = \ self.pending_requests.popitem() if self._has_valid_data(fname, path_info): mevt.set() continue - ufp_path = path_info.get('ufp_path', None) + ufp_path: Optional[str] = path_info.get('ufp_path', None) retries = 3 while retries: try: @@ -1331,7 +1475,10 @@ class MetadataStorage: mevt.set() self.busy = False - async def _run_extract_metadata(self, filename, ufp_path): + async def _run_extract_metadata(self, + filename: str, + ufp_path: Optional[str] + ) -> None: # Escape single quotes in the file name so that it may be # properly loaded filename = filename.replace("\"", "\\\"") @@ -1342,16 +1489,16 @@ class MetadataStorage: timeout = 300. ufp_path.replace("\"", "\\\"") cmd += f" -u \"{ufp_path}\"" - shell_command = self.server.lookup_component('shell_command') - scmd = shell_command.build_shell_command(cmd, log_stderr=True) + shell_cmd: SCMDComp = self.server.lookup_component('shell_command') + scmd = shell_cmd.build_shell_command(cmd, log_stderr=True) result = await scmd.run_with_response(timeout=timeout) try: - decoded_resp = json.loads(result.strip()) + decoded_resp: Dict[str, Any] = json.loads(result.strip()) except Exception: logging.debug(f"Invalid metadata response:\n{result}") raise - path = decoded_resp['file'] - metadata = decoded_resp['metadata'] + path: str = decoded_resp['file'] + metadata: Dict[str, Any] = decoded_resp['metadata'] if not metadata: # This indicates an error, do not add metadata for this raise self.server.error("Unable to extract metadata") @@ -1359,5 +1506,5 @@ class MetadataStorage: self.mddb[path] = dict(metadata) metadata['filename'] = path -def load_component(config): +def load_component(config: ConfigHelper) -> FileManager: return FileManager(config)