mqtt: support configurable status update interval

SIgned-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2024-04-28 06:03:12 -04:00
parent 546a17f5d3
commit a802e10725
No known key found for this signature in database
GPG Key ID: 5A1EB336DFB4C71B
1 changed files with 41 additions and 6 deletions

View File

@ -38,6 +38,7 @@ from typing import (
if TYPE_CHECKING:
from ..confighelper import ConfigHelper
from ..common import JsonRPC, APIDefinition
from ..eventloop import FlexTimer
from .klippy_apis import KlippyAPI
FlexCallback = Callable[[bytes], Optional[Coroutine]]
RPCCallback = Callable[..., Coroutine]
@ -323,6 +324,10 @@ class MQTTClient(APITransport):
status_cfg: Dict[str, str] = config.getdict(
"status_objects", {}, allow_empty_fields=True
)
self.status_interval = config.getfloat("status_interval", 0, above=.25)
self.status_cache: Dict[str, Dict[str, Any]] = {}
self.status_update_timer: Optional[FlexTimer] = None
self.last_status_time = 0.
self.status_objs: Dict[str, Optional[List[str]]] = {}
for key, val in status_cfg.items():
if val is not None:
@ -334,6 +339,13 @@ class MQTTClient(APITransport):
self.server.register_event_handler(
"server:klippy_started", self._handle_klippy_started
)
self.server.register_event_handler(
"server:klippy_disconnect", self._handle_klippy_disconnect
)
if self.status_interval:
self.status_update_timer = self.eventloop.register_timer(
self._handle_timed_status_update
)
self.timestamp_deque: Deque = deque(maxlen=20)
self.api_qos = config.getint('api_qos', self.qos)
@ -371,6 +383,16 @@ class MQTTClient(APITransport):
await kapi.subscribe_from_transport(
self.status_objs, self, default=None,
)
if self.status_update_timer is not None:
self.status_update_timer.start(delay=self.status_interval)
def _handle_klippy_disconnect(self):
if self.status_update_timer is not None:
self.status_update_timer.stop()
if self.status_cache:
payload = self.status_cache
self.status_cache = {}
self._publish_status_update(payload, self.last_status_time)
def _on_message(self,
client: str,
@ -694,18 +716,29 @@ class MQTTClient(APITransport):
else:
self.timestamp_deque.append(ts)
def send_status(self,
status: Dict[str, Any],
eventtime: float
) -> None:
def send_status(self, status: Dict[str, Any], eventtime: float) -> None:
if not status or not self.is_connected():
return
if not self.status_interval:
self._publish_status_update(status, eventtime)
else:
for key, val in status.items():
self.status_cache.setdefault(key, {}).update(val)
self.last_status_time = eventtime
def _handle_timed_status_update(self, eventtime: float) -> float:
if self.status_cache:
payload = self.status_cache
self.status_cache = {}
self._publish_status_update(payload, self.last_status_time)
return eventtime + self.status_interval
def _publish_status_update(self, status: Dict[str, Any], eventtime: float) -> None:
if self.publish_split_status:
for objkey in status:
objval = status[objkey]
for statekey in objval:
payload = {'eventtime': eventtime,
'value': objval[statekey]}
payload = {'eventtime': eventtime, 'value': objval[statekey]}
self.publish_topic(
f"{self.klipper_state_prefix}/{objkey}/{statekey}",
payload, retain=True)
@ -718,6 +751,8 @@ class MQTTClient(APITransport):
return self.instance_name
async def close(self) -> None:
if self.status_update_timer is not None:
self.status_update_timer.stop()
if self.connect_task is not None:
self.connect_task.cancel()
self.connect_task = None