data_store: add support for temp monitors

Temperature Monitors may report null values as temperatures,
thus special handling is needed.  This commit also reworks
temperature store updates to use the subscription cache
rather than perform its own caching of "last temps".

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2023-08-02 12:41:36 -04:00
parent 797c62389d
commit 47f0f0c21e
No known key found for this signature in database
GPG Key ID: 5A1EB336DFB4C71B
1 changed files with 50 additions and 46 deletions

View File

@ -16,18 +16,23 @@ from typing import (
Optional,
Dict,
List,
Tuple,
Deque,
)
if TYPE_CHECKING:
from ..confighelper import ConfigHelper
from ..common import WebRequest
from ..klippy_connection import KlippyConnection
from .klippy_apis import KlippyAPI as APIComp
GCQueue = Deque[Dict[str, Any]]
TempStore = Dict[str, Dict[str, Deque[float]]]
TempStore = Dict[str, Dict[str, Deque[Optional[float]]]]
TEMP_UPDATE_TIME = 1.
def _round_null(val: Optional[float], ndigits: int) -> Optional[float]:
if val is None:
return val
return round(val, ndigits)
class DataStore:
def __init__(self, config: ConfigHelper) -> None:
self.server = config.get_server()
@ -35,9 +40,11 @@ class DataStore:
self.gcode_store_size = config.getint('gcode_store_size', 1000)
# Temperature Store Tracking
self.last_temps: Dict[str, Tuple[float, ...]] = {}
kconn: KlippyConnection = self.server.lookup_component("klippy_connection")
self.subscription_cache = kconn.get_subscription_cache()
self.gcode_queue: GCQueue = deque(maxlen=self.gcode_store_size)
self.temperature_store: TempStore = {}
self.temp_monitors: List[str] = []
eventloop = self.server.get_event_loop()
self.temp_update_timer = eventloop.register_timer(
self._update_temperature_store)
@ -67,75 +74,72 @@ class DataStore:
except self.server.error as e:
logging.info(f"Error Configuring Sensors: {e}")
return
sensors: List[str]
sensors = result.get("heaters", {}).get("available_sensors", [])
heaters: Dict[str, List[str]] = result.get("heaters", {})
sensors = heaters.get("available_sensors", [])
self.temp_monitors = heaters.get("available_monitors", [])
sensors.extend(self.temp_monitors)
if sensors:
# Add Subscription
sub: Dict[str, Optional[List[str]]] = {s: None for s in sensors}
try:
status: Dict[str, Any]
status = await klippy_apis.subscribe_objects(
sub, self._set_current_temps
)
status = await klippy_apis.subscribe_objects(sub)
except self.server.error as e:
logging.info(f"Error subscribing to sensors: {e}")
return
logging.info(f"Configuring available sensors: {sensors}")
new_store: TempStore = {}
valid_fields = ("temperature", "target", "power", "speed")
for sensor in sensors:
fields = list(status.get(sensor, {}).keys())
reported_fields = [
f for f in list(status.get(sensor, {}).keys()) if f in valid_fields
]
if not reported_fields:
logging.info(f"No valid fields reported for sensor: {sensor}")
self.temperature_store.pop(sensor, None)
continue
if sensor in self.temperature_store:
new_store[sensor] = self.temperature_store[sensor]
for field in list(new_store[sensor].keys()):
if field not in reported_fields:
new_store[sensor].pop(field, None)
else:
new_store[sensor] = {
'temperatures': deque(maxlen=self.temp_store_size)}
for item in ["target", "power", "speed"]:
if item in fields:
new_store[sensor][f"{item}s"] = deque(
maxlen=self.temp_store_size)
if sensor not in self.last_temps:
self.last_temps[sensor] = (0., 0., 0., 0.)
initial_val: Optional[float]
initial_val = _round_null(status[sensor][field], 2)
new_store[sensor][field].append(initial_val)
else:
new_store[sensor] = {}
for field in reported_fields:
if field not in new_store[sensor]:
initial_val = _round_null(status[sensor][field], 2)
new_store[sensor][field] = deque(
[initial_val], maxlen=self.temp_store_size
)
self.temperature_store = new_store
# Prune unconfigured sensors in self.last_temps
for sensor in list(self.last_temps.keys()):
if sensor not in self.temperature_store:
del self.last_temps[sensor]
# Update initial temperatures
self._set_current_temps(status)
self.temp_update_timer.start()
self.temp_update_timer.start(delay=1.)
else:
logging.info("No sensors found")
self.last_temps = {}
self.temperature_store = {}
self.temp_monitors = []
self.temp_update_timer.stop()
def _set_current_temps(self, data: Dict[str, Any], _: float = 0.) -> None:
for sensor in self.temperature_store:
if sensor in data:
last_val = self.last_temps[sensor]
self.last_temps[sensor] = (
round(data[sensor].get('temperature', last_val[0]), 2),
data[sensor].get('target', last_val[1]),
data[sensor].get('power', last_val[2]),
data[sensor].get('speed', last_val[3]))
def _update_temperature_store(self, eventtime: float) -> float:
# XXX - If klippy is not connected, set values to zero
# as they are unknown?
for sensor, vals in self.last_temps.items():
self.temperature_store[sensor]['temperatures'].append(vals[0])
for val, item in zip(vals[1:], ["targets", "powers", "speeds"]):
if item in self.temperature_store[sensor]:
self.temperature_store[sensor][item].append(val)
for sensor_name, sensor in self.temperature_store.items():
sdata: Dict[str, Any] = self.subscription_cache.get(sensor_name, {})
for field, store in sensor.items():
store.append(_round_null(sdata.get(field, store[-1]), 2))
return eventtime + TEMP_UPDATE_TIME
async def _handle_temp_store_request(self,
web_request: WebRequest
) -> Dict[str, Dict[str, List[float]]]:
async def _handle_temp_store_request(
self, web_request: WebRequest
) -> Dict[str, Dict[str, List[Optional[float]]]]:
include_monitors = web_request.get_boolean("include_monitors", False)
store = {}
for name, sensor in self.temperature_store.items():
store[name] = {k: list(v) for k, v in sensor.items()}
if not include_monitors and name in self.temp_monitors:
continue
store[name] = {f"{k}s": list(v) for k, v in sensor.items()}
return store
async def close(self) -> None: