power: add support for uhubctl devices

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2023-06-09 16:43:35 -04:00
parent 326d23a509
commit ddc0e76ffc
1 changed files with 108 additions and 1 deletions

View File

@ -1,5 +1,6 @@
# Raspberry Pi Power Control
# Power Switch Control
#
# Copyright (C) 2024 Eric Callahan <arksine.code@gmail.com>
# Copyright (C) 2020 Jordan Ruthe <jordanruthe@gmail.com>
#
# This file may be distributed under the terms of the GNU GPLv3 license.
@ -10,6 +11,8 @@ import struct
import socket
import asyncio
import time
import re
import shutil
from urllib.parse import quote, urlencode
from ..utils import json_wrapper as jsonw
from ..common import RequestType, KlippyState
@ -35,6 +38,7 @@ if TYPE_CHECKING:
from .mqtt import MQTTClient
from .http_client import HttpClient
from .klippy_connection import KlippyConnection
from .shell_command import ShellCommandFactory as ShellCommand
class PrinterPower:
def __init__(self, config: ConfigHelper) -> None:
@ -56,6 +60,7 @@ class PrinterPower:
"smartthings": SmartThings,
"hue": HueDevice,
"http": GenericHTTP,
"uhubctl": UHubCtl
}
for section in prefix_sections:
@ -1465,6 +1470,108 @@ class GenericHTTP(HTTPDevice):
async def _send_status_request(self) -> str:
return await self._send_generic_request("status")
HUB_STATE_PATTERN = r"""
(?:Port\s(?P<port>[0-9]+):)
(?:\s(?P<bits>[0-9a-f]{4}))
(?:\s(?P<pstate>power|off))
(?P<flags>(?:\s[0-9a-z]+)+)?
(?:\s\[(?P<desc>.+)\])?
"""
class UHubCtl(PowerDevice):
_uhubctrl_regex = re.compile(
r"^\s*" + HUB_STATE_PATTERN + r"\s*$",
re.VERBOSE | re.IGNORECASE
)
def __init__(self, config: ConfigHelper) -> None:
super().__init__(config)
self.scmd: ShellCommand = self.server.load_component(config, "shell_command")
self.location = config.get("location")
self.port = config.getint("port")
ret = shutil.which("uhubctl")
if ret is None:
raise config.error(
f"[{config.get_name()}]: failed to locate 'uhubctl' binary. "
"Make sure uhubctl is correctly installed on the host machine."
)
async def init_state(self) -> None:
async with self.request_lock:
await self.refresh_status()
cur_state = True if self.state == "on" else False
if self.initial_state is not None and cur_state != self.initial_state:
await self.set_power("on" if self.initial_state else "off")
async def refresh_status(self) -> None:
try:
result = await self._run_uhubctl("info")
except self.server.error as e:
self.state = "error"
output = f"\n{e}"
if isinstance(e, self.scmd.error):
output += f"\nuhubctrl output: {e.stderr.decode(errors='ignore')}"
logging.info(f"Power Device {self.name}: Refresh Error{output}")
return
logging.debug(f"Power Device {self.name}: uhubctl device info: {result}")
self.state = result["state"]
async def set_power(self, state: str) -> None:
try:
result = await self._run_uhubctl(state)
except self.server.error as e:
self.state = "error"
msg = f"Power Device {self.name}: Error turning device {state}"
output = f"\n{e}"
if isinstance(e, self.scmd.error):
output += f"\nuhubctrl output: {e.stderr.decode(errors='ignore')}"
logging.info(f"{msg}{output}")
raise self.server.error(msg) from None
logging.debug(f"Power Device {self.name}: uhubctl device info: {result}")
self.state = result["state"]
async def _run_uhubctl(self, action: str) -> Dict[str, Any]:
cmd = f"uhubctl -l {self.location} -p {self.port}"
search_prefix = "Current status"
if action in ["on", "off"]:
cmd += f" -a {action}"
search_prefix = "New status"
resp: str = await self.scmd.exec_cmd(cmd, log_complete=False)
for line in resp.splitlines():
if search_prefix:
if line.startswith(search_prefix):
search_prefix = ""
continue
match = self._uhubctrl_regex.match(line.strip())
if match is None:
continue
result = match.groupdict()
try:
port = int(result["port"])
status_bits = int(result["bits"], 16)
except (TypeError, ValueError):
continue
if port != self.port:
continue
if result["pstate"] is None:
continue
state = "on" if result["pstate"] == "power" else "off"
flags: List[str] = []
if result["flags"] is not None:
flags = result["flags"].strip().split()
return {
"port": port,
"status_bits": status_bits,
"state": state,
"flags": flags,
"desc": result["desc"]
}
raise self.server.error(
f"Failed to receive response for device at location {self.location}, "
f"port {self.port}, "
)
# The power component has multiple configuration sections
def load_component(config: ConfigHelper) -> PrinterPower:
return PrinterPower(config)