button: initial implementation
Adds support for GPIO buttons. Each button is configured to render a template when pressed and/or released. The button event templates recieve a context with a "call_method" field, allowing them to call nearly all Moonraker APIs. Signed-off-by: Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
parent
a652845843
commit
2025332e6f
|
@ -0,0 +1,132 @@
|
|||
# Support for GPIO Button actions
|
||||
#
|
||||
# Copyright (C) 2021 Eric Callahan <arksine.code@gmail.com>
|
||||
#
|
||||
# This file may be distributed under the terms of the GNU GPLv3 license.
|
||||
from __future__ import annotations
|
||||
import asyncio
|
||||
import logging
|
||||
from confighelper import SentinelClass
|
||||
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
Dict
|
||||
)
|
||||
if TYPE_CHECKING:
|
||||
from confighelper import ConfigHelper
|
||||
from .gpio import GpioFactory
|
||||
from app import InternalTransport as ITransport
|
||||
|
||||
SENTINEL = SentinelClass.get_instance()
|
||||
|
||||
class ButtonManager:
|
||||
def __init__(self, config: ConfigHelper) -> None:
|
||||
self.server = config.get_server()
|
||||
self.buttons: Dict[str, GpioButton] = {}
|
||||
prefix_sections = config.get_prefix_sections("button")
|
||||
logging.info(f"Loading Buttons: {prefix_sections}")
|
||||
for section in prefix_sections:
|
||||
cfg = config[section]
|
||||
# Reserve the "type" option for future use
|
||||
btn_type = cfg.get('type', "gpio")
|
||||
try:
|
||||
btn = GpioButton(cfg)
|
||||
except Exception as e:
|
||||
msg = f"Failed to load button [{cfg.get_name()}]\n{e}"
|
||||
self.server.add_warning(msg)
|
||||
continue
|
||||
self.buttons[btn.name] = btn
|
||||
self.server.register_notification("button:button_event")
|
||||
|
||||
def component_init(self) -> None:
|
||||
for btn in self.buttons.values():
|
||||
btn.initialize()
|
||||
|
||||
class GpioButton:
|
||||
def __init__(self, config: ConfigHelper) -> None:
|
||||
self.server = config.get_server()
|
||||
self.eventloop = self.server.get_event_loop()
|
||||
self.name = config.get_name().split()[-1]
|
||||
self.itransport: ITransport = self.server.lookup_component(
|
||||
'internal_transport')
|
||||
self.mutex = asyncio.Lock()
|
||||
gpio: GpioFactory = self.server.load_component(config, 'gpio')
|
||||
self.gpio_event = gpio.register_gpio_event(
|
||||
config.get('pin'), self._on_gpio_event)
|
||||
min_event_time = config.getfloat(
|
||||
'minimum_event_time', .05, minval=.010)
|
||||
self.gpio_event.setup_debounce(min_event_time, self._on_gpio_error)
|
||||
self.press_template = config.gettemplate(
|
||||
"on_press", None, is_async=True)
|
||||
self.release_template = config.gettemplate(
|
||||
"on_release", None, is_async=True)
|
||||
if (
|
||||
self.press_template is None and
|
||||
self.release_template is None
|
||||
):
|
||||
raise config.error(
|
||||
f"[{config.get_name()}]: No template option configured")
|
||||
self.notification_sent: bool = False
|
||||
self.user_data: Dict[str, Any] = {}
|
||||
self.context: Dict[str, Any] = {
|
||||
'call_method': self.itransport.call_method,
|
||||
'send_notification': self._send_notification,
|
||||
'event': {
|
||||
'elapsed_time': 0.,
|
||||
'received_time': 0.,
|
||||
'render_time': 0.,
|
||||
'pressed': False,
|
||||
},
|
||||
'user_data': self.user_data
|
||||
}
|
||||
|
||||
def initialize(self) -> None:
|
||||
self.gpio_event.start()
|
||||
self.context['event']['pressed'] = bool(self.gpio_event.get_value())
|
||||
|
||||
def get_status(self) -> Dict[str, Any]:
|
||||
return {
|
||||
'name': self.name,
|
||||
'type': "gpio",
|
||||
'event': self.context['event'],
|
||||
}
|
||||
|
||||
def _send_notification(self, result: Any = None) -> None:
|
||||
if self.notification_sent:
|
||||
# Only allow execution once per template
|
||||
return
|
||||
self.notification_sent = True
|
||||
data = self.get_status()
|
||||
data['aux'] = result
|
||||
self.server.send_event("button:button_event", data)
|
||||
|
||||
async def _on_gpio_event(self,
|
||||
eventtime: float,
|
||||
elapsed_time: float,
|
||||
pressed: int
|
||||
) -> None:
|
||||
template = self.press_template if pressed else self.release_template
|
||||
if template is None:
|
||||
return
|
||||
async with self.mutex:
|
||||
self.notification_sent = False
|
||||
event_info: Dict[str, Any] = {
|
||||
'elapsed_time': elapsed_time,
|
||||
'received_time': eventtime,
|
||||
'render_time': self.eventloop.get_loop_time(),
|
||||
'pressed': bool(pressed)
|
||||
}
|
||||
self.context['event'] = event_info
|
||||
try:
|
||||
await template.render_async(self.context)
|
||||
except Exception:
|
||||
action = "on_press" if pressed else "on_release"
|
||||
logging.exception(
|
||||
f"Button {self.name}: '{action}' template error")
|
||||
|
||||
def _on_gpio_error(self, message: str) -> None:
|
||||
self.server.add_warning(f"Button {self.name}: {message}")
|
||||
|
||||
def load_component(config: ConfigHelper) -> ButtonManager:
|
||||
return ButtonManager(config)
|
Loading…
Reference in New Issue