From 6ba54d9cb15ff72af65757dc5855fb2eecea7812 Mon Sep 17 00:00:00 2001 From: Matthias Neumayr Date: Fri, 20 Nov 2020 22:44:16 +0100 Subject: [PATCH] Added Tasmota power plugin Signed-off-by: Matthias Neumayr --- moonraker/plugins/power.py | 73 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/moonraker/plugins/power.py b/moonraker/plugins/power.py index 7ee1e39..1351e9e 100644 --- a/moonraker/plugins/power.py +++ b/moonraker/plugins/power.py @@ -14,6 +14,8 @@ import gpiod from tornado.ioloop import IOLoop from tornado.iostream import IOStream from tornado import gen +from tornado.httpclient import AsyncHTTPClient +from tornado.escape import json_decode class PrinterPower: def __init__(self, config): @@ -45,6 +47,8 @@ class PrinterPower: dev = GpioDevice(cfg, self.chip_factory) elif dev_type == "tplink_smartplug": dev = TPLinkSmartPlug(cfg) + elif dev_type == "tasmota": + dev = Tasmota(cfg) else: raise config.error(f"Unsupported Device Type: {dev_type}") self.devices[dev.get_name()] = dev @@ -329,6 +333,75 @@ class TPLinkSmartPlug: f"Error Toggling Device Power: {self.name}") self.state = state + +class Tasmota: + def __init__(self, config): + self.server = config.get_server() + self.addr = config.get("address") + self.output_id = config.getint("output_id", 1) + self.password = config.get("password", "") + name_parts = config.get_name().split(maxsplit=1) + if len(name_parts) != 2: + raise config.error(f"Invalid Section Name: {config.get_name()}") + self.name = name_parts[1] + self.state = "init" + IOLoop.current().spawn_callback(self.initialize) + + async def _send_tasmota_command(self, command, password=None): + if command in ["on", "off"]: + out_cmd = f"Power{self.output_id}%20{command}" + elif command == "info": + out_cmd = "Power{self.output_id}" + else: + raise self.server.error(f"Invalid tasmota command: {command}") + + url = f"http://{self.addr}/cm?user=admin&password={self.password}&cmnd={out_cmd}" + data = "" + http_client = AsyncHTTPClient() + try: + response = await http_client.fetch(url) + data = json_decode(response.body) + except Exception: + msg = f"Error sending tplink command: {command}" + logging.exception(msg) + raise self.server.error(msg) + return data + + async def initialize(self): + await self.refresh_status() + + def get_name(self): + return self.name + + def get_device_info(self): + return { + 'device': self.name, + 'status': self.state, + 'type': "tasmota" + } + + async def refresh_status(self): + try: + res = await self._send_tasmota_command("info") + state = res[f"Power{self.output_id}"].lower() + except Exception: + self.state = "error" + msg = f"Error Refeshing Device Status: {self.name}" + logging.exception(msg) + raise self.server.error(msg) from None + self.state = state + + async def set_power(self, state): + try: + res = await self._send_tasmota_command(state) + state = res[f"Power{self.output_id}"].lower() + except Exception: + self.state = "error" + msg = f"Error Setting Device Status: {self.name} to {state}" + logging.exception(msg) + raise self.server.error(msg) from None + self.state = state + # The power plugin has multiple configuration sections def load_plugin_multi(config): return PrinterPower(config)