gpio: add support for line events
Signed-off-by: Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
parent
9346186337
commit
fc5e34096a
|
@ -11,20 +11,28 @@ from utils import load_system_module
|
||||||
from typing import (
|
from typing import (
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
Any,
|
Any,
|
||||||
Dict
|
Awaitable,
|
||||||
|
Callable,
|
||||||
|
Dict,
|
||||||
|
Optional
|
||||||
)
|
)
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from confighelper import ConfigHelper
|
from confighelper import ConfigHelper
|
||||||
|
from eventloop import EventLoop
|
||||||
|
GPIO_CALLBACK = Callable[[float, float, int], Optional[Awaitable[None]]]
|
||||||
|
|
||||||
class GpioFactory:
|
class GpioFactory:
|
||||||
def __init__(self, config: ConfigHelper) -> None:
|
def __init__(self, config: ConfigHelper) -> None:
|
||||||
self.server = config.get_server()
|
self.server = config.get_server()
|
||||||
self.gpiod: Any = load_system_module("gpiod")
|
self.gpiod: Any = load_system_module("gpiod")
|
||||||
|
GpioEvent.init_constants(self.gpiod)
|
||||||
self.chips: Dict[str, Any] = {}
|
self.chips: Dict[str, Any] = {}
|
||||||
self.reserved_gpios: Dict[str, GpioOutputPin] = {}
|
self.reserved_gpios: Dict[str, GpioBase] = {}
|
||||||
version: str = self.gpiod.version_string()
|
version: str = self.gpiod.version_string()
|
||||||
self.gpiod_version = tuple(int(v) for v in version.split('.'))
|
self.gpiod_version = tuple(int(v) for v in version.split('.'))
|
||||||
|
self.server.add_log_rollover_item(
|
||||||
|
"gpiod_version", f"libgpiod version: {version}")
|
||||||
|
|
||||||
def _get_gpio_chip(self, chip_name) -> Any:
|
def _get_gpio_chip(self, chip_name) -> Any:
|
||||||
if chip_name in self.chips:
|
if chip_name in self.chips:
|
||||||
|
@ -39,41 +47,100 @@ class GpioFactory:
|
||||||
) -> GpioOutputPin:
|
) -> GpioOutputPin:
|
||||||
initial_value = int(not not initial_value)
|
initial_value = int(not not initial_value)
|
||||||
pparams = self._parse_pin(pin_name)
|
pparams = self._parse_pin(pin_name)
|
||||||
|
pparams['initial_value'] = initial_value
|
||||||
|
line = self._request_gpio(pparams)
|
||||||
|
try:
|
||||||
|
gpio_out = GpioOutputPin(line, pparams)
|
||||||
|
except Exception:
|
||||||
|
logging.exception("Error Instantiating GpioOutputPin")
|
||||||
|
line.release()
|
||||||
|
raise
|
||||||
full_name = pparams['full_name']
|
full_name = pparams['full_name']
|
||||||
|
self.reserved_gpios[full_name] = gpio_out
|
||||||
|
return gpio_out
|
||||||
|
|
||||||
|
def register_gpio_event(self,
|
||||||
|
pin_name: str,
|
||||||
|
callback: GPIO_CALLBACK
|
||||||
|
) -> GpioEvent:
|
||||||
|
pin_params = self._parse_pin(pin_name, type="event")
|
||||||
|
line = self._request_gpio(pin_params)
|
||||||
|
event_loop = self.server.get_event_loop()
|
||||||
|
try:
|
||||||
|
gpio_event = GpioEvent(event_loop, line, pin_params, callback)
|
||||||
|
except Exception:
|
||||||
|
logging.exception("Error Instantiating GpioEvent")
|
||||||
|
line.release()
|
||||||
|
raise
|
||||||
|
full_name = pin_params['full_name']
|
||||||
|
self.reserved_gpios[full_name] = gpio_event
|
||||||
|
return gpio_event
|
||||||
|
|
||||||
|
def _request_gpio(self, pin_params: Dict[str, Any]) -> Any:
|
||||||
|
full_name = pin_params['full_name']
|
||||||
if full_name in self.reserved_gpios:
|
if full_name in self.reserved_gpios:
|
||||||
raise self.server.error(f"GPIO {full_name} already reserved")
|
raise self.server.error(f"GPIO {full_name} already reserved")
|
||||||
try:
|
try:
|
||||||
chip = self._get_gpio_chip(pparams['chip_id'])
|
chip = self._get_gpio_chip(pin_params['chip_id'])
|
||||||
line = chip.get_line(pparams['pin_id'])
|
line = chip.get_line(pin_params['pin_id'])
|
||||||
args: Dict[str, Any] = {
|
args: Dict[str, Any] = {
|
||||||
'consumer': "moonraker",
|
'consumer': "moonraker",
|
||||||
'type': self.gpiod.LINE_REQ_DIR_OUT
|
'type': pin_params['request_type']
|
||||||
}
|
}
|
||||||
if pparams['invert']:
|
if 'flags' in pin_params:
|
||||||
args['flags'] = self.gpiod.LINE_REQ_FLAG_ACTIVE_LOW
|
args['flags'] = pin_params['flags']
|
||||||
if self.gpiod_version < (1, 3):
|
if 'initial_value' in pin_params:
|
||||||
args['default_vals'] = [initial_value]
|
if self.gpiod_version < (1, 3):
|
||||||
else:
|
args['default_vals'] = [pin_params['initial_value']]
|
||||||
args['default_val'] = initial_value
|
else:
|
||||||
|
args['default_val'] = pin_params['initial_value']
|
||||||
line.request(**args)
|
line.request(**args)
|
||||||
except Exception:
|
except Exception:
|
||||||
logging.exception(
|
logging.exception(
|
||||||
f"Unable to init {full_name}. Make sure the gpio is not in "
|
f"Unable to init {full_name}. Make sure the gpio is not in "
|
||||||
"use by another program or exported by sysfs.")
|
"use by another program or exported by sysfs.")
|
||||||
raise
|
raise
|
||||||
gpio_out = GpioOutputPin(line, pparams, initial_value)
|
return line
|
||||||
self.reserved_gpios[full_name] = gpio_out
|
|
||||||
return gpio_out
|
|
||||||
|
|
||||||
def _parse_pin(self, pin_name: str) -> Dict[str, Any]:
|
def _parse_pin(self,
|
||||||
|
pin_name: str,
|
||||||
|
type: str = "out"
|
||||||
|
) -> Dict[str, Any]:
|
||||||
params: Dict[str, Any] = {
|
params: Dict[str, Any] = {
|
||||||
'orig': pin_name,
|
'orig': pin_name,
|
||||||
'invert': False,
|
'invert': False,
|
||||||
}
|
}
|
||||||
pin = pin_name
|
pin = pin_name
|
||||||
|
if type == "event":
|
||||||
|
params['request_type'] = self.gpiod.LINE_REQ_EV_BOTH_EDGES
|
||||||
|
flag: str = "disable"
|
||||||
|
if pin[0] == "^":
|
||||||
|
pin = pin[1:]
|
||||||
|
flag = "pullup"
|
||||||
|
elif pin[0] == "~":
|
||||||
|
pin = pin[1:]
|
||||||
|
flag = "pulldown"
|
||||||
|
if self.gpiod_version >= (1, 5):
|
||||||
|
flag_to_enum = {
|
||||||
|
"disable": self.gpiod.LINE_REQ_FLAG_BIAS_DISABLE,
|
||||||
|
"pullup": self.gpiod.LINE_REQ_FLAG_BIAS_PULL_UP,
|
||||||
|
"pulldown": self.gpiod.LINE_REQ_FLAG_BIAS_PULL_DOWN
|
||||||
|
}
|
||||||
|
params['flags'] = flag_to_enum[flag]
|
||||||
|
elif flag != "disable":
|
||||||
|
raise self.server.error(
|
||||||
|
f"Flag {flag} configured for event GPIO '{pin_name}'"
|
||||||
|
" requires libgpiod version 1.5 or later. "
|
||||||
|
f"Current Version: {self.gpiod.version_string()}")
|
||||||
|
elif type == "out":
|
||||||
|
params['request_type'] = self.gpiod.LINE_REQ_DIR_OUT
|
||||||
if pin[0] == "!":
|
if pin[0] == "!":
|
||||||
pin = pin[1:]
|
pin = pin[1:]
|
||||||
params['invert'] = True
|
params['invert'] = True
|
||||||
|
if 'flags' in params:
|
||||||
|
params['flags'] |= self.gpiod.LINE_REQ_FLAG_ACTIVE_LOW
|
||||||
|
else:
|
||||||
|
params['flags'] = self.gpiod.LINE_REQ_FLAG_ACTIVE_LOW
|
||||||
chip_id: str = "gpiochip0"
|
chip_id: str = "gpiochip0"
|
||||||
pin_parts = pin.split("/")
|
pin_parts = pin.split("/")
|
||||||
if len(pin_parts) == 2:
|
if len(pin_parts) == 2:
|
||||||
|
@ -99,22 +166,19 @@ class GpioFactory:
|
||||||
for chip in self.chips.values():
|
for chip in self.chips.values():
|
||||||
chip.close()
|
chip.close()
|
||||||
|
|
||||||
class GpioOutputPin:
|
class GpioBase:
|
||||||
def __init__(self,
|
def __init__(self,
|
||||||
line: Any,
|
line: Any,
|
||||||
pin_params: Dict[str, Any],
|
pin_params: Dict[str, Any]
|
||||||
initial_val: int
|
|
||||||
) -> None:
|
) -> None:
|
||||||
self.orig = pin_params['orig']
|
self.orig: str = pin_params['orig']
|
||||||
self.name = pin_params['full_name']
|
self.name: str = pin_params['full_name']
|
||||||
self.line = line
|
self.inverted: bool = pin_params['invert']
|
||||||
self.inverted = pin_params['invert']
|
self.line: Any = line
|
||||||
self.value = initial_val
|
self.value: int = pin_params.get('initial_value', 0)
|
||||||
self.release = line.release
|
|
||||||
|
|
||||||
def write(self, value: int) -> None:
|
def release(self) -> None:
|
||||||
self.value = int(not not value)
|
self.line.release()
|
||||||
self.line.set_value(self.value)
|
|
||||||
|
|
||||||
def is_inverted(self) -> bool:
|
def is_inverted(self) -> bool:
|
||||||
return self.inverted
|
return self.inverted
|
||||||
|
@ -128,5 +192,89 @@ class GpioOutputPin:
|
||||||
def __str__(self) -> str:
|
def __str__(self) -> str:
|
||||||
return self.orig
|
return self.orig
|
||||||
|
|
||||||
|
class GpioOutputPin(GpioBase):
|
||||||
|
def write(self, value: int) -> None:
|
||||||
|
self.value = int(not not value)
|
||||||
|
self.line.set_value(self.value)
|
||||||
|
|
||||||
|
|
||||||
|
MAX_ERRORS = 20
|
||||||
|
|
||||||
|
class GpioEvent(GpioBase):
|
||||||
|
EVENT_FALLING_EDGE = 0
|
||||||
|
EVENT_RISING_EDGE = 1
|
||||||
|
def __init__(self,
|
||||||
|
event_loop: EventLoop,
|
||||||
|
line: Any,
|
||||||
|
pin_params: Dict[str, Any],
|
||||||
|
callback: GPIO_CALLBACK
|
||||||
|
) -> None:
|
||||||
|
super().__init__(line, pin_params)
|
||||||
|
self.event_loop = event_loop
|
||||||
|
self.fd = line.event_get_fd()
|
||||||
|
self.callback = callback
|
||||||
|
self.on_error: Optional[Callable[[str], None]] = None
|
||||||
|
self.min_evt_time = 0.
|
||||||
|
self.last_event_time = 0.
|
||||||
|
self.error_count = 0
|
||||||
|
self.started = False
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def init_constants(cls, gpiod: Any) -> None:
|
||||||
|
cls.EVENT_RISING_EDGE = gpiod.LineEvent.RISING_EDGE
|
||||||
|
cls.EVENT_FALLING_EDGE = gpiod.LineEvent.FALLING_EDGE
|
||||||
|
|
||||||
|
def setup_debounce(self,
|
||||||
|
min_evt_time: float,
|
||||||
|
err_callback: Optional[Callable[[str], None]]
|
||||||
|
) -> None:
|
||||||
|
self.min_evt_time = max(min_evt_time, 0.)
|
||||||
|
self.on_error = err_callback
|
||||||
|
|
||||||
|
def start(self) -> None:
|
||||||
|
if not self.started:
|
||||||
|
self.value = self.line.get_value()
|
||||||
|
self.last_event_time = self.event_loop.get_loop_time()
|
||||||
|
self.event_loop.add_reader(self.fd, self._on_event_trigger)
|
||||||
|
self.started = True
|
||||||
|
logging.debug(f"GPIO {self.name}: Listening for events, "
|
||||||
|
f"current state: {self.value}")
|
||||||
|
|
||||||
|
def stop(self) -> None:
|
||||||
|
if self.started:
|
||||||
|
self.event_loop.remove_reader(self.fd)
|
||||||
|
self.started = False
|
||||||
|
|
||||||
|
def release(self) -> None:
|
||||||
|
self.stop()
|
||||||
|
self.line.release()
|
||||||
|
|
||||||
|
def _on_event_trigger(self) -> None:
|
||||||
|
evt = self.line.event_read()
|
||||||
|
last_val = self.value
|
||||||
|
if evt.type == self.EVENT_RISING_EDGE:
|
||||||
|
self.value = 1
|
||||||
|
elif evt.type == self.EVENT_FALLING_EDGE:
|
||||||
|
self.value = 0
|
||||||
|
eventtime = self.event_loop.get_loop_time()
|
||||||
|
evt_duration = eventtime - self.last_event_time
|
||||||
|
if last_val == self.value or evt_duration < self.min_evt_time:
|
||||||
|
self._increment_error()
|
||||||
|
return
|
||||||
|
self.last_event_time = eventtime
|
||||||
|
self.error_count = 0
|
||||||
|
ret = self.callback(eventtime, evt_duration, self.value)
|
||||||
|
if ret is not None:
|
||||||
|
self.event_loop.create_task(ret)
|
||||||
|
|
||||||
|
def _increment_error(self) -> None:
|
||||||
|
self.error_count += 1
|
||||||
|
if self.error_count >= MAX_ERRORS:
|
||||||
|
self.stop()
|
||||||
|
if self.on_error is not None:
|
||||||
|
self.on_error("Too Many Consecutive Errors, "
|
||||||
|
f"GPIO Event Disabled on {self.name}")
|
||||||
|
|
||||||
|
|
||||||
def load_component(config: ConfigHelper) -> GpioFactory:
|
def load_component(config: ConfigHelper) -> GpioFactory:
|
||||||
return GpioFactory(config)
|
return GpioFactory(config)
|
||||||
|
|
Loading…
Reference in New Issue