diff --git a/moonraker/components/power.py b/moonraker/components/power.py index 9fe3d10..2edf9d1 100644 --- a/moonraker/components/power.py +++ b/moonraker/components/power.py @@ -5,8 +5,6 @@ # This file may be distributed under the terms of the GNU GPLv3 license. from __future__ import annotations -import sys -import glob import logging import json import struct @@ -26,43 +24,21 @@ from typing import ( Optional, Dict, Coroutine, - Tuple, Union, ) -# Special handling for gpiod import -HAS_GPIOD = False -PKG_PATHS = glob.glob("/usr/lib/python3*/dist-packages") -PKG_PATHS += glob.glob("/usr/lib/python3*/site-packages") -for pkg_path in PKG_PATHS: - sys.path.insert(0, pkg_path) - try: - import gpiod - except ModuleNotFoundError: - sys.path.pop(0) - except ImportError: - logging.exception("Unable to load gpiod module") - sys.path.pop(0) - else: - HAS_GPIOD = True - sys.path.pop(0) - break - if TYPE_CHECKING: from confighelper import ConfigHelper from websockets import WebRequest from .machine import Machine + from .gpio import GpioFactory from . import klippy_apis APIComp = klippy_apis.KlippyAPI class PrinterPower: def __init__(self, config: ConfigHelper) -> None: self.server = config.get_server() - if not HAS_GPIOD: - self.server.add_warning( - "Unable to load gpiod library, GPIO power " - "devices will not be loaded") - self.chip_factory = GpioChipFactory() + has_gpio = self.server.load_component(config, 'gpio', None) is not None self.devices: Dict[str, PowerDevice] = {} prefix_sections = config.get_prefix_sections("power") logging.info(f"Power component loading devices: {prefix_sections}") @@ -76,26 +52,21 @@ class PrinterPower: "loxonev1": Loxonev1, "rf": RFDevice } - try: - for section in prefix_sections: - cfg = config[section] - dev_type: str = cfg.get("type") - dev_class: Optional[Type[PowerDevice]] - dev_class = dev_types.get(dev_type) - if dev_class is None: - raise config.error(f"Unsupported Device Type: {dev_type}") - dev = dev_class(cfg) - if isinstance(dev, GpioDevice) or isinstance(dev, RFDevice): - if not HAS_GPIOD: - self.server.add_warning( - f"Unable to load power device [{cfg.get_name()}], " - "gpiod module not loaded") - continue - dev.configure_line(cfg, self.chip_factory) - self.devices[dev.get_name()] = dev - except Exception: - self.chip_factory.close() - raise + + for section in prefix_sections: + cfg = config[section] + dev_type: str = cfg.get("type") + dev_class: Optional[Type[PowerDevice]] + dev_class = dev_types.get(dev_type) + if dev_class is None: + raise config.error(f"Unsupported Device Type: {dev_type}") + if issubclass(dev_class, GpioDevice) and not has_gpio: + self.server.add_warning( + f"Unable to load power device [{cfg.get_name()}], " + "gpio component not available") + continue + dev = dev_class(cfg) + self.devices[dev.get_name()] = dev self.server.register_endpoint( "/machine/device_power/devices", ['GET'], @@ -278,7 +249,6 @@ class PrinterPower: ret = device.close() if ret is not None: await ret - self.chip_factory.close() class PowerDevice: @@ -459,28 +429,11 @@ class HTTPDevice(PowerDevice): self.state = state -class GpioChipFactory: - def __init__(self) -> None: - self.chips: Dict[str, gpiod.Chip] = {} - version: str = gpiod.version_string() - self.gpiod_version = tuple(int(v) for v in version.split('.')) - - def get_gpio_chip(self, chip_name) -> gpiod.Chip: - if chip_name in self.chips: - return self.chips[chip_name] - chip = gpiod.Chip(chip_name, gpiod.Chip.OPEN_BY_NAME) - self.chips[chip_name] = chip - return chip - - def get_gpiod_version(self): - return self.gpiod_version - - def close(self) -> None: - for chip in self.chips.values(): - chip.close() - class GpioDevice(PowerDevice): - def __init__(self, config: ConfigHelper): + def __init__(self, + config: ConfigHelper, + initial_val: Optional[int] = None + ) -> None: super().__init__(config) self.initial_state = config.getboolean('initial_state', False) self.timer: Optional[float] = config.getfloat('timer', None) @@ -489,54 +442,11 @@ class GpioDevice(PowerDevice): f"Option 'timer' in section [{config.get_name()}] must " "be above 0.0") self.timer_handle: Optional[asyncio.TimerHandle] = None - - def configure_line(self, - config: ConfigHelper, - chip_factory: GpioChipFactory - ) -> None: - pin, chip_id, invert = self._parse_pin(config) - try: - chip = chip_factory.get_gpio_chip(chip_id) - self.line = chip.get_line(pin) - args: Dict[str, Any] = { - 'consumer': "moonraker", - 'type': gpiod.LINE_REQ_DIR_OUT - } - if invert: - args['flags'] = gpiod.LINE_REQ_FLAG_ACTIVE_LOW - if chip_factory.get_gpiod_version() < (1, 3): - args['default_vals'] = [int(self.initial_state)] - else: - args['default_val'] = int(self.initial_state) - self.line.request(**args) - except Exception: - self.state = "error" - logging.exception( - f"Unable to init {pin}. Make sure the gpio is not in " - "use by another program or exported by sysfs.") - raise config.error("Power GPIO Config Error") - - def _parse_pin(self, config: ConfigHelper) -> Tuple[int, str, bool]: - pin = cfg_pin = config.get("pin") - invert = False - if pin[0] == "!": - pin = pin[1:] - invert = True - chip_id: str = "gpiochip0" - pin_parts = pin.split("/") - if len(pin_parts) == 2: - chip_id, pin = pin_parts - elif len(pin_parts) == 1: - pin = pin_parts[0] - # Verify pin - if not chip_id.startswith("gpiochip") or \ - not chip_id[-1].isdigit() or \ - not pin.startswith("gpio") or \ - not pin[4:].isdigit(): - raise config.error( - f"Invalid Power Pin configuration: {cfg_pin}") - pin_id = int(pin[4:]) - return pin_id, chip_id, invert + gpio: GpioFactory = self.server.lookup_component('gpio') + if initial_val is None: + initial_val = int(self.initial_state) + self.gpio_out = gpio.get_gpio_out_from_config( + config, initial_value=initial_val) def initialize(self) -> None: super().initialize() @@ -550,7 +460,7 @@ class GpioDevice(PowerDevice): self.timer_handle.cancel() self.timer_handle = None try: - self.line.set_value(int(state == "on")) + self.gpio_out.write(int(state == "on")) except Exception: self.state = "error" msg = f"Error Toggling Device Power: {self.name}" @@ -564,7 +474,6 @@ class GpioDevice(PowerDevice): self.timer, power.set_device_power, self.name, "off") def close(self) -> None: - self.line.release() if self.timer_handle is not None: self.timer_handle.cancel() self.timer_handle = None @@ -580,14 +489,14 @@ class RFDevice(GpioDevice): RETRIES = 10 # send the code this many times def __init__(self, config: ConfigHelper): - super().__init__(config) + super().__init__(config, initial_val=0) self.on = config.get("on_code").zfill(24) self.off = config.get("off_code").zfill(24) def _transmit_digit(self, waveform) -> None: - self.line.set_value(1) + self.gpio_out.write(1) time.sleep(waveform[0]*RFDevice.PULSE_LEN) - self.line.set_value(0) + self.gpio_out.write(0) time.sleep(waveform[1]*RFDevice.PULSE_LEN) def _transmit_code(self, code) -> None: