mqtt: add support for publishing klipper status updates

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2021-10-08 12:22:49 -04:00
parent b1f122beec
commit 0c311aeef4
1 changed files with 30 additions and 3 deletions

View File

@ -12,7 +12,7 @@ import json
import pathlib import pathlib
from collections import deque from collections import deque
import paho.mqtt.client as paho_mqtt import paho.mqtt.client as paho_mqtt
from websockets import WebRequest, JsonRPC, APITransport from websockets import Subscribable, WebRequest, JsonRPC, APITransport
# Annotation imports # Annotation imports
from typing import ( from typing import (
@ -131,7 +131,7 @@ class AIOHelper:
logging.info("MQTT Misc Loop Complete") logging.info("MQTT Misc Loop Complete")
class MQTTClient(APITransport): class MQTTClient(APITransport, Subscribable):
def __init__(self, config: ConfigHelper) -> None: def __init__(self, config: ConfigHelper) -> None:
self.server = config.get_server() self.server = config.get_server()
self.event_loop = self.server.get_event_loop() self.event_loop = self.server.get_event_loop()
@ -186,11 +186,20 @@ class MQTTClient(APITransport):
self._handle_subscription_request, self._handle_subscription_request,
transports=["http", "websocket"]) transports=["http", "websocket"])
# Subscribe to API requests # Subscribe to API requests
self.json_rpc = JsonRPC(transport="MQTT") self.json_rpc = JsonRPC(transport="MQTT")
self.api_request_topic = f"{self.instance_name}/moonraker/api/request" self.api_request_topic = f"{self.instance_name}/moonraker/api/request"
self.api_resp_topic = f"{self.instance_name}/moonraker/api/response" self.api_resp_topic = f"{self.instance_name}/moonraker/api/response"
self.klipper_status_topic = f"{self.instance_name}/klipper/status"
status_cfg = config.get("status_objects", None)
self.status_objs: Dict[str, Any] = {}
if status_cfg is not None:
status_cfg = status_cfg.strip()
self.status_objs = {k.strip(): None for k in status_cfg.split('\n')
if k.strip()}
self.server.register_event_handler("server:klippy_identified",
self._handle_klippy_identified)
self.timestamp_deque: Deque = deque(maxlen=20) self.timestamp_deque: Deque = deque(maxlen=20)
self.api_qos = config.getint('api_qos', self.qos) self.api_qos = config.getint('api_qos', self.qos)
if config.getboolean("enable_moonraker_api", True): if config.getboolean("enable_moonraker_api", True):
@ -233,6 +242,15 @@ class MQTTClient(APITransport):
self.client.socket().setsockopt( self.client.socket().setsockopt(
socket.SOL_SOCKET, socket.SO_SNDBUF, 2048) socket.SOL_SOCKET, socket.SO_SNDBUF, 2048)
async def _handle_klippy_identified(self) -> None:
if self.status_objs:
args = {'objects': self.status_objs}
try:
await self.server.make_request(
WebRequest("objects/subscribe", args, conn=self))
except self.server.error:
pass
def _on_message(self, def _on_message(self,
client: str, client: str,
user_data: Any, user_data: Any,
@ -574,6 +592,15 @@ class MQTTClient(APITransport):
else: else:
self.timestamp_deque.append(ts) self.timestamp_deque.append(ts)
def send_status(self,
status: Dict[str, Any],
eventtime: float
) -> None:
if not status:
return
payload = {'eventtime': eventtime, 'status': status}
self.publish_topic(self.klipper_status_topic, payload)
async def close(self) -> None: async def close(self) -> None:
if self.reconnect_task is not None: if self.reconnect_task is not None:
self.reconnect_task.cancel() self.reconnect_task.cancel()