Added Tasmota power plugin

Signed-off-by: Matthias Neumayr <matt.neumayr@gmail.com>
This commit is contained in:
Matthias Neumayr 2020-11-20 22:44:16 +01:00 committed by Eric Callahan
parent 8d3e185894
commit 6ba54d9cb1
1 changed files with 73 additions and 0 deletions

View File

@ -14,6 +14,8 @@ import gpiod
from tornado.ioloop import IOLoop
from tornado.iostream import IOStream
from tornado import gen
from tornado.httpclient import AsyncHTTPClient
from tornado.escape import json_decode
class PrinterPower:
def __init__(self, config):
@ -45,6 +47,8 @@ class PrinterPower:
dev = GpioDevice(cfg, self.chip_factory)
elif dev_type == "tplink_smartplug":
dev = TPLinkSmartPlug(cfg)
elif dev_type == "tasmota":
dev = Tasmota(cfg)
else:
raise config.error(f"Unsupported Device Type: {dev_type}")
self.devices[dev.get_name()] = dev
@ -329,6 +333,75 @@ class TPLinkSmartPlug:
f"Error Toggling Device Power: {self.name}")
self.state = state
class Tasmota:
def __init__(self, config):
self.server = config.get_server()
self.addr = config.get("address")
self.output_id = config.getint("output_id", 1)
self.password = config.get("password", "")
name_parts = config.get_name().split(maxsplit=1)
if len(name_parts) != 2:
raise config.error(f"Invalid Section Name: {config.get_name()}")
self.name = name_parts[1]
self.state = "init"
IOLoop.current().spawn_callback(self.initialize)
async def _send_tasmota_command(self, command, password=None):
if command in ["on", "off"]:
out_cmd = f"Power{self.output_id}%20{command}"
elif command == "info":
out_cmd = "Power{self.output_id}"
else:
raise self.server.error(f"Invalid tasmota command: {command}")
url = f"http://{self.addr}/cm?user=admin&password={self.password}&cmnd={out_cmd}"
data = ""
http_client = AsyncHTTPClient()
try:
response = await http_client.fetch(url)
data = json_decode(response.body)
except Exception:
msg = f"Error sending tplink command: {command}"
logging.exception(msg)
raise self.server.error(msg)
return data
async def initialize(self):
await self.refresh_status()
def get_name(self):
return self.name
def get_device_info(self):
return {
'device': self.name,
'status': self.state,
'type': "tasmota"
}
async def refresh_status(self):
try:
res = await self._send_tasmota_command("info")
state = res[f"Power{self.output_id}"].lower()
except Exception:
self.state = "error"
msg = f"Error Refeshing Device Status: {self.name}"
logging.exception(msg)
raise self.server.error(msg) from None
self.state = state
async def set_power(self, state):
try:
res = await self._send_tasmota_command(state)
state = res[f"Power{self.output_id}"].lower()
except Exception:
self.state = "error"
msg = f"Error Setting Device Status: {self.name} to {state}"
logging.exception(msg)
raise self.server.error(msg) from None
self.state = state
# The power plugin has multiple configuration sections
def load_plugin_multi(config):
return PrinterPower(config)