From 8ead49504b691a4c2d38fddcf62792a369875d85 Mon Sep 17 00:00:00 2001 From: Arksine Date: Mon, 16 Nov 2020 12:36:28 -0500 Subject: [PATCH] power: add support for tplink smartplug (Kasa) devices Signed-off-by: Eric Callahan --- moonraker/plugins/power.py | 115 +++++++++++++++++++++++++++++++++++++ 1 file changed, 115 insertions(+) diff --git a/moonraker/plugins/power.py b/moonraker/plugins/power.py index ec7aa5b..7ee1e39 100644 --- a/moonraker/plugins/power.py +++ b/moonraker/plugins/power.py @@ -7,8 +7,12 @@ import logging import os import asyncio +import json +import struct +import socket import gpiod from tornado.ioloop import IOLoop +from tornado.iostream import IOStream from tornado import gen class PrinterPower: @@ -39,6 +43,8 @@ class PrinterPower: dev_type = cfg.get("type") if dev_type == "gpio": dev = GpioDevice(cfg, self.chip_factory) + elif dev_type == "tplink_smartplug": + dev = TPLinkSmartPlug(cfg) else: raise config.error(f"Unsupported Device Type: {dev_type}") self.devices[dev.get_name()] = dev @@ -214,6 +220,115 @@ class GpioDevice: def close(self): self.line.release() + +# This implementation based off the work tplink_smartplug +# script by Lubomir Stroetmann available at: +# +# https://github.com/softScheck/tplink-smartplug +# +# Copyright 2016 softScheck GmbH +class TPLinkSmartPlug: + START_KEY = 0xAB + def __init__(self, config): + self.server = config.get_server() + self.addr = config.get("address") + self.port = config.getint("port", 9999) + name_parts = config.get_name().split(maxsplit=1) + if len(name_parts) != 2: + raise config.error(f"Invalid Section Name: {config.get_name()}") + self.name = name_parts[1] + self.state = "init" + IOLoop.current().spawn_callback(self.initialize) + + async def _send_tplink_command(self, command): + out_cmd = {} + if command in ["on", "off"]: + out_cmd = {'system': {'set_relay_state': + {'state': int(command == "on")}}} + elif command == "info": + out_cmd = {'system': {'get_sysinfo': {}}} + else: + raise self.server.error(f"Invalid tplink command: {command}") + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + stream = IOStream(s) + try: + await stream.connect((self.addr, self.port)) + await stream.write(self._encrypt(out_cmd)) + data = await stream.read_bytes(2048, partial=True) + length = struct.unpack(">I", data[:4])[0] + data = data[4:] + retries = 5 + remaining = length - len(data) + while remaining and retries: + data += await stream.read_bytes(remaining) + remaining = length - len(data) + retries -= 1 + if not retries: + raise self.server.error("Unable to read tplink packet") + except Exception: + msg = f"Error sending tplink command: {command}" + logging.exception(msg) + raise self.server.error(msg) + finally: + stream.close() + return json.loads(self._decrypt(data)) + + def _encrypt(self, data): + data = json.dumps(data) + key = self.START_KEY + res = struct.pack(">I", len(data)) + for c in data: + val = key ^ ord(c) + key = val + res += bytes([val]) + return res + + def _decrypt(self, data): + key = self.START_KEY + res = "" + for c in data: + val = key ^ c + key = c + res += chr(val) + return res + + async def initialize(self): + await self.refresh_status() + + def get_name(self): + return self.name + + def get_device_info(self): + return { + 'device': self.name, + 'status': self.state, + 'type': "tplink_smartplug" + } + + async def refresh_status(self): + try: + res = await self._send_tplink_command("info") + state = res['system']['get_sysinfo']['relay_state'] + except Exception: + self.state = "error" + msg = f"Error Refeshing Device Status: {self.name}" + logging.exception(msg) + raise self.server.error(msg) from None + self.state = "on" if state else "off" + + async def set_power(self, state): + try: + res = await self._send_tplink_command(state) + err = res['system']['set_relay_state']['err_code'] + except Exception: + err = 1 + logging.exception(f"Power Toggle Error: {self.name}") + if err: + self.state = "error" + raise self.server.error( + f"Error Toggling Device Power: {self.name}") + self.state = state + # The power plugin has multiple configuration sections def load_plugin_multi(config): return PrinterPower(config)