power: add mqtt device support

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2021-06-26 20:19:13 -04:00
parent f57bddfe4a
commit d0b1621bd5
1 changed files with 197 additions and 17 deletions

View File

@ -31,6 +31,9 @@ if TYPE_CHECKING:
from websockets import WebRequest from websockets import WebRequest
from .machine import Machine from .machine import Machine
from . import klippy_apis from . import klippy_apis
from .mqtt import MQTTClient
from .template import TemplateFactory
from .template import JinjaTemplate
APIComp = klippy_apis.KlippyAPI APIComp = klippy_apis.KlippyAPI
class PrinterPower: class PrinterPower:
@ -47,7 +50,8 @@ class PrinterPower:
"homeseer": HomeSeer, "homeseer": HomeSeer,
"homeassistant": HomeAssistant, "homeassistant": HomeAssistant,
"loxonev1": Loxonev1, "loxonev1": Loxonev1,
"rf": RFDevice "rf": RFDevice,
"mqtt": MQTTDevice
} }
for section in prefix_sections: for section in prefix_sections:
@ -188,16 +192,18 @@ class PrinterPower:
device: PowerDevice, device: PowerDevice,
req: str req: str
) -> str: ) -> str:
base_state: str = device.get_state()
ret = device.refresh_status() ret = device.refresh_status()
if ret is not None: if ret is not None:
await ret await ret
dev_info = device.get_device_info() cur_state: str = device.get_state()
if req == "toggle": if req == "toggle":
req = "on" if dev_info['status'] == "off" else "off" req = "on" if cur_state == "off" else "off"
if req in ["on", "off"]: if req in ["on", "off"]:
cur_state: str = dev_info['status']
if req == cur_state: if req == cur_state:
# device is already in requested state, do nothing # device is already in requested state, do nothing
if base_state != cur_state:
device.notify_power_changed()
return cur_state return cur_state
printing = await self._check_klippy_printing() printing = await self._check_klippy_printing()
if device.get_locked_while_printing() and printing: if device.get_locked_while_printing() and printing:
@ -207,22 +213,23 @@ class PrinterPower:
ret = device.set_power(req) ret = device.set_power(req)
if ret is not None: if ret is not None:
await ret await ret
dev_info = device.get_device_info() cur_state = device.get_state()
self.server.send_event("power:power_changed", dev_info) await device.process_power_changed()
await device.run_power_changed_action()
elif req != "status": elif req != "status":
raise self.server.error(f"Unsupported power request: {req}") raise self.server.error(f"Unsupported power request: {req}")
return dev_info['status'] elif base_state != cur_state:
device.notify_power_changed()
return cur_state
def set_device_power(self, device: str, state: str) -> None: def set_device_power(self, device: str, state: Union[bool, str]) -> None:
status: Optional[str] = None request: str = ""
if isinstance(state, bool): if isinstance(state, bool):
status = "on" if state else "off" request = "on" if state else "off"
elif isinstance(state, str): elif isinstance(state, str):
status = state.lower() request = state.lower()
if status in ["true", "false"]: if request in ["true", "false"]:
status = "on" if status == "true" else "off" request = "on" if request == "true" else "off"
if status not in ["on", "off"]: if request not in ["on", "off", "toggle"]:
logging.info(f"Invalid state received: {state}") logging.info(f"Invalid state received: {state}")
return return
if device not in self.devices: if device not in self.devices:
@ -230,7 +237,7 @@ class PrinterPower:
return return
event_loop = self.server.get_event_loop() event_loop = self.server.get_event_loop()
event_loop.register_callback( event_loop.register_callback(
self._process_request, self.devices[device], status) self._process_request, self.devices[device], request)
async def add_device(self, name: str, device: PowerDevice) -> None: async def add_device(self, name: str, device: PowerDevice) -> None:
if name in self.devices: if name in self.devices:
@ -308,7 +315,12 @@ class PowerDevice:
def get_locked_while_printing(self) -> bool: def get_locked_while_printing(self) -> bool:
return self.locked_while_printing return self.locked_while_printing
async def run_power_changed_action(self) -> None: def notify_power_changed(self) -> None:
dev_info = self.get_device_info()
self.server.send_event("power:power_changed", dev_info)
async def process_power_changed(self) -> None:
self.notify_power_changed()
if self.bound_service is not None: if self.bound_service is not None:
machine_cmp: Machine = self.server.lookup_component("machine") machine_cmp: Machine = self.server.lookup_component("machine")
action = "start" if self.state == "on" else "stop" action = "start" if self.state == "on" else "stop"
@ -850,6 +862,174 @@ class Loxonev1(HTTPDevice):
return "on" if int(state) == 1 else "off" return "on" if int(state) == 1 else "off"
class MQTTDevice(PowerDevice):
def __init__(self, config: ConfigHelper) -> None:
super().__init__(config)
self.mqtt: MQTTClient = self.server.load_component(config, 'mqtt')
self.eventloop = self.server.get_event_loop()
self.cmd_topic: str = config.get('command_topic')
self.cmd_payload: JinjaTemplate = config.gettemplate('command_payload')
self.retain_cmd_state = config.getboolean('retain_command_state', False)
self.query_topic: Optional[str] = config.get('query_topic', None)
self.query_payload = config.gettemplate('query_payload', None)
self.must_query = config.getboolean('query_after_command', False)
if self.query_topic is not None:
self.must_query = False
self.state_topic: str = config.get('state_topic')
self.state_timeout = config.getfloat('state_timeout', 2.)
template: TemplateFactory = self.server.lookup_component('template')
default_state_response = template.create_template("{payload}")
self.state_response = config.gettemplate('state_response_template',
default_state_response)
self.qos: Optional[int] = config.getint('qos', None, minval=0, maxval=2)
self.mqtt.subscribe_topic(
self.state_topic, self._on_state_update, self.qos)
self.query_response: Optional[asyncio.Future] = None
self.request_mutex = asyncio.Lock()
self.server.register_event_handler(
"mqtt:connected", self._on_mqtt_connected)
self.server.register_event_handler(
"mqtt:disconnected", self._on_mqtt_disconnected)
def _on_state_update(self, payload: bytes) -> None:
last_state = self.state
in_request = self.request_mutex.locked()
err: Optional[Exception] = None
context = {
'payload': payload.decode()
}
try:
response = self.state_response.render(context)
except Exception as e:
err = e
self.state = "error"
else:
response = response.lower()
if response not in ["on", "off"]:
err_msg = "Invalid State Received. " \
f"Raw Payload: '{payload.decode()}', Rendered: '{response}"
logging.info(f"MQTT Power Device {self.name}: {err_msg}")
err = self.server.error(err_msg, 500)
self.state = "error"
else:
self.state = response
if not in_request and last_state != self.state:
logging.info(f"MQTT Power Device {self.name}: External Power "
f"event detected, new state: {self.state}")
self.notify_power_changed()
if (
self.query_response is not None and
not self.query_response.done()
):
if err is not None:
self.query_response.set_exception(err)
else:
self.query_response.set_result(response)
async def _on_mqtt_connected(self) -> None:
async with self.request_mutex:
if self.state in ["on", "off"]:
return
self.state = "init"
success = False
while self.mqtt.is_connected():
self.query_response = self.eventloop.create_future()
try:
await self._wait_for_update(self.query_response)
except asyncio.TimeoutError:
# Only wait once if no query topic is set.
# Assume that the MQTT device has set the retain
# flag on the state topic, and therefore should get
# an immediate response upon subscription.
if self.query_topic is None:
logging.info(f"MQTT Power Device {self.name}: "
"Initialization Timed Out")
break
except Exception:
logging.exception(f"MQTT Power Device {self.name}: "
"Init Failed")
break
else:
success = True
break
await asyncio.sleep(2.)
self.query_response = None
if not success:
self.state = "error"
else:
logging.info(
f"MQTT Power Device {self.name} initialized")
self.notify_power_changed()
async def _on_mqtt_disconnected(self):
if (
self.query_response is not None and
not self.query_response.done()
):
self.query_response.set_exception(
self.server.error("MQTT Disconnected", 503))
async with self.request_mutex:
self.state = "error"
self.notify_power_changed()
async def refresh_status(self) -> None:
if (
self.query_topic is not None and
(self.must_query or self.state not in ["on", "off"])
):
if not self.mqtt.is_connected():
raise self.server.error(
f"MQTT Power Device {self.name}: "
"MQTT Not Connected", 503)
async with self.request_mutex:
self.query_response = self.eventloop.create_future()
try:
await self._wait_for_update(self.query_response)
except Exception:
logging.exception(f"MQTT Power Device {self.name}: "
"Failed to refresh state")
self.state = "error"
self.query_response = None
async def _wait_for_update(self, fut: asyncio.Future,
do_query: bool = True
) -> str:
if self.query_topic is not None and do_query:
payload: Optional[str] = None
if self.query_payload is not None:
payload = self.query_payload.render()
await self.mqtt.publish_topic(self.query_topic, payload,
self.qos)
return await asyncio.wait_for(fut, timeout=self.state_timeout)
async def set_power(self, state: str) -> None:
if not self.mqtt.is_connected():
raise self.server.error(
f"MQTT Power Device {self.name}: "
"MQTT Not Connected", 503)
async with self.request_mutex:
self.query_response = self.eventloop.create_future()
new_state = "error"
try:
payload = self.cmd_payload.render({'command': state})
await self.mqtt.publish_topic(
self.cmd_topic, payload, self.qos,
retain=self.retain_cmd_state)
new_state = await self._wait_for_update(
self.query_response, do_query=self.must_query)
except Exception:
logging.exception(
f"MQTT Power Device {self.name}: Failed to set state")
new_state = "error"
self.query_response = None
self.state = new_state
if self.state == "error":
raise self.server.error(
f"MQTT Power Device {self.name}: Failed to set "
f"device to state '{state}'", 500)
# The power component has multiple configuration sections # The power component has multiple configuration sections
def load_component(config: ConfigHelper) -> PrinterPower: def load_component(config: ConfigHelper) -> PrinterPower:
return PrinterPower(config) return PrinterPower(config)