diff --git a/moonraker/components/power.py b/moonraker/components/power.py index bced6f7..a4dd889 100644 --- a/moonraker/components/power.py +++ b/moonraker/components/power.py @@ -4,9 +4,8 @@ # # This file may be distributed under the terms of the GNU GPLv3 license. +from __future__ import annotations import logging -import os -import asyncio import json import struct import socket @@ -17,11 +16,29 @@ from tornado.locks import Lock from tornado.httpclient import AsyncHTTPClient from tornado.escape import json_decode +# Annotation imports +from typing import ( + TYPE_CHECKING, + Type, + List, + Any, + Optional, + Dict, + Coroutine, + Tuple, + Union, +) +if TYPE_CHECKING: + from confighelper import ConfigHelper + from websockets import WebRequest + from . import klippy_apis + APIComp = klippy_apis.KlippyAPI + class PrinterPower: - def __init__(self, config): + def __init__(self, config: ConfigHelper) -> None: self.server = config.get_server() self.chip_factory = GpioChipFactory() - self.devices = {} + self.devices: Dict[str, PowerDevice] = {} prefix_sections = config.get_prefix_sections("power") logging.info(f"Power component loading devices: {prefix_sections}") dev_types = { @@ -36,14 +53,14 @@ class PrinterPower: try: for section in prefix_sections: cfg = config[section] - dev_type = cfg.get("type") + dev_type: str = cfg.get("type") + dev_class: Optional[Type[PowerDevice]] dev_class = dev_types.get(dev_type) if dev_class is None: raise config.error(f"Unsupported Device Type: {dev_type}") - elif dev_type == "gpio": - dev = dev_class(cfg, self.chip_factory) - else: - dev = dev_class(cfg) + dev = dev_class(cfg) + if isinstance(dev, GpioDevice): + dev.configure_line(cfg, self.chip_factory) self.devices[dev.get_name()] = dev except Exception: self.chip_factory.close() @@ -72,35 +89,40 @@ class PrinterPower: IOLoop.current().spawn_callback( self._initalize_devices, list(self.devices.values())) - async def _check_klippy_printing(self): - klippy_apis = self.server.lookup_component('klippy_apis') - result = await klippy_apis.query_objects( + async def _check_klippy_printing(self) -> bool: + kapis: APIComp = self.server.lookup_component('klippy_apis') + result: Dict[str, Any] = await kapis.query_objects( {'print_stats': None}, default={}) pstate = result.get('print_stats', {}).get('state', "").lower() return pstate == "printing" - async def _initalize_devices(self, inital_devs): + async def _initalize_devices(self, + inital_devs: List[PowerDevice] + ) -> None: for dev in inital_devs: ret = dev.initialize() - if asyncio.iscoroutine(ret): + if ret is not None: await ret - async def _handle_klippy_shutdown(self): + async def _handle_klippy_shutdown(self) -> None: for name, dev in self.devices.items(): - if hasattr(dev, "off_when_shutdown"): - if dev.off_when_shutdown: - logging.info( - f"Powering off device [{name}] due to" - " klippy shutdown") - await self._process_request(dev, "off") + if dev.has_off_when_shutdown(): + logging.info( + f"Powering off device [{name}] due to" + " klippy shutdown") + await self._process_request(dev, "off") - async def _handle_list_devices(self, web_request): + async def _handle_list_devices(self, + web_request: WebRequest + ) -> Dict[str, Any]: dev_list = [d.get_device_info() for d in self.devices.values()] output = {"devices": dev_list} return output - async def _handle_single_power_request(self, web_request): - dev_name = web_request.get_str('device') + async def _handle_single_power_request(self, + web_request: WebRequest + ) -> Dict[str, Any]: + dev_name: str = web_request.get_str('device') req_action = web_request.get_action() if dev_name not in self.devices: raise self.server.error(f"No valid device named {dev_name}") @@ -115,7 +137,9 @@ class PrinterPower: result = await self._process_request(dev, action) return {dev_name: result} - async def _handle_batch_power_request(self, web_request): + async def _handle_batch_power_request(self, + web_request: WebRequest + ) -> Dict[str, Any]: args = web_request.get_args() ep = web_request.get_endpoint() if not args: @@ -130,15 +154,18 @@ class PrinterPower: result[name] = "device_not_found" return result - async def _process_request(self, device, req): + async def _process_request(self, + device: PowerDevice, + req: str + ) -> str: ret = device.refresh_status() - if asyncio.iscoroutine(ret): + if ret is not None: await ret dev_info = device.get_device_info() if req == "toggle": req = "on" if dev_info['status'] == "off" else "off" if req in ["on", "off"]: - cur_state = dev_info['status'] + cur_state: str = dev_info['status'] if req == cur_state: # device is already in requested state, do nothing return cur_state @@ -148,7 +175,7 @@ class PrinterPower: f"Unable to change power for {device.get_name()} " "while printing") ret = device.set_power(req) - if asyncio.iscoroutine(ret): + if ret is not None: await ret dev_info = device.get_device_info() self.server.send_event("power:power_changed", dev_info) @@ -157,8 +184,8 @@ class PrinterPower: raise self.server.error(f"Unsupported power request: {req}") return dev_info['status'] - def set_device_power(self, device, state): - status = None + def set_device_power(self, device: str, state: str) -> None: + status: Optional[str] = None if isinstance(state, bool): status = "on" if state else "off" elif isinstance(state, str): @@ -175,33 +202,32 @@ class PrinterPower: ioloop.spawn_callback( self._process_request, self.devices[device], status) - async def add_device(self, name, device): + async def add_device(self, name: str, device: PowerDevice) -> None: if name in self.devices: raise self.server.error( f"Device [{name}] already configured") ret = device.initialize() - if asyncio.iscoroutine(ret): + if ret is not None: await ret self.devices[name] = device - async def close(self): + async def close(self) -> None: for device in self.devices.values(): - if hasattr(device, "close"): - ret = device.close() - if asyncio.iscoroutine(ret): - await ret + ret = device.close() + if ret is not None: + await ret self.chip_factory.close() class PowerDevice: - def __init__(self, config): + def __init__(self, config: ConfigHelper) -> None: name_parts = config.get_name().split(maxsplit=1) if len(name_parts) != 2: raise config.error(f"Invalid Section Name: {config.get_name()}") self.server = config.get_server() self.name = name_parts[1] - self.type = config.get('type') - self.state = "init" + self.type: str = config.get('type') + self.state: str = "init" self.locked_while_printing = config.getboolean( 'locked_while_printing', False) self.off_when_shutdown = config.getboolean('off_when_shutdown', False) @@ -213,10 +239,10 @@ class PowerDevice: if self.restart_delay < .000001: raise config.error("Option 'restart_delay' must be above 0.0") - def get_name(self): + def get_name(self) -> str: return self.name - def get_device_info(self): + def get_device_info(self) -> Dict[str, Any]: return { 'device': self.name, 'status': self.state, @@ -224,31 +250,54 @@ class PowerDevice: 'type': self.type } - def get_locked_while_printing(self): + def get_locked_while_printing(self) -> bool: return self.locked_while_printing - def run_power_changed_action(self): + def run_power_changed_action(self) -> None: if self.state == "on" and self.klipper_restart: ioloop = IOLoop.current() - klippy_apis = self.server.lookup_component("klippy_apis") - ioloop.call_later(self.restart_delay, klippy_apis.do_restart, - "FIRMWARE_RESTART") + kapis: APIComp = self.server.lookup_component("klippy_apis") + ioloop.call_later( + self.restart_delay, kapis.do_restart, # type:ignore + "FIRMWARE_RESTART") + + def has_off_when_shutdown(self) -> bool: + return self.off_when_shutdown + + def initialize(self) -> Optional[Coroutine]: + raise NotImplementedError + + def refresh_status(self) -> Optional[Coroutine]: + raise NotImplementedError + + def set_power(self, state: str) -> Optional[Coroutine]: + raise NotImplementedError + + def close(self) -> Optional[Coroutine]: + pass class HTTPDevice(PowerDevice): - def __init__(self, config, default_port=None, - default_user=None, default_password=None): + def __init__(self, + config: ConfigHelper, + default_port: int = -1, + default_user: str = "", + default_password: str = "" + ) -> None: super().__init__(config) self.client = AsyncHTTPClient() self.request_mutex = Lock() - self.addr = config.get("address") + self.addr: str = config.get("address") self.port = config.getint("port", default_port) self.user = config.get("user", default_user) self.password = config.get("password", default_password) - async def initialize(self): + async def initialize(self) -> None: await self.refresh_status() - async def _send_http_command(self, url, command): + async def _send_http_command(self, + url: str, + command: str + ) -> Dict[str, Any]: try: response = await self.client.fetch(url) data = json_decode(response.body) @@ -258,15 +307,15 @@ class HTTPDevice(PowerDevice): raise self.server.error(msg) return data - async def _send_power_request(self, state): + async def _send_power_request(self, state: str) -> str: raise NotImplementedError( "_send_power_request must be implemented by children") - async def _send_status_request(self): + async def _send_status_request(self) -> str: raise NotImplementedError( "_send_status_request must be implemented by children") - async def refresh_status(self): + async def refresh_status(self) -> None: async with self.request_mutex: try: state = await self._send_status_request() @@ -290,23 +339,29 @@ class HTTPDevice(PowerDevice): class GpioChipFactory: - def __init__(self): - self.chips = {} + def __init__(self) -> None: + self.chips: Dict[str, gpiod.Chip] = {} - def get_gpio_chip(self, chip_name): + def get_gpio_chip(self, chip_name) -> gpiod.Chip: if chip_name in self.chips: return self.chips[chip_name] chip = gpiod.Chip(chip_name, gpiod.Chip.OPEN_BY_NAME) self.chips[chip_name] = chip return chip - def close(self): + def close(self) -> None: for chip in self.chips.values(): chip.close() class GpioDevice(PowerDevice): - def __init__(self, config, chip_factory): + def __init__(self, config: ConfigHelper): super().__init__(config) + self.initial_state = config.getboolean('initial_state', False) + + def configure_line(self, + config: ConfigHelper, + chip_factory: GpioChipFactory + ) -> None: pin, chip_id, invert = self._parse_pin(config) try: chip = chip_factory.get_gpio_chip(chip_id) @@ -324,15 +379,15 @@ class GpioDevice(PowerDevice): f"Unable to init {pin}. Make sure the gpio is not in " "use by another program or exported by sysfs.") raise config.error("Power GPIO Config Error") - self.initial_state = config.getboolean('initial_state', False) - def _parse_pin(self, config): + + def _parse_pin(self, config: ConfigHelper) -> Tuple[int, str, bool]: pin = cfg_pin = config.get("pin") invert = False if pin[0] == "!": pin = pin[1:] invert = True - chip_id = "gpiochip0" + chip_id: str = "gpiochip0" pin_parts = pin.split("/") if len(pin_parts) == 2: chip_id, pin = pin_parts @@ -345,16 +400,16 @@ class GpioDevice(PowerDevice): not pin[4:].isdigit(): raise config.error( f"Invalid Power Pin configuration: {cfg_pin}") - pin = int(pin[4:]) - return pin, chip_id, invert + pin_id = int(pin[4:]) + return pin_id, chip_id, invert - def initialize(self): + def initialize(self) -> None: self.set_power("on" if self.initial_state else "off") - def refresh_status(self): + def refresh_status(self) -> None: pass - def set_power(self, state): + def set_power(self, state) -> None: try: self.line.set_value(int(state == "on")) except Exception: @@ -364,7 +419,7 @@ class GpioDevice(PowerDevice): raise self.server.error(msg) from None self.state = state - def close(self): + def close(self) -> None: self.line.release() @@ -376,14 +431,16 @@ class GpioDevice(PowerDevice): # Copyright 2016 softScheck GmbH class TPLinkSmartPlug(PowerDevice): START_KEY = 0xAB - def __init__(self, config): + def __init__(self, config: ConfigHelper) -> None: super().__init__(config) self.request_mutex = Lock() - self.addr = config.get("address").split('/') + self.addr: List[str] = config.get("address").split('/') self.port = config.getint("port", 9999) - async def _send_tplink_command(self, command): - out_cmd = {} + async def _send_tplink_command(self, + command: str + ) -> Dict[str, Any]: + out_cmd: Dict[str, Any] = {} if command in ["on", "off"]: out_cmd = { 'system': {'set_relay_state': {'state': int(command == "on")}} @@ -405,7 +462,7 @@ class TPLinkSmartPlug(PowerDevice): await stream.connect((self.addr[0], self.port)) await stream.write(self._encrypt(out_cmd)) data = await stream.read_bytes(2048, partial=True) - length = struct.unpack(">I", data[:4])[0] + length: int = struct.unpack(">I", data[:4])[0] data = data[4:] retries = 5 remaining = length - len(data) @@ -423,8 +480,8 @@ class TPLinkSmartPlug(PowerDevice): stream.close() return json.loads(self._decrypt(data)) - def _encrypt(self, data): - data = json.dumps(data) + def _encrypt(self, outdata: Dict[str, Any]) -> bytes: + data = json.dumps(outdata) key = self.START_KEY res = struct.pack(">I", len(data)) for c in data: @@ -433,24 +490,26 @@ class TPLinkSmartPlug(PowerDevice): res += bytes([val]) return res - def _decrypt(self, data): - key = self.START_KEY - res = "" + def _decrypt(self, data: bytes) -> str: + key: int = self.START_KEY + res: str = "" for c in data: val = key ^ c key = c res += chr(val) return res - async def initialize(self): + async def initialize(self) -> None: await self.refresh_status() - async def refresh_status(self): + async def refresh_status(self) -> None: async with self.request_mutex: try: + state: str res = await self._send_tplink_command("info") if len(self.addr) == 2: # TPLink device controls multiple devices + children: Dict[int, Any] children = res['system']['get_sysinfo']['children'] state = children[int(self.addr[1])]['state'] else: @@ -462,8 +521,9 @@ class TPLinkSmartPlug(PowerDevice): raise self.server.error(msg) from None self.state = "on" if state else "off" - async def set_power(self, state): + async def set_power(self, state) -> None: async with self.request_mutex: + err: int try: res = await self._send_tplink_command(state) err = res['system']['set_relay_state']['err_code'] @@ -478,12 +538,15 @@ class TPLinkSmartPlug(PowerDevice): class Tasmota(HTTPDevice): - def __init__(self, config): + def __init__(self, config: ConfigHelper) -> None: super().__init__(config, default_password="") self.output_id = config.getint("output_id", 1) self.timer = config.get("timer", "") - async def _send_tasmota_command(self, command, password=None): + async def _send_tasmota_command(self, + command: str, + password: Optional[str] = None + ) -> Dict[str, Any]: if command in ["on", "off"]: out_cmd = f"Power{self.output_id}%20{command}" if self.timer != "" and command == "off": @@ -497,10 +560,10 @@ class Tasmota(HTTPDevice): f"{self.password}&cmnd={out_cmd}" return await self._send_http_command(url, command) - async def _send_status_request(self): + async def _send_status_request(self) -> str: res = await self._send_tasmota_command("info") try: - state = res[f"POWER{self.output_id}"].lower() + state: str = res[f"POWER{self.output_id}"].lower() except KeyError as e: if self.output_id == 1: state = res[f"POWER"].lower() @@ -508,7 +571,7 @@ class Tasmota(HTTPDevice): raise KeyError(e) return state - async def _send_power_request(self, state): + async def _send_power_request(self, state: str) -> str: res = await self._send_tasmota_command(state) if self.timer == "" or state != "off": try: @@ -522,12 +585,12 @@ class Tasmota(HTTPDevice): class Shelly(HTTPDevice): - def __init__(self, config): + def __init__(self, config: ConfigHelper) -> None: super().__init__(config, default_user="admin", default_password="") self.output_id = config.getint("output_id", 0) self.timer = config.get("timer", "") - async def _send_shelly_command(self, command): + async def _send_shelly_command(self, command: str) -> Dict[str, Any]: if command == "on": out_cmd = f"relay/{self.output_id}?turn={command}" elif command == "off": @@ -546,13 +609,13 @@ class Shelly(HTTPDevice): url = f"http://{out_pwd}{self.addr}/{out_cmd}" return await self._send_http_command(url, command) - async def _send_status_request(self): + async def _send_status_request(self) -> str: res = await self._send_shelly_command("info") - state = res[f"ison"] + state: str = res[f"ison"] timer_remaining = res[f"timer_remaining"] if self.timer != "" else 0 return "on" if state and timer_remaining == 0 else "off" - async def _send_power_request(self, state): + async def _send_power_request(self, state: str) -> str: res = await self._send_shelly_command(state) state = res[f"ison"] timer_remaining = res[f"timer_remaining"] if self.timer != "" else 0 @@ -560,21 +623,24 @@ class Shelly(HTTPDevice): class HomeSeer(HTTPDevice): - def __init__(self, config): + def __init__(self, config: ConfigHelper) -> None: super().__init__(config, default_user="admin", default_password="") self.device = config.getint("device") - async def _send_homeseer(self, request, additional=""): + async def _send_homeseer(self, + request: str, + additional: str = "" + ) -> Dict[str, Any]: url = (f"http://{self.user}:{self.password}@{self.addr}" f"/JSON?user={self.user}&pass={self.password}" f"&request={request}&ref={self.device}&{additional}") return await self._send_http_command(url, request) - async def _send_status_request(self): + async def _send_status_request(self) -> str: res = await self._send_homeseer("getstatus") return res[f"Devices"][0]["status"].lower() - async def _send_power_request(self, state): + async def _send_power_request(self, state: str) -> str: if state == "on": state_hs = "On" elif state == "off": @@ -585,12 +651,14 @@ class HomeSeer(HTTPDevice): class HomeAssistant(HTTPDevice): - def __init__(self, config): + def __init__(self, config: ConfigHelper) -> None: super().__init__(config, default_port=8123) - self.device = config.get("device") - self.token = config.get("token") + self.device: str = config.get("device") + self.token: str = config.get("token") - async def _send_homeassistant_command(self, command): + async def _send_homeassistant_command(self, + command: str + ) -> Dict[Union[str, int], Any]: if command == "on": out_cmd = f"api/services/switch/turn_on" body = {"entity_id": self.device} @@ -617,28 +685,28 @@ class HomeAssistant(HTTPDevice): else: response = await self.client.fetch( url, method="GET", headers=headers) - data = json_decode(response.body) + data: Dict[Union[str, int], Any] = json_decode(response.body) except Exception: msg = f"Error sending homeassistant command: {command}" logging.exception(msg) raise self.server.error(msg) return data - async def _send_status_request(self): + async def _send_status_request(self) -> str: res = await self._send_homeassistant_command("info") return res[f"state"] - async def _send_power_request(self, state): + async def _send_power_request(self, state: str) -> str: res = await self._send_homeassistant_command(state) return res[0][f"state"] class Loxonev1(HTTPDevice): - def __init__(self, config): + def __init__(self, config: ConfigHelper) -> None: super().__init__(config, default_user="admin", default_password="admin") self.output_id = config.get("output_id", "") - async def _send_loxonev1_command(self, command): + async def _send_loxonev1_command(self, command: str) -> Dict[str, Any]: if command in ["on", "off"]: out_cmd = f"jdev/sps/io/{self.output_id}/{command}" elif command == "info": @@ -652,17 +720,17 @@ class Loxonev1(HTTPDevice): url = f"http://{out_pwd}{self.addr}/{out_cmd}" return await self._send_http_command(url, command) - async def _send_status_request(self): + async def _send_status_request(self) -> str: res = await self._send_loxonev1_command("info") state = res[f"LL"][f"value"] return "on" if int(state) == 1 else "off" - async def _send_power_request(self, state): + async def _send_power_request(self, state: str) -> str: res = await self._send_loxonev1_command(state) state = res[f"LL"][f"value"] return "on" if int(state) == 1 else "off" # The power component has multiple configuration sections -def load_component_multi(config): +def load_component_multi(config: ConfigHelper) -> PrinterPower: return PrinterPower(config)