From a802e10725918ff22f4b4fae50a6e4664d6dc881 Mon Sep 17 00:00:00 2001 From: Eric Callahan Date: Sun, 28 Apr 2024 06:03:12 -0400 Subject: [PATCH] mqtt: support configurable status update interval SIgned-off-by: Eric Callahan --- moonraker/components/mqtt.py | 47 +++++++++++++++++++++++++++++++----- 1 file changed, 41 insertions(+), 6 deletions(-) diff --git a/moonraker/components/mqtt.py b/moonraker/components/mqtt.py index f998c14..3055f9c 100644 --- a/moonraker/components/mqtt.py +++ b/moonraker/components/mqtt.py @@ -38,6 +38,7 @@ from typing import ( if TYPE_CHECKING: from ..confighelper import ConfigHelper from ..common import JsonRPC, APIDefinition + from ..eventloop import FlexTimer from .klippy_apis import KlippyAPI FlexCallback = Callable[[bytes], Optional[Coroutine]] RPCCallback = Callable[..., Coroutine] @@ -323,6 +324,10 @@ class MQTTClient(APITransport): status_cfg: Dict[str, str] = config.getdict( "status_objects", {}, allow_empty_fields=True ) + self.status_interval = config.getfloat("status_interval", 0, above=.25) + self.status_cache: Dict[str, Dict[str, Any]] = {} + self.status_update_timer: Optional[FlexTimer] = None + self.last_status_time = 0. self.status_objs: Dict[str, Optional[List[str]]] = {} for key, val in status_cfg.items(): if val is not None: @@ -334,6 +339,13 @@ class MQTTClient(APITransport): self.server.register_event_handler( "server:klippy_started", self._handle_klippy_started ) + self.server.register_event_handler( + "server:klippy_disconnect", self._handle_klippy_disconnect + ) + if self.status_interval: + self.status_update_timer = self.eventloop.register_timer( + self._handle_timed_status_update + ) self.timestamp_deque: Deque = deque(maxlen=20) self.api_qos = config.getint('api_qos', self.qos) @@ -371,6 +383,16 @@ class MQTTClient(APITransport): await kapi.subscribe_from_transport( self.status_objs, self, default=None, ) + if self.status_update_timer is not None: + self.status_update_timer.start(delay=self.status_interval) + + def _handle_klippy_disconnect(self): + if self.status_update_timer is not None: + self.status_update_timer.stop() + if self.status_cache: + payload = self.status_cache + self.status_cache = {} + self._publish_status_update(payload, self.last_status_time) def _on_message(self, client: str, @@ -694,18 +716,29 @@ class MQTTClient(APITransport): else: self.timestamp_deque.append(ts) - def send_status(self, - status: Dict[str, Any], - eventtime: float - ) -> None: + def send_status(self, status: Dict[str, Any], eventtime: float) -> None: if not status or not self.is_connected(): return + if not self.status_interval: + self._publish_status_update(status, eventtime) + else: + for key, val in status.items(): + self.status_cache.setdefault(key, {}).update(val) + self.last_status_time = eventtime + + def _handle_timed_status_update(self, eventtime: float) -> float: + if self.status_cache: + payload = self.status_cache + self.status_cache = {} + self._publish_status_update(payload, self.last_status_time) + return eventtime + self.status_interval + + def _publish_status_update(self, status: Dict[str, Any], eventtime: float) -> None: if self.publish_split_status: for objkey in status: objval = status[objkey] for statekey in objval: - payload = {'eventtime': eventtime, - 'value': objval[statekey]} + payload = {'eventtime': eventtime, 'value': objval[statekey]} self.publish_topic( f"{self.klipper_state_prefix}/{objkey}/{statekey}", payload, retain=True) @@ -718,6 +751,8 @@ class MQTTClient(APITransport): return self.instance_name async def close(self) -> None: + if self.status_update_timer is not None: + self.status_update_timer.stop() if self.connect_task is not None: self.connect_task.cancel() self.connect_task = None