power: add support for tplink smartplug (Kasa) devices

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Arksine 2020-11-16 12:36:28 -05:00
parent 47c7cea5a5
commit 8ead49504b
1 changed files with 115 additions and 0 deletions

View File

@ -7,8 +7,12 @@
import logging import logging
import os import os
import asyncio import asyncio
import json
import struct
import socket
import gpiod import gpiod
from tornado.ioloop import IOLoop from tornado.ioloop import IOLoop
from tornado.iostream import IOStream
from tornado import gen from tornado import gen
class PrinterPower: class PrinterPower:
@ -39,6 +43,8 @@ class PrinterPower:
dev_type = cfg.get("type") dev_type = cfg.get("type")
if dev_type == "gpio": if dev_type == "gpio":
dev = GpioDevice(cfg, self.chip_factory) dev = GpioDevice(cfg, self.chip_factory)
elif dev_type == "tplink_smartplug":
dev = TPLinkSmartPlug(cfg)
else: else:
raise config.error(f"Unsupported Device Type: {dev_type}") raise config.error(f"Unsupported Device Type: {dev_type}")
self.devices[dev.get_name()] = dev self.devices[dev.get_name()] = dev
@ -214,6 +220,115 @@ class GpioDevice:
def close(self): def close(self):
self.line.release() self.line.release()
# This implementation based off the work tplink_smartplug
# script by Lubomir Stroetmann available at:
#
# https://github.com/softScheck/tplink-smartplug
#
# Copyright 2016 softScheck GmbH
class TPLinkSmartPlug:
START_KEY = 0xAB
def __init__(self, config):
self.server = config.get_server()
self.addr = config.get("address")
self.port = config.getint("port", 9999)
name_parts = config.get_name().split(maxsplit=1)
if len(name_parts) != 2:
raise config.error(f"Invalid Section Name: {config.get_name()}")
self.name = name_parts[1]
self.state = "init"
IOLoop.current().spawn_callback(self.initialize)
async def _send_tplink_command(self, command):
out_cmd = {}
if command in ["on", "off"]:
out_cmd = {'system': {'set_relay_state':
{'state': int(command == "on")}}}
elif command == "info":
out_cmd = {'system': {'get_sysinfo': {}}}
else:
raise self.server.error(f"Invalid tplink command: {command}")
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
stream = IOStream(s)
try:
await stream.connect((self.addr, self.port))
await stream.write(self._encrypt(out_cmd))
data = await stream.read_bytes(2048, partial=True)
length = struct.unpack(">I", data[:4])[0]
data = data[4:]
retries = 5
remaining = length - len(data)
while remaining and retries:
data += await stream.read_bytes(remaining)
remaining = length - len(data)
retries -= 1
if not retries:
raise self.server.error("Unable to read tplink packet")
except Exception:
msg = f"Error sending tplink command: {command}"
logging.exception(msg)
raise self.server.error(msg)
finally:
stream.close()
return json.loads(self._decrypt(data))
def _encrypt(self, data):
data = json.dumps(data)
key = self.START_KEY
res = struct.pack(">I", len(data))
for c in data:
val = key ^ ord(c)
key = val
res += bytes([val])
return res
def _decrypt(self, data):
key = self.START_KEY
res = ""
for c in data:
val = key ^ c
key = c
res += chr(val)
return res
async def initialize(self):
await self.refresh_status()
def get_name(self):
return self.name
def get_device_info(self):
return {
'device': self.name,
'status': self.state,
'type': "tplink_smartplug"
}
async def refresh_status(self):
try:
res = await self._send_tplink_command("info")
state = res['system']['get_sysinfo']['relay_state']
except Exception:
self.state = "error"
msg = f"Error Refeshing Device Status: {self.name}"
logging.exception(msg)
raise self.server.error(msg) from None
self.state = "on" if state else "off"
async def set_power(self, state):
try:
res = await self._send_tplink_command(state)
err = res['system']['set_relay_state']['err_code']
except Exception:
err = 1
logging.exception(f"Power Toggle Error: {self.name}")
if err:
self.state = "error"
raise self.server.error(
f"Error Toggling Device Power: {self.name}")
self.state = state
# The power plugin has multiple configuration sections # The power plugin has multiple configuration sections
def load_plugin_multi(config): def load_plugin_multi(config):
return PrinterPower(config) return PrinterPower(config)