power: add annotiations

This includes a refactoring of the PowerDevice base class so that it defines some abstract methods that its children must implement.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Arksine 2021-05-15 06:53:34 -04:00
parent 12246029ef
commit 1ada457364
1 changed files with 177 additions and 109 deletions

View File

@ -4,9 +4,8 @@
# #
# This file may be distributed under the terms of the GNU GPLv3 license. # This file may be distributed under the terms of the GNU GPLv3 license.
from __future__ import annotations
import logging import logging
import os
import asyncio
import json import json
import struct import struct
import socket import socket
@ -17,11 +16,29 @@ from tornado.locks import Lock
from tornado.httpclient import AsyncHTTPClient from tornado.httpclient import AsyncHTTPClient
from tornado.escape import json_decode from tornado.escape import json_decode
# Annotation imports
from typing import (
TYPE_CHECKING,
Type,
List,
Any,
Optional,
Dict,
Coroutine,
Tuple,
Union,
)
if TYPE_CHECKING:
from confighelper import ConfigHelper
from websockets import WebRequest
from . import klippy_apis
APIComp = klippy_apis.KlippyAPI
class PrinterPower: class PrinterPower:
def __init__(self, config): def __init__(self, config: ConfigHelper) -> None:
self.server = config.get_server() self.server = config.get_server()
self.chip_factory = GpioChipFactory() self.chip_factory = GpioChipFactory()
self.devices = {} self.devices: Dict[str, PowerDevice] = {}
prefix_sections = config.get_prefix_sections("power") prefix_sections = config.get_prefix_sections("power")
logging.info(f"Power component loading devices: {prefix_sections}") logging.info(f"Power component loading devices: {prefix_sections}")
dev_types = { dev_types = {
@ -36,14 +53,14 @@ class PrinterPower:
try: try:
for section in prefix_sections: for section in prefix_sections:
cfg = config[section] cfg = config[section]
dev_type = cfg.get("type") dev_type: str = cfg.get("type")
dev_class: Optional[Type[PowerDevice]]
dev_class = dev_types.get(dev_type) dev_class = dev_types.get(dev_type)
if dev_class is None: if dev_class is None:
raise config.error(f"Unsupported Device Type: {dev_type}") raise config.error(f"Unsupported Device Type: {dev_type}")
elif dev_type == "gpio": dev = dev_class(cfg)
dev = dev_class(cfg, self.chip_factory) if isinstance(dev, GpioDevice):
else: dev.configure_line(cfg, self.chip_factory)
dev = dev_class(cfg)
self.devices[dev.get_name()] = dev self.devices[dev.get_name()] = dev
except Exception: except Exception:
self.chip_factory.close() self.chip_factory.close()
@ -72,35 +89,40 @@ class PrinterPower:
IOLoop.current().spawn_callback( IOLoop.current().spawn_callback(
self._initalize_devices, list(self.devices.values())) self._initalize_devices, list(self.devices.values()))
async def _check_klippy_printing(self): async def _check_klippy_printing(self) -> bool:
klippy_apis = self.server.lookup_component('klippy_apis') kapis: APIComp = self.server.lookup_component('klippy_apis')
result = await klippy_apis.query_objects( result: Dict[str, Any] = await kapis.query_objects(
{'print_stats': None}, default={}) {'print_stats': None}, default={})
pstate = result.get('print_stats', {}).get('state', "").lower() pstate = result.get('print_stats', {}).get('state', "").lower()
return pstate == "printing" return pstate == "printing"
async def _initalize_devices(self, inital_devs): async def _initalize_devices(self,
inital_devs: List[PowerDevice]
) -> None:
for dev in inital_devs: for dev in inital_devs:
ret = dev.initialize() ret = dev.initialize()
if asyncio.iscoroutine(ret): if ret is not None:
await ret await ret
async def _handle_klippy_shutdown(self): async def _handle_klippy_shutdown(self) -> None:
for name, dev in self.devices.items(): for name, dev in self.devices.items():
if hasattr(dev, "off_when_shutdown"): if dev.has_off_when_shutdown():
if dev.off_when_shutdown: logging.info(
logging.info( f"Powering off device [{name}] due to"
f"Powering off device [{name}] due to" " klippy shutdown")
" klippy shutdown") await self._process_request(dev, "off")
await self._process_request(dev, "off")
async def _handle_list_devices(self, web_request): async def _handle_list_devices(self,
web_request: WebRequest
) -> Dict[str, Any]:
dev_list = [d.get_device_info() for d in self.devices.values()] dev_list = [d.get_device_info() for d in self.devices.values()]
output = {"devices": dev_list} output = {"devices": dev_list}
return output return output
async def _handle_single_power_request(self, web_request): async def _handle_single_power_request(self,
dev_name = web_request.get_str('device') web_request: WebRequest
) -> Dict[str, Any]:
dev_name: str = web_request.get_str('device')
req_action = web_request.get_action() req_action = web_request.get_action()
if dev_name not in self.devices: if dev_name not in self.devices:
raise self.server.error(f"No valid device named {dev_name}") raise self.server.error(f"No valid device named {dev_name}")
@ -115,7 +137,9 @@ class PrinterPower:
result = await self._process_request(dev, action) result = await self._process_request(dev, action)
return {dev_name: result} return {dev_name: result}
async def _handle_batch_power_request(self, web_request): async def _handle_batch_power_request(self,
web_request: WebRequest
) -> Dict[str, Any]:
args = web_request.get_args() args = web_request.get_args()
ep = web_request.get_endpoint() ep = web_request.get_endpoint()
if not args: if not args:
@ -130,15 +154,18 @@ class PrinterPower:
result[name] = "device_not_found" result[name] = "device_not_found"
return result return result
async def _process_request(self, device, req): async def _process_request(self,
device: PowerDevice,
req: str
) -> str:
ret = device.refresh_status() ret = device.refresh_status()
if asyncio.iscoroutine(ret): if ret is not None:
await ret await ret
dev_info = device.get_device_info() dev_info = device.get_device_info()
if req == "toggle": if req == "toggle":
req = "on" if dev_info['status'] == "off" else "off" req = "on" if dev_info['status'] == "off" else "off"
if req in ["on", "off"]: if req in ["on", "off"]:
cur_state = dev_info['status'] cur_state: str = dev_info['status']
if req == cur_state: if req == cur_state:
# device is already in requested state, do nothing # device is already in requested state, do nothing
return cur_state return cur_state
@ -148,7 +175,7 @@ class PrinterPower:
f"Unable to change power for {device.get_name()} " f"Unable to change power for {device.get_name()} "
"while printing") "while printing")
ret = device.set_power(req) ret = device.set_power(req)
if asyncio.iscoroutine(ret): if ret is not None:
await ret await ret
dev_info = device.get_device_info() dev_info = device.get_device_info()
self.server.send_event("power:power_changed", dev_info) self.server.send_event("power:power_changed", dev_info)
@ -157,8 +184,8 @@ class PrinterPower:
raise self.server.error(f"Unsupported power request: {req}") raise self.server.error(f"Unsupported power request: {req}")
return dev_info['status'] return dev_info['status']
def set_device_power(self, device, state): def set_device_power(self, device: str, state: str) -> None:
status = None status: Optional[str] = None
if isinstance(state, bool): if isinstance(state, bool):
status = "on" if state else "off" status = "on" if state else "off"
elif isinstance(state, str): elif isinstance(state, str):
@ -175,33 +202,32 @@ class PrinterPower:
ioloop.spawn_callback( ioloop.spawn_callback(
self._process_request, self.devices[device], status) self._process_request, self.devices[device], status)
async def add_device(self, name, device): async def add_device(self, name: str, device: PowerDevice) -> None:
if name in self.devices: if name in self.devices:
raise self.server.error( raise self.server.error(
f"Device [{name}] already configured") f"Device [{name}] already configured")
ret = device.initialize() ret = device.initialize()
if asyncio.iscoroutine(ret): if ret is not None:
await ret await ret
self.devices[name] = device self.devices[name] = device
async def close(self): async def close(self) -> None:
for device in self.devices.values(): for device in self.devices.values():
if hasattr(device, "close"): ret = device.close()
ret = device.close() if ret is not None:
if asyncio.iscoroutine(ret): await ret
await ret
self.chip_factory.close() self.chip_factory.close()
class PowerDevice: class PowerDevice:
def __init__(self, config): def __init__(self, config: ConfigHelper) -> None:
name_parts = config.get_name().split(maxsplit=1) name_parts = config.get_name().split(maxsplit=1)
if len(name_parts) != 2: if len(name_parts) != 2:
raise config.error(f"Invalid Section Name: {config.get_name()}") raise config.error(f"Invalid Section Name: {config.get_name()}")
self.server = config.get_server() self.server = config.get_server()
self.name = name_parts[1] self.name = name_parts[1]
self.type = config.get('type') self.type: str = config.get('type')
self.state = "init" self.state: str = "init"
self.locked_while_printing = config.getboolean( self.locked_while_printing = config.getboolean(
'locked_while_printing', False) 'locked_while_printing', False)
self.off_when_shutdown = config.getboolean('off_when_shutdown', False) self.off_when_shutdown = config.getboolean('off_when_shutdown', False)
@ -213,10 +239,10 @@ class PowerDevice:
if self.restart_delay < .000001: if self.restart_delay < .000001:
raise config.error("Option 'restart_delay' must be above 0.0") raise config.error("Option 'restart_delay' must be above 0.0")
def get_name(self): def get_name(self) -> str:
return self.name return self.name
def get_device_info(self): def get_device_info(self) -> Dict[str, Any]:
return { return {
'device': self.name, 'device': self.name,
'status': self.state, 'status': self.state,
@ -224,31 +250,54 @@ class PowerDevice:
'type': self.type 'type': self.type
} }
def get_locked_while_printing(self): def get_locked_while_printing(self) -> bool:
return self.locked_while_printing return self.locked_while_printing
def run_power_changed_action(self): def run_power_changed_action(self) -> None:
if self.state == "on" and self.klipper_restart: if self.state == "on" and self.klipper_restart:
ioloop = IOLoop.current() ioloop = IOLoop.current()
klippy_apis = self.server.lookup_component("klippy_apis") kapis: APIComp = self.server.lookup_component("klippy_apis")
ioloop.call_later(self.restart_delay, klippy_apis.do_restart, ioloop.call_later(
"FIRMWARE_RESTART") self.restart_delay, kapis.do_restart, # type:ignore
"FIRMWARE_RESTART")
def has_off_when_shutdown(self) -> bool:
return self.off_when_shutdown
def initialize(self) -> Optional[Coroutine]:
raise NotImplementedError
def refresh_status(self) -> Optional[Coroutine]:
raise NotImplementedError
def set_power(self, state: str) -> Optional[Coroutine]:
raise NotImplementedError
def close(self) -> Optional[Coroutine]:
pass
class HTTPDevice(PowerDevice): class HTTPDevice(PowerDevice):
def __init__(self, config, default_port=None, def __init__(self,
default_user=None, default_password=None): config: ConfigHelper,
default_port: int = -1,
default_user: str = "",
default_password: str = ""
) -> None:
super().__init__(config) super().__init__(config)
self.client = AsyncHTTPClient() self.client = AsyncHTTPClient()
self.request_mutex = Lock() self.request_mutex = Lock()
self.addr = config.get("address") self.addr: str = config.get("address")
self.port = config.getint("port", default_port) self.port = config.getint("port", default_port)
self.user = config.get("user", default_user) self.user = config.get("user", default_user)
self.password = config.get("password", default_password) self.password = config.get("password", default_password)
async def initialize(self): async def initialize(self) -> None:
await self.refresh_status() await self.refresh_status()
async def _send_http_command(self, url, command): async def _send_http_command(self,
url: str,
command: str
) -> Dict[str, Any]:
try: try:
response = await self.client.fetch(url) response = await self.client.fetch(url)
data = json_decode(response.body) data = json_decode(response.body)
@ -258,15 +307,15 @@ class HTTPDevice(PowerDevice):
raise self.server.error(msg) raise self.server.error(msg)
return data return data
async def _send_power_request(self, state): async def _send_power_request(self, state: str) -> str:
raise NotImplementedError( raise NotImplementedError(
"_send_power_request must be implemented by children") "_send_power_request must be implemented by children")
async def _send_status_request(self): async def _send_status_request(self) -> str:
raise NotImplementedError( raise NotImplementedError(
"_send_status_request must be implemented by children") "_send_status_request must be implemented by children")
async def refresh_status(self): async def refresh_status(self) -> None:
async with self.request_mutex: async with self.request_mutex:
try: try:
state = await self._send_status_request() state = await self._send_status_request()
@ -290,23 +339,29 @@ class HTTPDevice(PowerDevice):
class GpioChipFactory: class GpioChipFactory:
def __init__(self): def __init__(self) -> None:
self.chips = {} self.chips: Dict[str, gpiod.Chip] = {}
def get_gpio_chip(self, chip_name): def get_gpio_chip(self, chip_name) -> gpiod.Chip:
if chip_name in self.chips: if chip_name in self.chips:
return self.chips[chip_name] return self.chips[chip_name]
chip = gpiod.Chip(chip_name, gpiod.Chip.OPEN_BY_NAME) chip = gpiod.Chip(chip_name, gpiod.Chip.OPEN_BY_NAME)
self.chips[chip_name] = chip self.chips[chip_name] = chip
return chip return chip
def close(self): def close(self) -> None:
for chip in self.chips.values(): for chip in self.chips.values():
chip.close() chip.close()
class GpioDevice(PowerDevice): class GpioDevice(PowerDevice):
def __init__(self, config, chip_factory): def __init__(self, config: ConfigHelper):
super().__init__(config) super().__init__(config)
self.initial_state = config.getboolean('initial_state', False)
def configure_line(self,
config: ConfigHelper,
chip_factory: GpioChipFactory
) -> None:
pin, chip_id, invert = self._parse_pin(config) pin, chip_id, invert = self._parse_pin(config)
try: try:
chip = chip_factory.get_gpio_chip(chip_id) chip = chip_factory.get_gpio_chip(chip_id)
@ -324,15 +379,15 @@ class GpioDevice(PowerDevice):
f"Unable to init {pin}. Make sure the gpio is not in " f"Unable to init {pin}. Make sure the gpio is not in "
"use by another program or exported by sysfs.") "use by another program or exported by sysfs.")
raise config.error("Power GPIO Config Error") raise config.error("Power GPIO Config Error")
self.initial_state = config.getboolean('initial_state', False)
def _parse_pin(self, config):
def _parse_pin(self, config: ConfigHelper) -> Tuple[int, str, bool]:
pin = cfg_pin = config.get("pin") pin = cfg_pin = config.get("pin")
invert = False invert = False
if pin[0] == "!": if pin[0] == "!":
pin = pin[1:] pin = pin[1:]
invert = True invert = True
chip_id = "gpiochip0" chip_id: str = "gpiochip0"
pin_parts = pin.split("/") pin_parts = pin.split("/")
if len(pin_parts) == 2: if len(pin_parts) == 2:
chip_id, pin = pin_parts chip_id, pin = pin_parts
@ -345,16 +400,16 @@ class GpioDevice(PowerDevice):
not pin[4:].isdigit(): not pin[4:].isdigit():
raise config.error( raise config.error(
f"Invalid Power Pin configuration: {cfg_pin}") f"Invalid Power Pin configuration: {cfg_pin}")
pin = int(pin[4:]) pin_id = int(pin[4:])
return pin, chip_id, invert return pin_id, chip_id, invert
def initialize(self): def initialize(self) -> None:
self.set_power("on" if self.initial_state else "off") self.set_power("on" if self.initial_state else "off")
def refresh_status(self): def refresh_status(self) -> None:
pass pass
def set_power(self, state): def set_power(self, state) -> None:
try: try:
self.line.set_value(int(state == "on")) self.line.set_value(int(state == "on"))
except Exception: except Exception:
@ -364,7 +419,7 @@ class GpioDevice(PowerDevice):
raise self.server.error(msg) from None raise self.server.error(msg) from None
self.state = state self.state = state
def close(self): def close(self) -> None:
self.line.release() self.line.release()
@ -376,14 +431,16 @@ class GpioDevice(PowerDevice):
# Copyright 2016 softScheck GmbH # Copyright 2016 softScheck GmbH
class TPLinkSmartPlug(PowerDevice): class TPLinkSmartPlug(PowerDevice):
START_KEY = 0xAB START_KEY = 0xAB
def __init__(self, config): def __init__(self, config: ConfigHelper) -> None:
super().__init__(config) super().__init__(config)
self.request_mutex = Lock() self.request_mutex = Lock()
self.addr = config.get("address").split('/') self.addr: List[str] = config.get("address").split('/')
self.port = config.getint("port", 9999) self.port = config.getint("port", 9999)
async def _send_tplink_command(self, command): async def _send_tplink_command(self,
out_cmd = {} command: str
) -> Dict[str, Any]:
out_cmd: Dict[str, Any] = {}
if command in ["on", "off"]: if command in ["on", "off"]:
out_cmd = { out_cmd = {
'system': {'set_relay_state': {'state': int(command == "on")}} 'system': {'set_relay_state': {'state': int(command == "on")}}
@ -405,7 +462,7 @@ class TPLinkSmartPlug(PowerDevice):
await stream.connect((self.addr[0], self.port)) await stream.connect((self.addr[0], self.port))
await stream.write(self._encrypt(out_cmd)) await stream.write(self._encrypt(out_cmd))
data = await stream.read_bytes(2048, partial=True) data = await stream.read_bytes(2048, partial=True)
length = struct.unpack(">I", data[:4])[0] length: int = struct.unpack(">I", data[:4])[0]
data = data[4:] data = data[4:]
retries = 5 retries = 5
remaining = length - len(data) remaining = length - len(data)
@ -423,8 +480,8 @@ class TPLinkSmartPlug(PowerDevice):
stream.close() stream.close()
return json.loads(self._decrypt(data)) return json.loads(self._decrypt(data))
def _encrypt(self, data): def _encrypt(self, outdata: Dict[str, Any]) -> bytes:
data = json.dumps(data) data = json.dumps(outdata)
key = self.START_KEY key = self.START_KEY
res = struct.pack(">I", len(data)) res = struct.pack(">I", len(data))
for c in data: for c in data:
@ -433,24 +490,26 @@ class TPLinkSmartPlug(PowerDevice):
res += bytes([val]) res += bytes([val])
return res return res
def _decrypt(self, data): def _decrypt(self, data: bytes) -> str:
key = self.START_KEY key: int = self.START_KEY
res = "" res: str = ""
for c in data: for c in data:
val = key ^ c val = key ^ c
key = c key = c
res += chr(val) res += chr(val)
return res return res
async def initialize(self): async def initialize(self) -> None:
await self.refresh_status() await self.refresh_status()
async def refresh_status(self): async def refresh_status(self) -> None:
async with self.request_mutex: async with self.request_mutex:
try: try:
state: str
res = await self._send_tplink_command("info") res = await self._send_tplink_command("info")
if len(self.addr) == 2: if len(self.addr) == 2:
# TPLink device controls multiple devices # TPLink device controls multiple devices
children: Dict[int, Any]
children = res['system']['get_sysinfo']['children'] children = res['system']['get_sysinfo']['children']
state = children[int(self.addr[1])]['state'] state = children[int(self.addr[1])]['state']
else: else:
@ -462,8 +521,9 @@ class TPLinkSmartPlug(PowerDevice):
raise self.server.error(msg) from None raise self.server.error(msg) from None
self.state = "on" if state else "off" self.state = "on" if state else "off"
async def set_power(self, state): async def set_power(self, state) -> None:
async with self.request_mutex: async with self.request_mutex:
err: int
try: try:
res = await self._send_tplink_command(state) res = await self._send_tplink_command(state)
err = res['system']['set_relay_state']['err_code'] err = res['system']['set_relay_state']['err_code']
@ -478,12 +538,15 @@ class TPLinkSmartPlug(PowerDevice):
class Tasmota(HTTPDevice): class Tasmota(HTTPDevice):
def __init__(self, config): def __init__(self, config: ConfigHelper) -> None:
super().__init__(config, default_password="") super().__init__(config, default_password="")
self.output_id = config.getint("output_id", 1) self.output_id = config.getint("output_id", 1)
self.timer = config.get("timer", "") self.timer = config.get("timer", "")
async def _send_tasmota_command(self, command, password=None): async def _send_tasmota_command(self,
command: str,
password: Optional[str] = None
) -> Dict[str, Any]:
if command in ["on", "off"]: if command in ["on", "off"]:
out_cmd = f"Power{self.output_id}%20{command}" out_cmd = f"Power{self.output_id}%20{command}"
if self.timer != "" and command == "off": if self.timer != "" and command == "off":
@ -497,10 +560,10 @@ class Tasmota(HTTPDevice):
f"{self.password}&cmnd={out_cmd}" f"{self.password}&cmnd={out_cmd}"
return await self._send_http_command(url, command) return await self._send_http_command(url, command)
async def _send_status_request(self): async def _send_status_request(self) -> str:
res = await self._send_tasmota_command("info") res = await self._send_tasmota_command("info")
try: try:
state = res[f"POWER{self.output_id}"].lower() state: str = res[f"POWER{self.output_id}"].lower()
except KeyError as e: except KeyError as e:
if self.output_id == 1: if self.output_id == 1:
state = res[f"POWER"].lower() state = res[f"POWER"].lower()
@ -508,7 +571,7 @@ class Tasmota(HTTPDevice):
raise KeyError(e) raise KeyError(e)
return state return state
async def _send_power_request(self, state): async def _send_power_request(self, state: str) -> str:
res = await self._send_tasmota_command(state) res = await self._send_tasmota_command(state)
if self.timer == "" or state != "off": if self.timer == "" or state != "off":
try: try:
@ -522,12 +585,12 @@ class Tasmota(HTTPDevice):
class Shelly(HTTPDevice): class Shelly(HTTPDevice):
def __init__(self, config): def __init__(self, config: ConfigHelper) -> None:
super().__init__(config, default_user="admin", default_password="") super().__init__(config, default_user="admin", default_password="")
self.output_id = config.getint("output_id", 0) self.output_id = config.getint("output_id", 0)
self.timer = config.get("timer", "") self.timer = config.get("timer", "")
async def _send_shelly_command(self, command): async def _send_shelly_command(self, command: str) -> Dict[str, Any]:
if command == "on": if command == "on":
out_cmd = f"relay/{self.output_id}?turn={command}" out_cmd = f"relay/{self.output_id}?turn={command}"
elif command == "off": elif command == "off":
@ -546,13 +609,13 @@ class Shelly(HTTPDevice):
url = f"http://{out_pwd}{self.addr}/{out_cmd}" url = f"http://{out_pwd}{self.addr}/{out_cmd}"
return await self._send_http_command(url, command) return await self._send_http_command(url, command)
async def _send_status_request(self): async def _send_status_request(self) -> str:
res = await self._send_shelly_command("info") res = await self._send_shelly_command("info")
state = res[f"ison"] state: str = res[f"ison"]
timer_remaining = res[f"timer_remaining"] if self.timer != "" else 0 timer_remaining = res[f"timer_remaining"] if self.timer != "" else 0
return "on" if state and timer_remaining == 0 else "off" return "on" if state and timer_remaining == 0 else "off"
async def _send_power_request(self, state): async def _send_power_request(self, state: str) -> str:
res = await self._send_shelly_command(state) res = await self._send_shelly_command(state)
state = res[f"ison"] state = res[f"ison"]
timer_remaining = res[f"timer_remaining"] if self.timer != "" else 0 timer_remaining = res[f"timer_remaining"] if self.timer != "" else 0
@ -560,21 +623,24 @@ class Shelly(HTTPDevice):
class HomeSeer(HTTPDevice): class HomeSeer(HTTPDevice):
def __init__(self, config): def __init__(self, config: ConfigHelper) -> None:
super().__init__(config, default_user="admin", default_password="") super().__init__(config, default_user="admin", default_password="")
self.device = config.getint("device") self.device = config.getint("device")
async def _send_homeseer(self, request, additional=""): async def _send_homeseer(self,
request: str,
additional: str = ""
) -> Dict[str, Any]:
url = (f"http://{self.user}:{self.password}@{self.addr}" url = (f"http://{self.user}:{self.password}@{self.addr}"
f"/JSON?user={self.user}&pass={self.password}" f"/JSON?user={self.user}&pass={self.password}"
f"&request={request}&ref={self.device}&{additional}") f"&request={request}&ref={self.device}&{additional}")
return await self._send_http_command(url, request) return await self._send_http_command(url, request)
async def _send_status_request(self): async def _send_status_request(self) -> str:
res = await self._send_homeseer("getstatus") res = await self._send_homeseer("getstatus")
return res[f"Devices"][0]["status"].lower() return res[f"Devices"][0]["status"].lower()
async def _send_power_request(self, state): async def _send_power_request(self, state: str) -> str:
if state == "on": if state == "on":
state_hs = "On" state_hs = "On"
elif state == "off": elif state == "off":
@ -585,12 +651,14 @@ class HomeSeer(HTTPDevice):
class HomeAssistant(HTTPDevice): class HomeAssistant(HTTPDevice):
def __init__(self, config): def __init__(self, config: ConfigHelper) -> None:
super().__init__(config, default_port=8123) super().__init__(config, default_port=8123)
self.device = config.get("device") self.device: str = config.get("device")
self.token = config.get("token") self.token: str = config.get("token")
async def _send_homeassistant_command(self, command): async def _send_homeassistant_command(self,
command: str
) -> Dict[Union[str, int], Any]:
if command == "on": if command == "on":
out_cmd = f"api/services/switch/turn_on" out_cmd = f"api/services/switch/turn_on"
body = {"entity_id": self.device} body = {"entity_id": self.device}
@ -617,28 +685,28 @@ class HomeAssistant(HTTPDevice):
else: else:
response = await self.client.fetch( response = await self.client.fetch(
url, method="GET", headers=headers) url, method="GET", headers=headers)
data = json_decode(response.body) data: Dict[Union[str, int], Any] = json_decode(response.body)
except Exception: except Exception:
msg = f"Error sending homeassistant command: {command}" msg = f"Error sending homeassistant command: {command}"
logging.exception(msg) logging.exception(msg)
raise self.server.error(msg) raise self.server.error(msg)
return data return data
async def _send_status_request(self): async def _send_status_request(self) -> str:
res = await self._send_homeassistant_command("info") res = await self._send_homeassistant_command("info")
return res[f"state"] return res[f"state"]
async def _send_power_request(self, state): async def _send_power_request(self, state: str) -> str:
res = await self._send_homeassistant_command(state) res = await self._send_homeassistant_command(state)
return res[0][f"state"] return res[0][f"state"]
class Loxonev1(HTTPDevice): class Loxonev1(HTTPDevice):
def __init__(self, config): def __init__(self, config: ConfigHelper) -> None:
super().__init__(config, default_user="admin", super().__init__(config, default_user="admin",
default_password="admin") default_password="admin")
self.output_id = config.get("output_id", "") self.output_id = config.get("output_id", "")
async def _send_loxonev1_command(self, command): async def _send_loxonev1_command(self, command: str) -> Dict[str, Any]:
if command in ["on", "off"]: if command in ["on", "off"]:
out_cmd = f"jdev/sps/io/{self.output_id}/{command}" out_cmd = f"jdev/sps/io/{self.output_id}/{command}"
elif command == "info": elif command == "info":
@ -652,17 +720,17 @@ class Loxonev1(HTTPDevice):
url = f"http://{out_pwd}{self.addr}/{out_cmd}" url = f"http://{out_pwd}{self.addr}/{out_cmd}"
return await self._send_http_command(url, command) return await self._send_http_command(url, command)
async def _send_status_request(self): async def _send_status_request(self) -> str:
res = await self._send_loxonev1_command("info") res = await self._send_loxonev1_command("info")
state = res[f"LL"][f"value"] state = res[f"LL"][f"value"]
return "on" if int(state) == 1 else "off" return "on" if int(state) == 1 else "off"
async def _send_power_request(self, state): async def _send_power_request(self, state: str) -> str:
res = await self._send_loxonev1_command(state) res = await self._send_loxonev1_command(state)
state = res[f"LL"][f"value"] state = res[f"LL"][f"value"]
return "on" if int(state) == 1 else "off" return "on" if int(state) == 1 else "off"
# The power component has multiple configuration sections # The power component has multiple configuration sections
def load_component_multi(config): def load_component_multi(config: ConfigHelper) -> PrinterPower:
return PrinterPower(config) return PrinterPower(config)