gpio: improve event debounce procedure

Use a traditional debouncing method, waiting for a specified
debounce period before triggering events.  Consumers may
choose to futher ignore events based on the the duration
between events.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2023-12-20 10:50:11 -05:00
parent 78d7a4f560
commit 2298f6b5a7
No known key found for this signature in database
GPG Key ID: 5A1EB336DFB4C71B
2 changed files with 47 additions and 30 deletions

View File

@ -46,25 +46,24 @@ class GpioButton:
self.server = config.get_server()
self.eventloop = self.server.get_event_loop()
self.name = config.get_name().split()[-1]
self.itransport: ITransport = self.server.lookup_component(
'internal_transport')
self.itransport: ITransport = self.server.lookup_component("internal_transport")
self.mutex = asyncio.Lock()
gpio: GpioFactory = self.server.load_component(config, 'gpio')
gpio: GpioFactory = self.server.load_component(config, "gpio")
self.gpio_event = gpio.register_gpio_event(
config.get('pin'), self._on_gpio_event)
min_event_time = config.getfloat(
'minimum_event_time', .05, minval=.010)
self.gpio_event.setup_debounce(min_event_time, self._on_gpio_error)
self.press_template = config.gettemplate(
"on_press", None, is_async=True)
self.release_template = config.gettemplate(
"on_release", None, is_async=True)
config.get('pin'), self._on_gpio_event
)
self.min_event_time = config.getfloat("minimum_event_time", 0, minval=0.0)
debounce_period = config.getfloat("debounce_period", .05, minval=0.01)
self.gpio_event.setup_debounce(debounce_period, self._on_gpio_error)
self.press_template = config.gettemplate("on_press", None, is_async=True)
self.release_template = config.gettemplate("on_release", None, is_async=True)
if (
self.press_template is None and
self.release_template is None
):
raise config.error(
f"[{config.get_name()}]: No template option configured")
f"[{config.get_name()}]: No template option configured"
)
self.notification_sent: bool = False
self.user_data: Dict[str, Any] = {}
self.context: Dict[str, Any] = {
@ -99,11 +98,11 @@ class GpioButton:
data['aux'] = result
self.server.send_event("button:button_event", data)
async def _on_gpio_event(self,
eventtime: float,
elapsed_time: float,
pressed: int
) -> None:
async def _on_gpio_event(
self, eventtime: float, elapsed_time: float, pressed: int
) -> None:
if elapsed_time < self.min_event_time:
return
template = self.press_template if pressed else self.release_template
if template is None:
return

View File

@ -5,6 +5,7 @@
# This file may be distributed under the terms of the GNU GPLv3 license.
from __future__ import annotations
import os
import asyncio
import platform
import pathlib
import logging
@ -187,21 +188,21 @@ class GpioEvent(GpioBase):
self.event_loop = event_loop
self.callback = callback
self.on_error: Optional[Callable[[str], None]] = None
self.min_evt_time = 0.
self.last_event_time = 0.
self.debounce_period: float = 0
self.last_event_time: float = 0.
self.error_count = 0
self.last_error_reset = 0.
self.started = False
self.debounce_task: Optional[asyncio.Task] = None
os.set_blocking(self.gpio.fd, False)
def fileno(self) -> int:
return self.gpio.fd
def setup_debounce(self,
min_evt_time: float,
err_callback: Optional[Callable[[str], None]]
) -> None:
self.min_evt_time = max(min_evt_time, 0.)
def setup_debounce(
self, debounce_period: float, err_callback: Optional[Callable[[str], None]]
) -> None:
self.debounce_period = max(debounce_period, 0)
self.on_error = err_callback
def start(self) -> None:
@ -214,6 +215,9 @@ class GpioEvent(GpioBase):
f"current state: {self.value}")
def stop(self) -> None:
if self.debounce_task is not None:
self.debounce_task.cancel()
self.debounce_task = None
if self.started:
self.event_loop.remove_reader(self.gpio.fd)
self.started = False
@ -224,25 +228,39 @@ class GpioEvent(GpioBase):
def _on_event_trigger(self) -> None:
evt = self.gpio.read_event()
last_val = self.value
last_value = self.value
if evt.edge == "rising": # type: ignore
self.value = 1
elif evt.edge == "falling": # type: ignore
self.value = 0
else:
return
if self.debounce_period:
if self.debounce_task is None:
coro = self._debounce(last_value)
self.debounce_task = self.event_loop.create_task(coro)
else:
self._increment_error()
elif last_value != self.value:
# No debounce period and change detected
self._run_callback()
async def _debounce(self, last_value: int) -> None:
await asyncio.sleep(self.debounce_period)
self.debounce_task = None
if last_value != self.value:
self._run_callback()
def _run_callback(self) -> None:
eventtime = self.event_loop.get_loop_time()
evt_duration = eventtime - self.last_event_time
if last_val == self.value or evt_duration < self.min_evt_time:
self._increment_error(eventtime)
return
self.last_event_time = eventtime
self.error_count = 0
ret = self.callback(eventtime, evt_duration, self.value)
if ret is not None:
self.event_loop.create_task(ret) # type: ignore
def _increment_error(self, eventtime: float) -> None:
def _increment_error(self) -> None:
eventtime = self.event_loop.get_loop_time()
if eventtime - self.last_error_reset > ERROR_RESET_TIME:
self.error_count = 0
self.last_error_reset = eventtime