bulk_sensor: Move APIDumpHelper() from motion_report.py to bulk_sensor.py

Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
This commit is contained in:
Kevin O'Connor 2023-12-16 14:07:06 -05:00
parent f4c8f0bf88
commit ffd44c0219
6 changed files with 104 additions and 103 deletions

View File

@ -4,7 +4,7 @@
#
# This file may be distributed under the terms of the GNU GPLv3 license.
import logging, time, collections, multiprocessing, os
from . import bus, motion_report, bulk_sensor
from . import bus, bulk_sensor
# ADXL345 registers
REG_DEVID = 0x00
@ -217,7 +217,7 @@ class ADXL345:
BYTES_PER_SAMPLE)
self.last_error_count = 0
# API server endpoints
self.api_dump = motion_report.APIDumpHelper(
self.api_dump = bulk_sensor.APIDumpHelper(
self.printer, self._api_update, self._api_startstop, API_UPDATES)
self.name = config.get_name().split()[-1]
wh = self.printer.lookup_object('webhooks')

View File

@ -4,7 +4,7 @@
#
# This file may be distributed under the terms of the GNU GPLv3 license.
import logging, math
from . import bus, motion_report, bulk_sensor
from . import bus, bulk_sensor
MIN_MSG_TIME = 0.100
TCODE_ERROR = 0xff
@ -441,7 +441,7 @@ class Angle:
mcu.register_config_callback(self._build_config)
self.bulk_queue = bulk_sensor.BulkDataQueue(mcu, "spi_angle_data", oid)
# API server endpoints
self.api_dump = motion_report.APIDumpHelper(
self.api_dump = bulk_sensor.APIDumpHelper(
self.printer, self._api_update, self._api_startstop, 0.100)
self.name = config.get_name().split()[1]
wh = self.printer.lookup_object('webhooks')

View File

@ -5,6 +5,99 @@
# This file may be distributed under the terms of the GNU GPLv3 license.
import threading
API_UPDATE_INTERVAL = 0.500
# Helper to periodically transmit data to a set of API clients
class APIDumpHelper:
def __init__(self, printer, data_cb, startstop_cb=None,
update_interval=API_UPDATE_INTERVAL):
self.printer = printer
self.data_cb = data_cb
if startstop_cb is None:
startstop_cb = (lambda is_start: None)
self.startstop_cb = startstop_cb
self.is_started = False
self.update_interval = update_interval
self.update_timer = None
self.clients = {}
def _stop(self):
self.clients.clear()
reactor = self.printer.get_reactor()
reactor.unregister_timer(self.update_timer)
self.update_timer = None
if not self.is_started:
return reactor.NEVER
try:
self.startstop_cb(False)
except self.printer.command_error as e:
logging.exception("API Dump Helper stop callback error")
self.clients.clear()
self.is_started = False
if self.clients:
# New client started while in process of stopping
self._start()
return reactor.NEVER
def _start(self):
if self.is_started:
return
self.is_started = True
try:
self.startstop_cb(True)
except self.printer.command_error as e:
logging.exception("API Dump Helper start callback error")
self.is_started = False
self.clients.clear()
raise
reactor = self.printer.get_reactor()
systime = reactor.monotonic()
waketime = systime + self.update_interval
self.update_timer = reactor.register_timer(self._update, waketime)
def add_client(self, web_request):
cconn = web_request.get_client_connection()
template = web_request.get_dict('response_template', {})
self.clients[cconn] = template
self._start()
def add_internal_client(self):
cconn = InternalDumpClient()
self.clients[cconn] = {}
self._start()
return cconn
def _update(self, eventtime):
try:
msg = self.data_cb(eventtime)
except self.printer.command_error as e:
logging.exception("API Dump Helper data callback error")
return self._stop()
if not msg:
return eventtime + self.update_interval
for cconn, template in list(self.clients.items()):
if cconn.is_closed():
del self.clients[cconn]
if not self.clients:
return self._stop()
continue
tmp = dict(template)
tmp['params'] = msg
cconn.send(tmp)
return eventtime + self.update_interval
# An "internal webhooks" wrapper for using APIDumpHelper internally
class InternalDumpClient:
def __init__(self):
self.msgs = []
self.is_done = False
def get_messages(self):
return self.msgs
def finalize(self):
self.is_done = True
def is_closed(self):
return self.is_done
def send(self, msg):
self.msgs.append(msg)
if len(self.msgs) >= 10000:
# Avoid filling up memory with too many samples
self.finalize()
# Helper class to store incoming messages in a queue
class BulkDataQueue:
def __init__(self, mcu, msg_name, oid):

View File

@ -5,7 +5,7 @@
#
# This file may be distributed under the terms of the GNU GPLv3 license.
import logging
from . import bus, motion_report, adxl345, bulk_sensor
from . import bus, adxl345, bulk_sensor
# LIS2DW registers
REG_LIS2DW_WHO_AM_I_ADDR = 0x0F
@ -63,7 +63,7 @@ class LIS2DW:
BYTES_PER_SAMPLE)
self.last_error_count = 0
# API server endpoints
self.api_dump = motion_report.APIDumpHelper(
self.api_dump = bulk_sensor.APIDumpHelper(
self.printer, self._api_update, self._api_startstop, API_UPDATES)
self.name = config.get_name().split()[-1]
wh = self.printer.lookup_object('webhooks')

View File

@ -5,99 +5,7 @@
# This file may be distributed under the terms of the GNU GPLv3 license.
import logging
import chelper
API_UPDATE_INTERVAL = 0.500
# Helper to periodically transmit data to a set of API clients
class APIDumpHelper:
def __init__(self, printer, data_cb, startstop_cb=None,
update_interval=API_UPDATE_INTERVAL):
self.printer = printer
self.data_cb = data_cb
if startstop_cb is None:
startstop_cb = (lambda is_start: None)
self.startstop_cb = startstop_cb
self.is_started = False
self.update_interval = update_interval
self.update_timer = None
self.clients = {}
def _stop(self):
self.clients.clear()
reactor = self.printer.get_reactor()
reactor.unregister_timer(self.update_timer)
self.update_timer = None
if not self.is_started:
return reactor.NEVER
try:
self.startstop_cb(False)
except self.printer.command_error as e:
logging.exception("API Dump Helper stop callback error")
self.clients.clear()
self.is_started = False
if self.clients:
# New client started while in process of stopping
self._start()
return reactor.NEVER
def _start(self):
if self.is_started:
return
self.is_started = True
try:
self.startstop_cb(True)
except self.printer.command_error as e:
logging.exception("API Dump Helper start callback error")
self.is_started = False
self.clients.clear()
raise
reactor = self.printer.get_reactor()
systime = reactor.monotonic()
waketime = systime + self.update_interval
self.update_timer = reactor.register_timer(self._update, waketime)
def add_client(self, web_request):
cconn = web_request.get_client_connection()
template = web_request.get_dict('response_template', {})
self.clients[cconn] = template
self._start()
def add_internal_client(self):
cconn = InternalDumpClient()
self.clients[cconn] = {}
self._start()
return cconn
def _update(self, eventtime):
try:
msg = self.data_cb(eventtime)
except self.printer.command_error as e:
logging.exception("API Dump Helper data callback error")
return self._stop()
if not msg:
return eventtime + self.update_interval
for cconn, template in list(self.clients.items()):
if cconn.is_closed():
del self.clients[cconn]
if not self.clients:
return self._stop()
continue
tmp = dict(template)
tmp['params'] = msg
cconn.send(tmp)
return eventtime + self.update_interval
# An "internal webhooks" wrapper for using APIDumpHelper internally
class InternalDumpClient:
def __init__(self):
self.msgs = []
self.is_done = False
def get_messages(self):
return self.msgs
def finalize(self):
self.is_done = True
def is_closed(self):
return self.is_done
def send(self, msg):
self.msgs.append(msg)
if len(self.msgs) >= 10000:
# Avoid filling up memory with too many samples
self.finalize()
from . import bulk_sensor
# Extract stepper queue_step messages
class DumpStepper:
@ -105,7 +13,7 @@ class DumpStepper:
self.printer = printer
self.mcu_stepper = mcu_stepper
self.last_api_clock = 0
self.api_dump = APIDumpHelper(printer, self._api_update)
self.api_dump = bulk_sensor.APIDumpHelper(printer, self._api_update)
wh = self.printer.lookup_object('webhooks')
wh.register_mux_endpoint("motion_report/dump_stepper", "name",
mcu_stepper.get_name(), self._add_api_client)
@ -168,7 +76,7 @@ class DumpTrapQ:
self.name = name
self.trapq = trapq
self.last_api_msg = (0., 0.)
self.api_dump = APIDumpHelper(printer, self._api_update)
self.api_dump = bulk_sensor.APIDumpHelper(printer, self._api_update)
wh = self.printer.lookup_object('webhooks')
wh.register_mux_endpoint("motion_report/dump_trapq", "name", name,
self._add_api_client)

View File

@ -5,7 +5,7 @@
#
# This file may be distributed under the terms of the GNU GPLv3 license.
import logging, time
from . import bus, motion_report, adxl345, bulk_sensor
from . import bus, adxl345, bulk_sensor
MPU9250_ADDR = 0x68
@ -80,7 +80,7 @@ class MPU9250:
BYTES_PER_SAMPLE)
self.last_error_count = 0
# API server endpoints
self.api_dump = motion_report.APIDumpHelper(
self.api_dump = bulk_sensor.APIDumpHelper(
self.printer, self._api_update, self._api_startstop, API_UPDATES)
self.name = config.get_name().split()[-1]
wh = self.printer.lookup_object('webhooks')