power: refactor gpio device to use gpio component

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2021-11-15 07:04:15 -05:00
parent 83901b1896
commit 18c6ff5a99
1 changed files with 30 additions and 121 deletions

View File

@ -5,8 +5,6 @@
# This file may be distributed under the terms of the GNU GPLv3 license. # This file may be distributed under the terms of the GNU GPLv3 license.
from __future__ import annotations from __future__ import annotations
import sys
import glob
import logging import logging
import json import json
import struct import struct
@ -26,43 +24,21 @@ from typing import (
Optional, Optional,
Dict, Dict,
Coroutine, Coroutine,
Tuple,
Union, Union,
) )
# Special handling for gpiod import
HAS_GPIOD = False
PKG_PATHS = glob.glob("/usr/lib/python3*/dist-packages")
PKG_PATHS += glob.glob("/usr/lib/python3*/site-packages")
for pkg_path in PKG_PATHS:
sys.path.insert(0, pkg_path)
try:
import gpiod
except ModuleNotFoundError:
sys.path.pop(0)
except ImportError:
logging.exception("Unable to load gpiod module")
sys.path.pop(0)
else:
HAS_GPIOD = True
sys.path.pop(0)
break
if TYPE_CHECKING: if TYPE_CHECKING:
from confighelper import ConfigHelper from confighelper import ConfigHelper
from websockets import WebRequest from websockets import WebRequest
from .machine import Machine from .machine import Machine
from .gpio import GpioFactory
from . import klippy_apis from . import klippy_apis
APIComp = klippy_apis.KlippyAPI APIComp = klippy_apis.KlippyAPI
class PrinterPower: class PrinterPower:
def __init__(self, config: ConfigHelper) -> None: def __init__(self, config: ConfigHelper) -> None:
self.server = config.get_server() self.server = config.get_server()
if not HAS_GPIOD: has_gpio = self.server.load_component(config, 'gpio', None) is not None
self.server.add_warning(
"Unable to load gpiod library, GPIO power "
"devices will not be loaded")
self.chip_factory = GpioChipFactory()
self.devices: Dict[str, PowerDevice] = {} self.devices: Dict[str, PowerDevice] = {}
prefix_sections = config.get_prefix_sections("power") prefix_sections = config.get_prefix_sections("power")
logging.info(f"Power component loading devices: {prefix_sections}") logging.info(f"Power component loading devices: {prefix_sections}")
@ -76,7 +52,7 @@ class PrinterPower:
"loxonev1": Loxonev1, "loxonev1": Loxonev1,
"rf": RFDevice "rf": RFDevice
} }
try:
for section in prefix_sections: for section in prefix_sections:
cfg = config[section] cfg = config[section]
dev_type: str = cfg.get("type") dev_type: str = cfg.get("type")
@ -84,18 +60,13 @@ class PrinterPower:
dev_class = dev_types.get(dev_type) dev_class = dev_types.get(dev_type)
if dev_class is None: if dev_class is None:
raise config.error(f"Unsupported Device Type: {dev_type}") raise config.error(f"Unsupported Device Type: {dev_type}")
dev = dev_class(cfg) if issubclass(dev_class, GpioDevice) and not has_gpio:
if isinstance(dev, GpioDevice) or isinstance(dev, RFDevice):
if not HAS_GPIOD:
self.server.add_warning( self.server.add_warning(
f"Unable to load power device [{cfg.get_name()}], " f"Unable to load power device [{cfg.get_name()}], "
"gpiod module not loaded") "gpio component not available")
continue continue
dev.configure_line(cfg, self.chip_factory) dev = dev_class(cfg)
self.devices[dev.get_name()] = dev self.devices[dev.get_name()] = dev
except Exception:
self.chip_factory.close()
raise
self.server.register_endpoint( self.server.register_endpoint(
"/machine/device_power/devices", ['GET'], "/machine/device_power/devices", ['GET'],
@ -278,7 +249,6 @@ class PrinterPower:
ret = device.close() ret = device.close()
if ret is not None: if ret is not None:
await ret await ret
self.chip_factory.close()
class PowerDevice: class PowerDevice:
@ -459,28 +429,11 @@ class HTTPDevice(PowerDevice):
self.state = state self.state = state
class GpioChipFactory:
def __init__(self) -> None:
self.chips: Dict[str, gpiod.Chip] = {}
version: str = gpiod.version_string()
self.gpiod_version = tuple(int(v) for v in version.split('.'))
def get_gpio_chip(self, chip_name) -> gpiod.Chip:
if chip_name in self.chips:
return self.chips[chip_name]
chip = gpiod.Chip(chip_name, gpiod.Chip.OPEN_BY_NAME)
self.chips[chip_name] = chip
return chip
def get_gpiod_version(self):
return self.gpiod_version
def close(self) -> None:
for chip in self.chips.values():
chip.close()
class GpioDevice(PowerDevice): class GpioDevice(PowerDevice):
def __init__(self, config: ConfigHelper): def __init__(self,
config: ConfigHelper,
initial_val: Optional[int] = None
) -> None:
super().__init__(config) super().__init__(config)
self.initial_state = config.getboolean('initial_state', False) self.initial_state = config.getboolean('initial_state', False)
self.timer: Optional[float] = config.getfloat('timer', None) self.timer: Optional[float] = config.getfloat('timer', None)
@ -489,54 +442,11 @@ class GpioDevice(PowerDevice):
f"Option 'timer' in section [{config.get_name()}] must " f"Option 'timer' in section [{config.get_name()}] must "
"be above 0.0") "be above 0.0")
self.timer_handle: Optional[asyncio.TimerHandle] = None self.timer_handle: Optional[asyncio.TimerHandle] = None
gpio: GpioFactory = self.server.lookup_component('gpio')
def configure_line(self, if initial_val is None:
config: ConfigHelper, initial_val = int(self.initial_state)
chip_factory: GpioChipFactory self.gpio_out = gpio.get_gpio_out_from_config(
) -> None: config, initial_value=initial_val)
pin, chip_id, invert = self._parse_pin(config)
try:
chip = chip_factory.get_gpio_chip(chip_id)
self.line = chip.get_line(pin)
args: Dict[str, Any] = {
'consumer': "moonraker",
'type': gpiod.LINE_REQ_DIR_OUT
}
if invert:
args['flags'] = gpiod.LINE_REQ_FLAG_ACTIVE_LOW
if chip_factory.get_gpiod_version() < (1, 3):
args['default_vals'] = [int(self.initial_state)]
else:
args['default_val'] = int(self.initial_state)
self.line.request(**args)
except Exception:
self.state = "error"
logging.exception(
f"Unable to init {pin}. Make sure the gpio is not in "
"use by another program or exported by sysfs.")
raise config.error("Power GPIO Config Error")
def _parse_pin(self, config: ConfigHelper) -> Tuple[int, str, bool]:
pin = cfg_pin = config.get("pin")
invert = False
if pin[0] == "!":
pin = pin[1:]
invert = True
chip_id: str = "gpiochip0"
pin_parts = pin.split("/")
if len(pin_parts) == 2:
chip_id, pin = pin_parts
elif len(pin_parts) == 1:
pin = pin_parts[0]
# Verify pin
if not chip_id.startswith("gpiochip") or \
not chip_id[-1].isdigit() or \
not pin.startswith("gpio") or \
not pin[4:].isdigit():
raise config.error(
f"Invalid Power Pin configuration: {cfg_pin}")
pin_id = int(pin[4:])
return pin_id, chip_id, invert
def initialize(self) -> None: def initialize(self) -> None:
super().initialize() super().initialize()
@ -550,7 +460,7 @@ class GpioDevice(PowerDevice):
self.timer_handle.cancel() self.timer_handle.cancel()
self.timer_handle = None self.timer_handle = None
try: try:
self.line.set_value(int(state == "on")) self.gpio_out.write(int(state == "on"))
except Exception: except Exception:
self.state = "error" self.state = "error"
msg = f"Error Toggling Device Power: {self.name}" msg = f"Error Toggling Device Power: {self.name}"
@ -564,7 +474,6 @@ class GpioDevice(PowerDevice):
self.timer, power.set_device_power, self.name, "off") self.timer, power.set_device_power, self.name, "off")
def close(self) -> None: def close(self) -> None:
self.line.release()
if self.timer_handle is not None: if self.timer_handle is not None:
self.timer_handle.cancel() self.timer_handle.cancel()
self.timer_handle = None self.timer_handle = None
@ -580,14 +489,14 @@ class RFDevice(GpioDevice):
RETRIES = 10 # send the code this many times RETRIES = 10 # send the code this many times
def __init__(self, config: ConfigHelper): def __init__(self, config: ConfigHelper):
super().__init__(config) super().__init__(config, initial_val=0)
self.on = config.get("on_code").zfill(24) self.on = config.get("on_code").zfill(24)
self.off = config.get("off_code").zfill(24) self.off = config.get("off_code").zfill(24)
def _transmit_digit(self, waveform) -> None: def _transmit_digit(self, waveform) -> None:
self.line.set_value(1) self.gpio_out.write(1)
time.sleep(waveform[0]*RFDevice.PULSE_LEN) time.sleep(waveform[0]*RFDevice.PULSE_LEN)
self.line.set_value(0) self.gpio_out.write(0)
time.sleep(waveform[1]*RFDevice.PULSE_LEN) time.sleep(waveform[1]*RFDevice.PULSE_LEN)
def _transmit_code(self, code) -> None: def _transmit_code(self, code) -> None: