From 0c311aeef420d00fadcd4d9455e46da7aa189889 Mon Sep 17 00:00:00 2001 From: Eric Callahan Date: Fri, 8 Oct 2021 12:22:49 -0400 Subject: [PATCH] mqtt: add support for publishing klipper status updates Signed-off-by: Eric Callahan --- moonraker/components/mqtt.py | 33 ++++++++++++++++++++++++++++++--- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/moonraker/components/mqtt.py b/moonraker/components/mqtt.py index 17e616f..2d4e0cd 100644 --- a/moonraker/components/mqtt.py +++ b/moonraker/components/mqtt.py @@ -12,7 +12,7 @@ import json import pathlib from collections import deque import paho.mqtt.client as paho_mqtt -from websockets import WebRequest, JsonRPC, APITransport +from websockets import Subscribable, WebRequest, JsonRPC, APITransport # Annotation imports from typing import ( @@ -131,7 +131,7 @@ class AIOHelper: logging.info("MQTT Misc Loop Complete") -class MQTTClient(APITransport): +class MQTTClient(APITransport, Subscribable): def __init__(self, config: ConfigHelper) -> None: self.server = config.get_server() self.event_loop = self.server.get_event_loop() @@ -186,11 +186,20 @@ class MQTTClient(APITransport): self._handle_subscription_request, transports=["http", "websocket"]) - # Subscribe to API requests self.json_rpc = JsonRPC(transport="MQTT") self.api_request_topic = f"{self.instance_name}/moonraker/api/request" self.api_resp_topic = f"{self.instance_name}/moonraker/api/response" + self.klipper_status_topic = f"{self.instance_name}/klipper/status" + status_cfg = config.get("status_objects", None) + self.status_objs: Dict[str, Any] = {} + if status_cfg is not None: + status_cfg = status_cfg.strip() + self.status_objs = {k.strip(): None for k in status_cfg.split('\n') + if k.strip()} + self.server.register_event_handler("server:klippy_identified", + self._handle_klippy_identified) + self.timestamp_deque: Deque = deque(maxlen=20) self.api_qos = config.getint('api_qos', self.qos) if config.getboolean("enable_moonraker_api", True): @@ -233,6 +242,15 @@ class MQTTClient(APITransport): self.client.socket().setsockopt( socket.SOL_SOCKET, socket.SO_SNDBUF, 2048) + async def _handle_klippy_identified(self) -> None: + if self.status_objs: + args = {'objects': self.status_objs} + try: + await self.server.make_request( + WebRequest("objects/subscribe", args, conn=self)) + except self.server.error: + pass + def _on_message(self, client: str, user_data: Any, @@ -574,6 +592,15 @@ class MQTTClient(APITransport): else: self.timestamp_deque.append(ts) + def send_status(self, + status: Dict[str, Any], + eventtime: float + ) -> None: + if not status: + return + payload = {'eventtime': eventtime, 'status': status} + self.publish_topic(self.klipper_status_topic, payload) + async def close(self) -> None: if self.reconnect_task is not None: self.reconnect_task.cancel()