diff --git a/moonraker/components/gpio.py b/moonraker/components/gpio.py index e7bdaed..02cb7f7 100644 --- a/moonraker/components/gpio.py +++ b/moonraker/components/gpio.py @@ -4,8 +4,11 @@ # # This file may be distributed under the terms of the GNU GPLv3 license. from __future__ import annotations +import os +import platform +import pathlib import logging -from ..utils import load_system_module +import periphery # Annotation imports from typing import ( @@ -22,125 +25,97 @@ if TYPE_CHECKING: from ..eventloop import EventLoop GPIO_CALLBACK = Callable[[float, float, int], Optional[Awaitable[None]]] +try: + KERNEL_VERSION = tuple([int(part) for part in platform.release().split(".")[:2]]) +except Exception: + KERNEL_VERSION = (0, 0) +GPIO_SUPPORTS_BIAS = KERNEL_VERSION >= (5, 5) + class GpioFactory: def __init__(self, config: ConfigHelper) -> None: self.server = config.get_server() - self.gpiod: Any = load_system_module("gpiod") - GpioEvent.init_constants(self.gpiod) - self.chips: Dict[str, Any] = {} self.reserved_gpios: Dict[str, GpioBase] = {} - version: str = self.gpiod.version_string() - self.gpiod_version = tuple(int(v) for v in version.split('.')) - self.server.add_log_rollover_item( - "gpiod_version", f"libgpiod version: {version}") - def _get_gpio_chip(self, chip_name) -> Any: - if chip_name in self.chips: - return self.chips[chip_name] - chip = self.gpiod.Chip(chip_name, self.gpiod.Chip.OPEN_BY_NAME) - self.chips[chip_name] = chip - return chip - - def setup_gpio_out(self, - pin_name: str, - initial_value: int = 0 - ) -> GpioOutputPin: + def setup_gpio_out(self, pin_name: str, initial_value: int = 0) -> GpioOutputPin: initial_value = int(not not initial_value) - pparams = self._parse_pin(pin_name) - pparams['initial_value'] = initial_value - line = self._request_gpio(pparams) + pparams = self._parse_pin(pin_name, initial_value) + gpio = self._request_gpio(pparams) try: - gpio_out = GpioOutputPin(line, pparams) + gpio_out = GpioOutputPin(gpio, pparams) except Exception: logging.exception("Error Instantiating GpioOutputPin") - line.release() + gpio.close() raise - full_name = pparams['full_name'] + full_name = pparams["full_name"] self.reserved_gpios[full_name] = gpio_out return gpio_out - def register_gpio_event(self, - pin_name: str, - callback: GPIO_CALLBACK - ) -> GpioEvent: - pin_params = self._parse_pin(pin_name, type="event") - line = self._request_gpio(pin_params) + def register_gpio_event( + self, pin_name: str, callback: GPIO_CALLBACK + ) -> GpioEvent: + pin_params = self._parse_pin(pin_name, req_type="event") + gpio = self._request_gpio(pin_params) event_loop = self.server.get_event_loop() try: - gpio_event = GpioEvent(event_loop, line, pin_params, callback) + gpio_event = GpioEvent(event_loop, gpio, pin_params, callback) except Exception: logging.exception("Error Instantiating GpioEvent") - line.release() + gpio.close() raise - full_name = pin_params['full_name'] + full_name = pin_params["full_name"] self.reserved_gpios[full_name] = gpio_event return gpio_event - def _request_gpio(self, pin_params: Dict[str, Any]) -> Any: - full_name = pin_params['full_name'] + def _request_gpio(self, pin_params: Dict[str, Any]) -> periphery.GPIO: + full_name = pin_params["full_name"] if full_name in self.reserved_gpios: raise self.server.error(f"GPIO {full_name} already reserved") + chip_path = pathlib.Path("/dev").joinpath(pin_params["chip_id"]) + if not chip_path.exists(): + raise self.server.error(f"Chip path {chip_path} does not exist") try: - chip = self._get_gpio_chip(pin_params['chip_id']) - line = chip.get_line(pin_params['pin_id']) - args: Dict[str, Any] = { - 'consumer': "moonraker", - 'type': pin_params['request_type'] - } - if 'flags' in pin_params: - args['flags'] = pin_params['flags'] - if 'initial_value' in pin_params: - if self.gpiod_version < (1, 3): - args['default_vals'] = [pin_params['initial_value']] - else: - args['default_val'] = pin_params['initial_value'] - line.request(**args) + gpio = periphery.GPIO( + str(chip_path), + pin_params["pin_id"], + pin_params["direction"], + edge=pin_params.get("edge", "none"), + bias=pin_params.get("bias", "default"), + inverted=pin_params["inverted"], + label="moonraker" + ) except Exception: logging.exception( f"Unable to init {full_name}. Make sure the gpio is not in " "use by another program or exported by sysfs.") raise - return line + return gpio - def _parse_pin(self, - pin_name: str, - type: str = "out" - ) -> Dict[str, Any]: + def _parse_pin( + self, pin_name: str, initial_value: int = 0, req_type: str = "out" + ) -> Dict[str, Any]: params: Dict[str, Any] = { - 'orig': pin_name, - 'invert': False, + "orig": pin_name, + "inverted": False, + "request_type": req_type, + "initial_value": initial_value } pin = pin_name - if type == "event": - params['request_type'] = self.gpiod.LINE_REQ_EV_BOTH_EDGES - flag: str = "disable" + if req_type == "event": + params["direction"] = "in" + params["edge"] = "both" + bias: str = "disable" if GPIO_SUPPORTS_BIAS else "default" if pin[0] == "^": pin = pin[1:] - flag = "pullup" + bias = "pull_up" elif pin[0] == "~": pin = pin[1:] - flag = "pulldown" - if self.gpiod_version >= (1, 5): - flag_to_enum = { - "disable": self.gpiod.LINE_REQ_FLAG_BIAS_DISABLE, - "pullup": self.gpiod.LINE_REQ_FLAG_BIAS_PULL_UP, - "pulldown": self.gpiod.LINE_REQ_FLAG_BIAS_PULL_DOWN - } - params['flags'] = flag_to_enum[flag] - elif flag != "disable": - raise self.server.error( - f"Flag {flag} configured for event GPIO '{pin_name}'" - " requires libgpiod version 1.5 or later. " - f"Current Version: {self.gpiod.version_string()}") - elif type == "out": - params['request_type'] = self.gpiod.LINE_REQ_DIR_OUT + bias = "pull_down" + params["bias"] = bias + elif req_type == "out": + params["direction"] = "out" if not initial_value else "high" if pin[0] == "!": pin = pin[1:] - params['invert'] = True - if 'flags' in params: - params['flags'] |= self.gpiod.LINE_REQ_FLAG_ACTIVE_LOW - else: - params['flags'] = self.gpiod.LINE_REQ_FLAG_ACTIVE_LOW + params["inverted"] = True chip_id: str = "gpiochip0" pin_parts = pin.split("/") if len(pin_parts) == 2: @@ -148,37 +123,36 @@ class GpioFactory: elif len(pin_parts) == 1: pin = pin_parts[0] # Verify pin - if not chip_id.startswith("gpiochip") or \ - not chip_id[-1].isdigit() or \ - not pin.startswith("gpio") or \ - not pin[4:].isdigit(): + if ( + not chip_id.startswith("gpiochip") or + not chip_id[-1].isdigit() or + not pin.startswith("gpio") or + not pin[4:].isdigit() + ): raise self.server.error( f"Invalid Gpio Pin: {pin_name}") pin_id = int(pin[4:]) - params['pin_id'] = pin_id - params['chip_id'] = chip_id - params['full_name'] = f"{chip_id}:{pin}" + params["pin_id"] = pin_id + params["chip_id"] = chip_id + params["full_name"] = f"{chip_id}:{pin}" return params def close(self) -> None: - for line in self.reserved_gpios.values(): - line.release() - for chip in self.chips.values(): - chip.close() + for gpio in self.reserved_gpios.values(): + gpio.close() class GpioBase: - def __init__(self, - line: Any, - pin_params: Dict[str, Any] - ) -> None: - self.orig: str = pin_params['orig'] - self.name: str = pin_params['full_name'] - self.inverted: bool = pin_params['invert'] - self.line: Any = line - self.value: int = pin_params.get('initial_value', 0) + def __init__( + self, gpio: periphery.GPIO, pin_params: Dict[str, Any] + ) -> None: + self.orig: str = pin_params["orig"] + self.name: str = pin_params["full_name"] + self.inverted: bool = pin_params["inverted"] + self.gpio = gpio + self.value: int = pin_params.get("initial_value", 0) - def release(self) -> None: - self.line.release() + def close(self) -> None: + self.gpio.close() def is_inverted(self) -> bool: return self.inverted @@ -195,24 +169,22 @@ class GpioBase: class GpioOutputPin(GpioBase): def write(self, value: int) -> None: self.value = int(not not value) - self.line.set_value(self.value) + self.gpio.write(bool(self.value)) MAX_ERRORS = 50 ERROR_RESET_TIME = 5. class GpioEvent(GpioBase): - EVENT_FALLING_EDGE = 0 - EVENT_RISING_EDGE = 1 - def __init__(self, - event_loop: EventLoop, - line: Any, - pin_params: Dict[str, Any], - callback: GPIO_CALLBACK - ) -> None: - super().__init__(line, pin_params) + def __init__( + self, + event_loop: EventLoop, + gpio: periphery.GPIO, + pin_params: Dict[str, Any], + callback: GPIO_CALLBACK + ) -> None: + super().__init__(gpio, pin_params) self.event_loop = event_loop - self.fd = line.event_get_fd() self.callback = callback self.on_error: Optional[Callable[[str], None]] = None self.min_evt_time = 0. @@ -220,11 +192,10 @@ class GpioEvent(GpioBase): self.error_count = 0 self.last_error_reset = 0. self.started = False + os.set_blocking(self.gpio.fd, False) - @classmethod - def init_constants(cls, gpiod: Any) -> None: - cls.EVENT_RISING_EDGE = gpiod.LineEvent.RISING_EDGE - cls.EVENT_FALLING_EDGE = gpiod.LineEvent.FALLING_EDGE + def fileno(self) -> int: + return self.gpio.fd def setup_debounce(self, min_evt_time: float, @@ -235,29 +206,31 @@ class GpioEvent(GpioBase): def start(self) -> None: if not self.started: - self.value = self.line.get_value() + self.value = int(self.gpio.read()) self.last_event_time = self.event_loop.get_loop_time() - self.event_loop.add_reader(self.fd, self._on_event_trigger) + self.event_loop.add_reader(self.gpio.fd, self._on_event_trigger) self.started = True logging.debug(f"GPIO {self.name}: Listening for events, " f"current state: {self.value}") def stop(self) -> None: if self.started: - self.event_loop.remove_reader(self.fd) + self.event_loop.remove_reader(self.gpio.fd) self.started = False - def release(self) -> None: + def close(self) -> None: self.stop() - self.line.release() + self.gpio.close() def _on_event_trigger(self) -> None: - evt = self.line.event_read() + evt = self.gpio.read_event() last_val = self.value - if evt.type == self.EVENT_RISING_EDGE: + if evt.edge == "rising": # type: ignore self.value = 1 - elif evt.type == self.EVENT_FALLING_EDGE: + elif evt.edge == "falling": # type: ignore self.value = 0 + else: + return eventtime = self.event_loop.get_loop_time() evt_duration = eventtime - self.last_event_time if last_val == self.value or evt_duration < self.min_evt_time: @@ -277,8 +250,9 @@ class GpioEvent(GpioBase): if self.error_count >= MAX_ERRORS: self.stop() if self.on_error is not None: - self.on_error("Too Many Consecutive Errors, " - f"GPIO Event Disabled on {self.name}") + self.on_error( + f"Too Many Consecutive Errors, GPIO Event Disabled on {self.name}" + ) def load_component(config: ConfigHelper) -> GpioFactory: