diff --git a/moonraker/components/history.py b/moonraker/components/history.py index d3488be..97477ce 100644 --- a/moonraker/components/history.py +++ b/moonraker/components/history.py @@ -1,19 +1,41 @@ # History cache for printer jobs # # This file may be distributed under the terms of the GNU GPLv3 license. + +from __future__ import annotations import logging import time +# Annotation imports +from typing import ( + TYPE_CHECKING, + Any, + Union, + Optional, + Dict, + List, +) +if TYPE_CHECKING: + from confighelper import ConfigHelper + from websockets import WebRequest + from . import database + from . import klippy_apis + from . import file_manager + DBComp = database.MoonrakerDatabase + APIComp = klippy_apis.KlippyAPI + FMComp = file_manager.FileManager + HIST_NAMESPACE = "history" MAX_JOBS = 10000 class History: - def __init__(self, config): + def __init__(self, config: ConfigHelper) -> None: self.server = config.get_server() - self.file_manager = self.server.lookup_component('file_manager') - database = self.server.lookup_component("database") + self.file_manager: FMComp = self.server.lookup_component( + 'file_manager') + database: DBComp = self.server.lookup_component("database") self.gcdb = database.wrap_namespace("gcode_metadata", parse_keys=False) - self.job_totals = database.get_item( + self.job_totals: Dict[str, float] = database.get_item( "moonraker", "history.job_totals", { 'total_jobs': 0, @@ -45,24 +67,26 @@ class History: self.history_ns = database.wrap_namespace(HIST_NAMESPACE, parse_keys=False) - self.current_job = None - self.current_job_id = None - self.print_stats = {} - self.next_job_id = 0 + self.current_job: Optional[PrinterJob] = None + self.current_job_id: Optional[str] = None + self.print_stats: Dict[str, Any] = {} + self.next_job_id: int = 0 self.cached_job_ids = self.history_ns.keys() if self.cached_job_ids: self.next_job_id = int(self.cached_job_ids[-1], 16) + 1 - async def _init_ready(self): - klippy_apis = self.server.lookup_component('klippy_apis') - sub = {"print_stats": None} + async def _init_ready(self) -> None: + klippy_apis: APIComp = self.server.lookup_component('klippy_apis') + sub: Dict[str, Optional[List[str]]] = {"print_stats": None} try: result = await klippy_apis.subscribe_objects(sub) except self.server.error as e: logging.info(f"Error subscribing to print_stats") self.print_stats = result.get("print_stats", {}) - async def _handle_job_request(self, web_request): + async def _handle_job_request(self, + web_request: WebRequest + ) -> Dict[str, Any]: action = web_request.get_action() if action == "GET": job_id = web_request.get_str("uid") @@ -85,12 +109,15 @@ class History: self.delete_job(job_id) return {'deleted_jobs': [job_id]} + raise self.server.error("Invalid Request Method") - async def _handle_jobs_list(self, web_request): + async def _handle_jobs_list(self, + web_request: WebRequest + ) -> Dict[str, Any]: i = 0 count = 0 end_num = len(self.cached_job_ids) - jobs = [] + jobs: List[Dict[str, Any]] = [] start_num = 0 before = web_request.get_float("before", -1) @@ -108,8 +135,8 @@ class History: if since != -1: while start_num < end_num: job_id = self.cached_job_ids[start_num] - job = self.history_ns[job_id] - if job.get('start_time') > since: + job: Dict[str, Any] = self.history_ns[job_id] + if job['start_time'] > since: break start_num += 1 @@ -117,7 +144,7 @@ class History: while end_num > 0: job_id = self.cached_job_ids[end_num-1] job = self.history_ns[job_id] - if job.get('end_time') < before: + if job['end_time'] < before: break end_num -= 1 @@ -141,14 +168,16 @@ class History: return {"count": count, "jobs": jobs} - async def _handle_job_totals(self, web_request): + async def _handle_job_totals(self, + web_request: WebRequest + ) -> Dict[str, Dict[str, float]]: return {'job_totals': self.job_totals} - async def _status_update(self, data): + async def _status_update(self, data: Dict[str, Any]) -> None: ps = data.get("print_stats", {}) if "state" in ps: - old_state = self.print_stats['state'] - new_state = ps['state'] + old_state: str = self.print_stats['state'] + new_state: str = ps['state'] new_ps = dict(self.print_stats) new_ps.update(ps) @@ -171,13 +200,13 @@ class History: self.print_stats.update(ps) - def _handle_shutdown(self): + def _handle_shutdown(self) -> None: self.finish_job("klippy_shutdown", self.print_stats) - def _handle_disconnect(self): + def _handle_disconnect(self) -> None: self.finish_job("klippy_disconnect", self.print_stats) - def _check_need_cancel(self, new_stats): + def _check_need_cancel(self, new_stats: Dict[str, Any]) -> bool: # Cancel if the file name has changed, total duration has # decreased, or if job is not resuming from a pause ps = self.print_stats @@ -185,7 +214,7 @@ class History: ps['total_duration'] > new_stats['total_duration'] or \ ps['state'] != "paused" - def add_job(self, job): + def add_job(self, job: PrinterJob) -> None: if len(self.cached_job_ids) >= MAX_JOBS: self.delete_job(self.cached_job_ids[0]) job_id = f"{self.next_job_id:06X}" @@ -197,7 +226,7 @@ class History: self.next_job_id += 1 self.send_history_event("added") - def delete_job(self, job_id): + def delete_job(self, job_id: Union[int, str]) -> None: if isinstance(job_id, int): job_id = f"{job_id:06X}" @@ -205,7 +234,7 @@ class History: del self.history_ns[job_id] self.cached_job_ids.remove(job_id) - def finish_job(self, status, pstats): + def finish_job(self, status: str, pstats: Dict[str, Any]) -> None: if self.current_job is None: return @@ -218,16 +247,16 @@ class History: self.current_job = None self.current_job_id = None - def get_job(self, job_id): + def get_job(self, job_id: Union[int, str]) -> Optional[Dict[str, Any]]: if isinstance(job_id, int): job_id = f"{job_id:06X}" return self.history_ns.get(job_id, None) - def grab_job_metadata(self): + def grab_job_metadata(self) -> None: if self.current_job is None: return - filename = self.current_job.get("filename") - metadata = self.gcdb.get(filename, {}) + filename: str = self.current_job.get("filename") + metadata: Dict[str, Any] = self.gcdb.get(filename, {}) if metadata: # Add the start time and job id to the # persistent metadata storage @@ -241,14 +270,17 @@ class History: metadata.pop('print_start_time', None) metadata.pop('job_id', None) if "thumbnails" in metadata: + thumb: Dict[str, Any] for thumb in metadata['thumbnails']: thumb.pop('data', None) self.current_job.set("metadata", metadata) - def save_current_job(self): + def save_current_job(self) -> None: + if self.current_job is None or self.current_job_id is None: + return self.history_ns[self.current_job_id] = self.current_job.get_stats() - def _update_job_totals(self): + def _update_job_totals(self) -> None: if self.current_job is None: return job = self.current_job @@ -260,59 +292,67 @@ class History: self.job_totals['longest_job'], job.get('total_duration')) self.job_totals['longest_print'] = max( self.job_totals['longest_print'], job.get('print_duration')) - database = self.server.lookup_component("database") + database: DBComp = self.server.lookup_component("database") database.insert_item( "moonraker", "history.job_totals", self.job_totals) - def send_history_event(self, evt_action): + def send_history_event(self, evt_action: str) -> None: + if self.current_job is None or self.current_job_id is None: + return job = self._prep_requested_job( self.current_job.get_stats(), self.current_job_id) self.server.send_event("history:history_changed", {'action': evt_action, 'job': job}) - def _prep_requested_job(self, job, job_id): + def _prep_requested_job(self, + job: Dict[str, Any], + job_id: str + ) -> Dict[str, Any]: job['job_id'] = job_id job['exists'] = self.file_manager.check_file_exists( "gcodes", job['filename']) return job - def on_exit(self): + def on_exit(self) -> None: self.finish_job("server_exit", self.print_stats) class PrinterJob: - def __init__(self, data={}): - self.end_time = None - self.filament_used = 0 - self.filename = None - self.metadata = None - self.print_duration = 0 - self.status = "in_progress" + def __init__(self, data: Dict[str, Any] = {}) -> None: + self.end_time: Optional[float] = None + self.filament_used: float = 0 + self.filename: Optional[str] = None + self.metadata: Optional[Dict[str, Any]] = None + self.print_duration: float = 0. + self.status: str = "in_progress" self.start_time = time.time() - self.total_duration = 0 + self.total_duration: float = 0. self.update_from_ps(data) - def finish(self, status, print_stats={}): + def finish(self, + status: str, + print_stats: Dict[str, Any] = {} + ) -> None: self.end_time = time.time() self.status = status self.update_from_ps(print_stats) - def get(self, name): + def get(self, name: str) -> Any: if not hasattr(self, name): return None return getattr(self, name) - def get_stats(self): + def get_stats(self) -> Dict[str, Any]: return self.__dict__.copy() - def set(self, name, val): + def set(self, name: str, val: Any) -> None: if not hasattr(self, name): return setattr(self, name, val) - def update_from_ps(self, data): + def update_from_ps(self, data: Dict[str, Any]) -> None: for i in data: if hasattr(self, i): setattr(self, i, data[i]) -def load_component(config): +def load_component(config: ConfigHelper) -> History: return History(config)