From 2025332e6fb6c9dd94f19917c0078a4a588cae14 Mon Sep 17 00:00:00 2001 From: Eric Callahan Date: Sun, 26 Dec 2021 15:46:07 -0500 Subject: [PATCH] button: initial implementation Adds support for GPIO buttons. Each button is configured to render a template when pressed and/or released. The button event templates recieve a context with a "call_method" field, allowing them to call nearly all Moonraker APIs. Signed-off-by: Eric Callahan --- moonraker/components/button.py | 132 +++++++++++++++++++++++++++++++++ 1 file changed, 132 insertions(+) create mode 100644 moonraker/components/button.py diff --git a/moonraker/components/button.py b/moonraker/components/button.py new file mode 100644 index 0000000..7ce5f84 --- /dev/null +++ b/moonraker/components/button.py @@ -0,0 +1,132 @@ +# Support for GPIO Button actions +# +# Copyright (C) 2021 Eric Callahan +# +# This file may be distributed under the terms of the GNU GPLv3 license. +from __future__ import annotations +import asyncio +import logging +from confighelper import SentinelClass + +from typing import ( + TYPE_CHECKING, + Any, + Dict +) +if TYPE_CHECKING: + from confighelper import ConfigHelper + from .gpio import GpioFactory + from app import InternalTransport as ITransport + +SENTINEL = SentinelClass.get_instance() + +class ButtonManager: + def __init__(self, config: ConfigHelper) -> None: + self.server = config.get_server() + self.buttons: Dict[str, GpioButton] = {} + prefix_sections = config.get_prefix_sections("button") + logging.info(f"Loading Buttons: {prefix_sections}") + for section in prefix_sections: + cfg = config[section] + # Reserve the "type" option for future use + btn_type = cfg.get('type', "gpio") + try: + btn = GpioButton(cfg) + except Exception as e: + msg = f"Failed to load button [{cfg.get_name()}]\n{e}" + self.server.add_warning(msg) + continue + self.buttons[btn.name] = btn + self.server.register_notification("button:button_event") + + def component_init(self) -> None: + for btn in self.buttons.values(): + btn.initialize() + +class GpioButton: + def __init__(self, config: ConfigHelper) -> None: + self.server = config.get_server() + self.eventloop = self.server.get_event_loop() + self.name = config.get_name().split()[-1] + self.itransport: ITransport = self.server.lookup_component( + 'internal_transport') + self.mutex = asyncio.Lock() + gpio: GpioFactory = self.server.load_component(config, 'gpio') + self.gpio_event = gpio.register_gpio_event( + config.get('pin'), self._on_gpio_event) + min_event_time = config.getfloat( + 'minimum_event_time', .05, minval=.010) + self.gpio_event.setup_debounce(min_event_time, self._on_gpio_error) + self.press_template = config.gettemplate( + "on_press", None, is_async=True) + self.release_template = config.gettemplate( + "on_release", None, is_async=True) + if ( + self.press_template is None and + self.release_template is None + ): + raise config.error( + f"[{config.get_name()}]: No template option configured") + self.notification_sent: bool = False + self.user_data: Dict[str, Any] = {} + self.context: Dict[str, Any] = { + 'call_method': self.itransport.call_method, + 'send_notification': self._send_notification, + 'event': { + 'elapsed_time': 0., + 'received_time': 0., + 'render_time': 0., + 'pressed': False, + }, + 'user_data': self.user_data + } + + def initialize(self) -> None: + self.gpio_event.start() + self.context['event']['pressed'] = bool(self.gpio_event.get_value()) + + def get_status(self) -> Dict[str, Any]: + return { + 'name': self.name, + 'type': "gpio", + 'event': self.context['event'], + } + + def _send_notification(self, result: Any = None) -> None: + if self.notification_sent: + # Only allow execution once per template + return + self.notification_sent = True + data = self.get_status() + data['aux'] = result + self.server.send_event("button:button_event", data) + + async def _on_gpio_event(self, + eventtime: float, + elapsed_time: float, + pressed: int + ) -> None: + template = self.press_template if pressed else self.release_template + if template is None: + return + async with self.mutex: + self.notification_sent = False + event_info: Dict[str, Any] = { + 'elapsed_time': elapsed_time, + 'received_time': eventtime, + 'render_time': self.eventloop.get_loop_time(), + 'pressed': bool(pressed) + } + self.context['event'] = event_info + try: + await template.render_async(self.context) + except Exception: + action = "on_press" if pressed else "on_release" + logging.exception( + f"Button {self.name}: '{action}' template error") + + def _on_gpio_error(self, message: str) -> None: + self.server.add_warning(f"Button {self.name}: {message}") + +def load_component(config: ConfigHelper) -> ButtonManager: + return ButtonManager(config)