From f5010e960bb5afa94de145fc0777bf60efc65b68 Mon Sep 17 00:00:00 2001 From: Arksine Date: Fri, 14 May 2021 08:42:38 -0400 Subject: [PATCH] data_store: add annotations Signed-off-by: Eric Callahan --- moonraker/components/data_store.py | 61 +++++++++++++++++++++--------- 1 file changed, 44 insertions(+), 17 deletions(-) diff --git a/moonraker/components/data_store.py b/moonraker/components/data_store.py index d849746..8799cbe 100644 --- a/moonraker/components/data_store.py +++ b/moonraker/components/data_store.py @@ -3,23 +3,43 @@ # Copyright (C) 2020 Eric Callahan # # This file may be distributed under the terms of the GNU GPLv3 license. + +from __future__ import annotations import logging import time from collections import deque -from tornado.ioloop import IOLoop, PeriodicCallback +from tornado.ioloop import PeriodicCallback + +# Annotation imports +from typing import ( + TYPE_CHECKING, + Any, + Optional, + Dict, + List, + Tuple, + Deque, +) +if TYPE_CHECKING: + from confighelper import ConfigHelper + from websockets import WebRequest + from . import klippy_apis + APIComp = klippy_apis.KlippyAPI + GCQueue = Deque[Dict[str, Any]] + TempStore = Dict[str, Dict[str, Deque[float]]] TEMPERATURE_UPDATE_MS = 1000 class DataStore: - def __init__(self, config): + def __init__(self, config: ConfigHelper) -> None: self.server = config.get_server() self.temp_store_size = config.getint('temperature_store_size', 1200) self.gcode_store_size = config.getint('gcode_store_size', 1000) # Temperature Store Tracking - self.last_temps = {} - self.gcode_queue = deque(maxlen=self.gcode_store_size) - self.temperature_store = {} + self.last_temps: Dict[str, Tuple[float, ...]] = {} + self.gcode_queue: GCQueue = deque(maxlen=self.gcode_store_size) + self.temperature_store: TempStore = {} self.temp_update_cb = PeriodicCallback( self._update_temperature_store, TEMPERATURE_UPDATE_MS) @@ -39,26 +59,29 @@ class DataStore: "/server/gcode_store", ['GET'], self._handle_gcode_store_request) - async def _init_sensors(self): - klippy_apis = self.server.lookup_component('klippy_apis') + async def _init_sensors(self) -> None: + klippy_apis: APIComp = self.server.lookup_component('klippy_apis') # Fetch sensors try: + result: Dict[str, Any] result = await klippy_apis.query_objects({'heaters': None}) except self.server.error as e: logging.info(f"Error Configuring Sensors: {e}") return + sensors: List[str] sensors = result.get("heaters", {}).get("available_sensors", []) if sensors: # Add Subscription - sub = {s: None for s in sensors} + sub: Dict[str, Optional[List[str]]] = {s: None for s in sensors} try: + status: Dict[str, Any] status = await klippy_apis.subscribe_objects(sub) except self.server.error as e: logging.info(f"Error subscribing to sensors: {e}") return logging.info(f"Configuring available sensors: {sensors}") - new_store = {} + new_store: TempStore = {} for sensor in sensors: fields = list(status.get(sensor, {}).keys()) if sensor in self.temperature_store: @@ -86,7 +109,7 @@ class DataStore: self.temperature_store = {} self.temp_update_cb.stop() - def _set_current_temps(self, data): + def _set_current_temps(self, data: Dict[str, Any]) -> None: for sensor in self.temperature_store: if sensor in data: last_val = self.last_temps[sensor] @@ -96,7 +119,7 @@ class DataStore: data[sensor].get('power', last_val[2]), data[sensor].get('speed', last_val[3])) - def _update_temperature_store(self): + def _update_temperature_store(self) -> None: # XXX - If klippy is not connected, set values to zero # as they are unknown? for sensor, vals in self.last_temps.items(): @@ -105,21 +128,23 @@ class DataStore: if item in self.temperature_store[sensor]: self.temperature_store[sensor][item].append(val) - async def _handle_temp_store_request(self, web_request): + async def _handle_temp_store_request(self, + web_request: WebRequest + ) -> Dict[str, Dict[str, List[float]]]: store = {} for name, sensor in self.temperature_store.items(): store[name] = {k: list(v) for k, v in sensor.items()} return store - async def close(self): + async def close(self) -> None: self.temp_update_cb.stop() - def _update_gcode_store(self, response): + def _update_gcode_store(self, response: str) -> None: curtime = time.time() self.gcode_queue.append( {'message': response, 'time': curtime, 'type': "response"}) - def store_gcode_command(self, script): + def store_gcode_command(self, script: str) -> None: curtime = time.time() for cmd in script.split('\n'): cmd = cmd.strip() @@ -128,7 +153,9 @@ class DataStore: self.gcode_queue.append( {'message': script, 'time': curtime, 'type': "command"}) - async def _handle_gcode_store_request(self, web_request): + async def _handle_gcode_store_request(self, + web_request: WebRequest + ) -> Dict[str, List[Dict[str, Any]]]: count = web_request.get_int("count", None) if count is not None: gc_responses = list(self.gcode_queue)[-count:] @@ -136,5 +163,5 @@ class DataStore: gc_responses = list(self.gcode_queue) return {'gcode_store': gc_responses} -def load_component(config): +def load_component(config: ConfigHelper) -> DataStore: return DataStore(config)