diff --git a/klippy/extras/adxl345.py b/klippy/extras/adxl345.py index be4a0eac..6871ef53 100644 --- a/klippy/extras/adxl345.py +++ b/klippy/extras/adxl345.py @@ -4,7 +4,7 @@ # # This file may be distributed under the terms of the GNU GPLv3 license. import logging, time, collections, multiprocessing, os -from . import bus, motion_report, bulk_sensor +from . import bus, bulk_sensor # ADXL345 registers REG_DEVID = 0x00 @@ -217,7 +217,7 @@ class ADXL345: BYTES_PER_SAMPLE) self.last_error_count = 0 # API server endpoints - self.api_dump = motion_report.APIDumpHelper( + self.api_dump = bulk_sensor.APIDumpHelper( self.printer, self._api_update, self._api_startstop, API_UPDATES) self.name = config.get_name().split()[-1] wh = self.printer.lookup_object('webhooks') diff --git a/klippy/extras/angle.py b/klippy/extras/angle.py index 49020973..066167d0 100644 --- a/klippy/extras/angle.py +++ b/klippy/extras/angle.py @@ -4,7 +4,7 @@ # # This file may be distributed under the terms of the GNU GPLv3 license. import logging, math -from . import bus, motion_report, bulk_sensor +from . import bus, bulk_sensor MIN_MSG_TIME = 0.100 TCODE_ERROR = 0xff @@ -441,7 +441,7 @@ class Angle: mcu.register_config_callback(self._build_config) self.bulk_queue = bulk_sensor.BulkDataQueue(mcu, "spi_angle_data", oid) # API server endpoints - self.api_dump = motion_report.APIDumpHelper( + self.api_dump = bulk_sensor.APIDumpHelper( self.printer, self._api_update, self._api_startstop, 0.100) self.name = config.get_name().split()[1] wh = self.printer.lookup_object('webhooks') diff --git a/klippy/extras/bulk_sensor.py b/klippy/extras/bulk_sensor.py index 28aed48a..8f166ec8 100644 --- a/klippy/extras/bulk_sensor.py +++ b/klippy/extras/bulk_sensor.py @@ -5,6 +5,99 @@ # This file may be distributed under the terms of the GNU GPLv3 license. import threading +API_UPDATE_INTERVAL = 0.500 + +# Helper to periodically transmit data to a set of API clients +class APIDumpHelper: + def __init__(self, printer, data_cb, startstop_cb=None, + update_interval=API_UPDATE_INTERVAL): + self.printer = printer + self.data_cb = data_cb + if startstop_cb is None: + startstop_cb = (lambda is_start: None) + self.startstop_cb = startstop_cb + self.is_started = False + self.update_interval = update_interval + self.update_timer = None + self.clients = {} + def _stop(self): + self.clients.clear() + reactor = self.printer.get_reactor() + reactor.unregister_timer(self.update_timer) + self.update_timer = None + if not self.is_started: + return reactor.NEVER + try: + self.startstop_cb(False) + except self.printer.command_error as e: + logging.exception("API Dump Helper stop callback error") + self.clients.clear() + self.is_started = False + if self.clients: + # New client started while in process of stopping + self._start() + return reactor.NEVER + def _start(self): + if self.is_started: + return + self.is_started = True + try: + self.startstop_cb(True) + except self.printer.command_error as e: + logging.exception("API Dump Helper start callback error") + self.is_started = False + self.clients.clear() + raise + reactor = self.printer.get_reactor() + systime = reactor.monotonic() + waketime = systime + self.update_interval + self.update_timer = reactor.register_timer(self._update, waketime) + def add_client(self, web_request): + cconn = web_request.get_client_connection() + template = web_request.get_dict('response_template', {}) + self.clients[cconn] = template + self._start() + def add_internal_client(self): + cconn = InternalDumpClient() + self.clients[cconn] = {} + self._start() + return cconn + def _update(self, eventtime): + try: + msg = self.data_cb(eventtime) + except self.printer.command_error as e: + logging.exception("API Dump Helper data callback error") + return self._stop() + if not msg: + return eventtime + self.update_interval + for cconn, template in list(self.clients.items()): + if cconn.is_closed(): + del self.clients[cconn] + if not self.clients: + return self._stop() + continue + tmp = dict(template) + tmp['params'] = msg + cconn.send(tmp) + return eventtime + self.update_interval + +# An "internal webhooks" wrapper for using APIDumpHelper internally +class InternalDumpClient: + def __init__(self): + self.msgs = [] + self.is_done = False + def get_messages(self): + return self.msgs + def finalize(self): + self.is_done = True + def is_closed(self): + return self.is_done + def send(self, msg): + self.msgs.append(msg) + if len(self.msgs) >= 10000: + # Avoid filling up memory with too many samples + self.finalize() + # Helper class to store incoming messages in a queue class BulkDataQueue: def __init__(self, mcu, msg_name, oid): diff --git a/klippy/extras/lis2dw.py b/klippy/extras/lis2dw.py index 82673fc8..61d9add7 100644 --- a/klippy/extras/lis2dw.py +++ b/klippy/extras/lis2dw.py @@ -5,7 +5,7 @@ # # This file may be distributed under the terms of the GNU GPLv3 license. import logging -from . import bus, motion_report, adxl345, bulk_sensor +from . import bus, adxl345, bulk_sensor # LIS2DW registers REG_LIS2DW_WHO_AM_I_ADDR = 0x0F @@ -63,7 +63,7 @@ class LIS2DW: BYTES_PER_SAMPLE) self.last_error_count = 0 # API server endpoints - self.api_dump = motion_report.APIDumpHelper( + self.api_dump = bulk_sensor.APIDumpHelper( self.printer, self._api_update, self._api_startstop, API_UPDATES) self.name = config.get_name().split()[-1] wh = self.printer.lookup_object('webhooks') diff --git a/klippy/extras/motion_report.py b/klippy/extras/motion_report.py index d32de43f..f840f516 100644 --- a/klippy/extras/motion_report.py +++ b/klippy/extras/motion_report.py @@ -5,99 +5,7 @@ # This file may be distributed under the terms of the GNU GPLv3 license. import logging import chelper - -API_UPDATE_INTERVAL = 0.500 - -# Helper to periodically transmit data to a set of API clients -class APIDumpHelper: - def __init__(self, printer, data_cb, startstop_cb=None, - update_interval=API_UPDATE_INTERVAL): - self.printer = printer - self.data_cb = data_cb - if startstop_cb is None: - startstop_cb = (lambda is_start: None) - self.startstop_cb = startstop_cb - self.is_started = False - self.update_interval = update_interval - self.update_timer = None - self.clients = {} - def _stop(self): - self.clients.clear() - reactor = self.printer.get_reactor() - reactor.unregister_timer(self.update_timer) - self.update_timer = None - if not self.is_started: - return reactor.NEVER - try: - self.startstop_cb(False) - except self.printer.command_error as e: - logging.exception("API Dump Helper stop callback error") - self.clients.clear() - self.is_started = False - if self.clients: - # New client started while in process of stopping - self._start() - return reactor.NEVER - def _start(self): - if self.is_started: - return - self.is_started = True - try: - self.startstop_cb(True) - except self.printer.command_error as e: - logging.exception("API Dump Helper start callback error") - self.is_started = False - self.clients.clear() - raise - reactor = self.printer.get_reactor() - systime = reactor.monotonic() - waketime = systime + self.update_interval - self.update_timer = reactor.register_timer(self._update, waketime) - def add_client(self, web_request): - cconn = web_request.get_client_connection() - template = web_request.get_dict('response_template', {}) - self.clients[cconn] = template - self._start() - def add_internal_client(self): - cconn = InternalDumpClient() - self.clients[cconn] = {} - self._start() - return cconn - def _update(self, eventtime): - try: - msg = self.data_cb(eventtime) - except self.printer.command_error as e: - logging.exception("API Dump Helper data callback error") - return self._stop() - if not msg: - return eventtime + self.update_interval - for cconn, template in list(self.clients.items()): - if cconn.is_closed(): - del self.clients[cconn] - if not self.clients: - return self._stop() - continue - tmp = dict(template) - tmp['params'] = msg - cconn.send(tmp) - return eventtime + self.update_interval - -# An "internal webhooks" wrapper for using APIDumpHelper internally -class InternalDumpClient: - def __init__(self): - self.msgs = [] - self.is_done = False - def get_messages(self): - return self.msgs - def finalize(self): - self.is_done = True - def is_closed(self): - return self.is_done - def send(self, msg): - self.msgs.append(msg) - if len(self.msgs) >= 10000: - # Avoid filling up memory with too many samples - self.finalize() +from . import bulk_sensor # Extract stepper queue_step messages class DumpStepper: @@ -105,7 +13,7 @@ class DumpStepper: self.printer = printer self.mcu_stepper = mcu_stepper self.last_api_clock = 0 - self.api_dump = APIDumpHelper(printer, self._api_update) + self.api_dump = bulk_sensor.APIDumpHelper(printer, self._api_update) wh = self.printer.lookup_object('webhooks') wh.register_mux_endpoint("motion_report/dump_stepper", "name", mcu_stepper.get_name(), self._add_api_client) @@ -168,7 +76,7 @@ class DumpTrapQ: self.name = name self.trapq = trapq self.last_api_msg = (0., 0.) - self.api_dump = APIDumpHelper(printer, self._api_update) + self.api_dump = bulk_sensor.APIDumpHelper(printer, self._api_update) wh = self.printer.lookup_object('webhooks') wh.register_mux_endpoint("motion_report/dump_trapq", "name", name, self._add_api_client) diff --git a/klippy/extras/mpu9250.py b/klippy/extras/mpu9250.py index d9eb242e..6d5cdcf7 100644 --- a/klippy/extras/mpu9250.py +++ b/klippy/extras/mpu9250.py @@ -5,7 +5,7 @@ # # This file may be distributed under the terms of the GNU GPLv3 license. import logging, time -from . import bus, motion_report, adxl345, bulk_sensor +from . import bus, adxl345, bulk_sensor MPU9250_ADDR = 0x68 @@ -80,7 +80,7 @@ class MPU9250: BYTES_PER_SAMPLE) self.last_error_count = 0 # API server endpoints - self.api_dump = motion_report.APIDumpHelper( + self.api_dump = bulk_sensor.APIDumpHelper( self.printer, self._api_update, self._api_startstop, API_UPDATES) self.name = config.get_name().split()[-1] wh = self.printer.lookup_object('webhooks')