power: Use libgpiod instead of sysfs for gpio management

This resolves issues inherent with sysfs gpio management.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Arksine 2020-11-15 11:08:25 -05:00
parent 48266e0bd6
commit 6738aa8c68
1 changed files with 93 additions and 112 deletions

View File

@ -7,6 +7,7 @@
import logging import logging
import os import os
import asyncio import asyncio
import gpiod
from tornado.ioloop import IOLoop from tornado.ioloop import IOLoop
from tornado import gen from tornado import gen
@ -28,16 +29,14 @@ class PrinterPower:
self.server.register_remote_method( self.server.register_remote_method(
"set_device_power", self.set_device_power) "set_device_power", self.set_device_power)
self.chip_factory = GpioChipFactory()
self.current_dev = None self.current_dev = None
self.devices = {} self.devices = {}
prefix_sections = config.get_prefix_sections("power") prefix_sections = config.get_prefix_sections("power")
logging.info(f"Power plugin loading devices: f{prefix_sections}") logging.info(f"Power plugin loading devices: {prefix_sections}")
devices = {}
for section in prefix_sections: for section in prefix_sections:
dev = GpioDevice(config[section]) dev = GpioDevice(config[section], self.chip_factory)
devices[dev.name] = dev self.devices[dev.get_name()] = dev
ioloop = IOLoop.current()
ioloop.spawn_callback(self.initialize_devices, devices)
async def _handle_list_devices(self, web_request): async def _handle_list_devices(self, web_request):
output = {"devices": list(self.devices.keys())} output = {"devices": list(self.devices.keys())}
@ -58,7 +57,7 @@ class PrinterPower:
if req not in ("on", "off", "status"): if req not in ("on", "off", "status"):
raise self.server.error("Unsupported power request") raise self.server.error("Unsupported power request")
if (await self._power_dev(dev, req)): if (await self._power_dev(dev, req)):
result[dev] = self.devices[dev].status result[dev] = self.devices[dev].get_state()
else: else:
result[dev] = "device_not_found" result[dev] = "device_not_found"
return result return result
@ -67,8 +66,9 @@ class PrinterPower:
if dev not in self.devices: if dev not in self.devices:
return False return False
if req in ["on", "off"]: if req in ["on", "off"]:
await self.devices[dev].power(req) ret = self.devices[dev].set_power(req)
if asyncio.iscoroutine(ret):
await ret
self.server.send_event("gpio_power:power_changed", { self.server.send_event("gpio_power:power_changed", {
"device": dev, "device": dev,
"status": req "status": req
@ -80,17 +80,6 @@ class PrinterPower:
await ret await ret
return True return True
async def initialize_devices(self, devices):
for name, device in devices.items():
try:
await device.initialize()
except Exception:
logging.exception(
f"Power plugin: ERR Problem configuring the output pin for"
f" device {name}. Removing device")
continue
self.devices[name] = device
def set_device_power(self, device, state): def set_device_power(self, device, state):
status = None status = None
if isinstance(state, bool): if isinstance(state, bool):
@ -109,122 +98,114 @@ class PrinterPower:
if name in self.devices: if name in self.devices:
raise self.server.error( raise self.server.error(
f"Device [{name}] already configured") f"Device [{name}] already configured")
await device.initialize() ret = device.initialize()
if asyncio.iscoroutine(ret):
await ret
self.devices[name] = device self.devices[name] = device
async def close(self):
for device in self.devices.values():
if hasattr(device, "close"):
ret = device.close()
if asyncio.iscoroutine(ret):
await ret
self.chip_factory.close()
class GPIO:
gpio_root = "/sys/class/gpio"
@staticmethod class GpioChipFactory:
def _set_gpio_option(pin, option, value): def __init__(self):
GPIO._write( self.chips = {}
os.path.join(GPIO.gpio_root, f"gpio{pin}", option),
value
)
@staticmethod def get_gpio_chip(self, chip_name):
def _get_gpio_option(pin, option): if chip_name in self.chips:
return GPIO._read( return self.chips[chip_name]
os.path.join(GPIO.gpio_root, f"gpio{pin}", option) chip = gpiod.Chip(chip_name, gpiod.Chip.OPEN_BY_NAME)
) self.chips[chip_name] = chip
return chip
@staticmethod
def _write(file, data):
with open(file, 'w') as f:
f.write(str(data))
f.flush()
@staticmethod
def _read(file):
with open(file, 'r') as f:
f.seek(0)
return f.read().strip()
@staticmethod
async def verify_pin(pin, active_low=1):
gpiopath = os.path.join(GPIO.gpio_root, f"gpio{pin}")
if not os.path.exists(gpiopath):
logging.info(f"Re-intializing GPIO{pin}")
await GPIO.setup_pin(pin, active_low)
return
if GPIO._get_gpio_option(pin, "active_low").strip() != str(active_low):
GPIO._set_gpio_option(pin, "active_low", active_low)
if GPIO._get_gpio_option(pin, "direction").strip() != "out":
GPIO._set_gpio_option(pin, "direction", "out")
@staticmethod
async def setup_pin(pin, active_low=1):
pin = int(pin)
active_low = 1 if active_low == 1 else 0
gpiopath = os.path.join(GPIO.gpio_root, f"gpio{pin}")
if not os.path.exists(gpiopath):
GPIO._write(
os.path.join(GPIO.gpio_root, "export"),
pin)
logging.info(f"Waiting for GPIO{pin} to initialize")
while os.stat(os.path.join(
GPIO.gpio_root, f"gpio{pin}",
"active_low")).st_gid == 0:
await gen.sleep(.1)
if GPIO._get_gpio_option(pin, "active_low").strip() != str(active_low):
GPIO._set_gpio_option(pin, "active_low", active_low)
if GPIO._get_gpio_option(pin, "direction").strip() != "out":
GPIO._set_gpio_option(pin, "direction", "out")
@staticmethod
def is_pin_on(pin):
return "on" if int(GPIO._get_gpio_option(pin, "value")) else "off"
@staticmethod
def set_pin_value(pin, active):
value = 1 if (active == 1) else 0
GPIO._set_gpio_option(pin, "value", value)
def close(self):
for chip in self.chips.values():
chip.close()
class GpioDevice: class GpioDevice:
def __init__(self, config): def __init__(self, config, chip_factory):
name_parts = config.get_name().split(maxsplit=1) name_parts = config.get_name().split(maxsplit=1)
if len(name_parts) != 2: if len(name_parts) != 2:
raise config.error(f"Invalid Section Name: {config.get_name()}") raise config.error(f"Invalid Section Name: {config.get_name()}")
self.name = name_parts[1] self.name = name_parts[1]
self.status = None self.state = "init"
cfg_pin = pin = config.get("pin") pin, chip_id, invert = self._parse_pin(config)
self.invert = False try:
chip = chip_factory.get_gpio_chip(chip_id)
self.line = chip.get_line(pin)
if invert:
self.line.request(
consumer="moonraker", type=gpiod.LINE_REQ_DIR_OUT,
flags=gpiod.LINE_REQ_FLAG_ACTIVE_LOW)
else:
self.line.request(
consumer="moonraker", type=gpiod.LINE_REQ_DIR_OUT)
except Exception:
self.state = "error"
logging.exception(
f"Unable to init {pin}. Make sure the gpio is not in "
"use by another program or exported by sysfs.")
raise config.error("Power GPIO Config Error")
self.set_power("off")
def _parse_pin(self, config):
pin = cfg_pin = config.get("pin")
invert = False
if pin[0] == "!": if pin[0] == "!":
pin = pin[1:] pin = pin[1:]
self.invert = True invert = True
self.consumer = "gpiochip0" chip_id = "gpiochip0"
pin_parts = pin.split("/") pin_parts = pin.split("/")
self.pin = ""
if len(pin_parts) == 2: if len(pin_parts) == 2:
self.consumer, self.pin = pin_parts chip_id, pin = pin_parts
elif len(pin_parts) == 1: elif len(pin_parts) == 1:
self.pin = pin_parts[0] pin = pin_parts[0]
# Verify pin # Verify pin
if not self.consumer.startswith("gpiochip") or \ if not chip_id.startswith("gpiochip") or \
not self.consumer[-1].isdigit() or \ not chip_id[-1].isdigit() or \
not self.pin.startswith("gpio") or \ not pin.startswith("gpio") or \
not self.pin[4:].isdigit(): not pin[4:].isdigit():
raise config.error( raise config.error(
f"Invalid Power Pin configuration: {cfg_pin}") f"Invalid Power Pin configuration: {cfg_pin}")
self.pin = int(self.pin[4:]) pin = int(pin[4:])
return pin, chip_id, invert
async def initialize(self): def initialize(self):
await GPIO.setup_pin(self.pin, int(self.invert)) pass
self.refresh_status()
def get_name(self):
return self.name
def get_state(self):
return self.state
def refresh_status(self): def refresh_status(self):
self.status = GPIO.is_pin_on(self.pin) try:
val = self.line.get_value()
except Exception:
self.state = "error"
msg = f"Error Refeshing Device Status: {self.name}"
logging.exception(msg)
raise self.server.error(msg) from None
self.state = "on" if val else "off"
async def power(self, status): def set_power(self, state):
await GPIO.verify_pin(self.pin, int(self.invert)) try:
GPIO.set_pin_value(self.pin, int(status == "on")) self.line.set_value(int(state == "on"))
except Exception:
self.state = "error"
msg = f"Error Toggling Device Power: {self.name}"
logging.exception(msg)
raise self.server.error(msg) from None
self.state = state
def close(self):
self.line.release()
# The power plugin has multiple configuration sections # The power plugin has multiple configuration sections
def load_plugin_multi(config): def load_plugin_multi(config):