From c716edafe291a3d32700becfb67cb1504cd6902b Mon Sep 17 00:00:00 2001 From: Kevin O'Connor Date: Sun, 17 Dec 2023 00:15:55 -0500 Subject: [PATCH] bulk_sensor: Simplify the registration of internal clients in BatchBulkHelper Previously, the BatchBulkHelper class was designed primarily to register webhook clients, and internal clients used a wrapper class that emulated a webhooks client. Change BatchBulkHelper to support regular internal callbacks, and introduce a new BatchWebhooksClient class that can translate these internal callback to webhooks client messages. This makes it easier to register internal clients that can process the bulk messages every batch interval. Signed-off-by: Kevin O'Connor --- klippy/extras/adxl345.py | 43 ++++++++++++----------- klippy/extras/angle.py | 19 +++++++---- klippy/extras/bulk_sensor.py | 66 +++++++++++++++--------------------- klippy/extras/lis2dw.py | 5 +-- klippy/extras/mpu9250.py | 5 +-- 5 files changed, 69 insertions(+), 69 deletions(-) diff --git a/klippy/extras/adxl345.py b/klippy/extras/adxl345.py index 1dfb6bc7..8f40c7fe 100644 --- a/klippy/extras/adxl345.py +++ b/klippy/extras/adxl345.py @@ -32,26 +32,29 @@ Accel_Measurement = collections.namedtuple( # Helper class to obtain measurements class AccelQueryHelper: - def __init__(self, printer, cconn): + def __init__(self, printer): self.printer = printer - self.cconn = cconn + self.is_finished = False print_time = printer.lookup_object('toolhead').get_last_move_time() self.request_start_time = self.request_end_time = print_time - self.samples = self.raw_samples = [] + self.msgs = [] + self.samples = [] def finish_measurements(self): toolhead = self.printer.lookup_object('toolhead') self.request_end_time = toolhead.get_last_move_time() toolhead.wait_moves() - self.cconn.finalize() - def _get_raw_samples(self): - raw_samples = self.cconn.get_messages() - if raw_samples: - self.raw_samples = raw_samples - return self.raw_samples + self.is_finished = True + def handle_batch(self, msg): + if self.is_finished: + return False + if len(self.msgs) >= 10000: + # Avoid filling up memory with too many samples + return False + self.msgs.append(msg) + return True def has_valid_samples(self): - raw_samples = self._get_raw_samples() - for msg in raw_samples: - data = msg['params']['data'] + for msg in self.msgs: + data = msg['data'] first_sample_time = data[0][0] last_sample_time = data[-1][0] if (first_sample_time > self.request_end_time @@ -60,21 +63,20 @@ class AccelQueryHelper: # The time intervals [first_sample_time, last_sample_time] # and [request_start_time, request_end_time] have non-zero # intersection. It is still theoretically possible that none - # of the samples from raw_samples fall into the time interval + # of the samples from msgs fall into the time interval # [request_start_time, request_end_time] if it is too narrow # or on very heavy data losses. In practice, that interval # is at least 1 second, so this possibility is negligible. return True return False def get_samples(self): - raw_samples = self._get_raw_samples() - if not raw_samples: + if not self.msgs: return self.samples - total = sum([len(m['params']['data']) for m in raw_samples]) + total = sum([len(m['data']) for m in self.msgs]) count = 0 self.samples = samples = [None] * total - for msg in raw_samples: - for samp_time, x, y, z in msg['params']['data']: + for msg in self.msgs: + for samp_time, x, y, z in msg['data']: if samp_time < self.request_start_time: continue if samp_time > self.request_end_time: @@ -250,8 +252,9 @@ class ADXL345: "(e.g. faulty wiring) or a faulty adxl345 chip." % ( reg, val, stored_val)) def start_internal_client(self): - cconn = self.batch_bulk.add_internal_client() - return AccelQueryHelper(self.printer, cconn) + aqh = AccelQueryHelper(self.printer) + self.batch_bulk.add_client(aqh.handle_batch) + return aqh # Measurement decoding def _extract_samples(self, raw_samples): # Load variables to optimize inner loop below diff --git a/klippy/extras/angle.py b/klippy/extras/angle.py index 0fe053df..b1aa0d96 100644 --- a/klippy/extras/angle.py +++ b/klippy/extras/angle.py @@ -157,8 +157,14 @@ class AngleCalibration: def do_calibration_moves(self): move = self.printer.lookup_object('force_move').manual_move # Start data collection - angle_sensor = self.printer.lookup_object(self.name) - cconn = angle_sensor.start_internal_client() + msgs = [] + is_finished = False + def handle_batch(msg): + if is_finished: + return False + msgs.append(msg) + return True + self.printer.lookup_object(self.name).add_client(handle_batch) # Move stepper several turns (to allow internal sensor calibration) microsteps, full_steps = self.get_microsteps() mcu_stepper = self.mcu_stepper @@ -190,13 +196,12 @@ class AngleCalibration: move(mcu_stepper, .5*rotation_dist + align_dist, move_speed) toolhead.wait_moves() # Finish data collection - cconn.finalize() - msgs = cconn.get_messages() + is_finished = True # Correlate query responses cal = {} step = 0 for msg in msgs: - for query_time, pos in msg['params']['data']: + for query_time, pos in msg['data']: # Add to step tracking while step < len(times) and query_time > times[step][1]: step += 1 @@ -462,8 +467,8 @@ class Angle: "spi_angle_end oid=%c sequence=%hu", oid=self.oid, cq=cmdqueue) def get_status(self, eventtime=None): return {'temperature': self.sensor_helper.last_temperature} - def start_internal_client(self): - return self.batch_bulk.add_internal_client() + def add_client(self, client_cb): + self.batch_bulk.add_client(client_cb) # Measurement decoding def _extract_samples(self, raw_samples): # Load variables to optimize inner loop below diff --git a/klippy/extras/bulk_sensor.py b/klippy/extras/bulk_sensor.py index 95f6201e..a8497afc 100644 --- a/klippy/extras/bulk_sensor.py +++ b/klippy/extras/bulk_sensor.py @@ -22,7 +22,7 @@ class BatchBulkHelper: self.is_started = False self.batch_interval = batch_interval self.batch_timer = None - self.clients = {} + self.client_cbs = [] self.webhooks_start_resp = {} # Periodic batch processing def _start(self): @@ -34,14 +34,14 @@ class BatchBulkHelper: except self.printer.command_error as e: logging.exception("BatchBulkHelper start callback error") self.is_started = False - self.clients.clear() + del self.client_cbs[:] raise reactor = self.printer.get_reactor() systime = reactor.monotonic() waketime = systime + self.batch_interval self.batch_timer = reactor.register_timer(self._proc_batch, waketime) def _stop(self): - self.clients.clear() + del self.client_cbs[:] self.printer.get_reactor().unregister_timer(self.batch_timer) self.batch_timer = None if not self.is_started: @@ -50,9 +50,9 @@ class BatchBulkHelper: self.stop_cb() except self.printer.command_error as e: logging.exception("BatchBulkHelper stop callback error") - self.clients.clear() + del self.client_cbs[:] self.is_started = False - if self.clients: + if self.client_cbs: # New client started while in process of stopping self._start() def _proc_batch(self, eventtime): @@ -64,51 +64,41 @@ class BatchBulkHelper: return self.printer.get_reactor().NEVER if not msg: return eventtime + self.batch_interval - for cconn, template in list(self.clients.items()): - if cconn.is_closed(): - del self.clients[cconn] - if not self.clients: + for client_cb in list(self.client_cbs): + res = client_cb(msg) + if not res: + # This client no longer needs updates - unregister it + self.client_cbs.remove(client_cb) + if not self.client_cbs: self._stop() return self.printer.get_reactor().NEVER - continue - tmp = dict(template) - tmp['params'] = msg - cconn.send(tmp) return eventtime + self.batch_interval - # Internal clients - def add_internal_client(self): - cconn = InternalDumpClient() - self.clients[cconn] = {} + # Client registration + def add_client(self, client_cb): + self.client_cbs.append(client_cb) self._start() - return cconn # Webhooks registration def _add_api_client(self, web_request): - cconn = web_request.get_client_connection() - template = web_request.get_dict('response_template', {}) - self.clients[cconn] = template - self._start() + whbatch = BatchWebhooksClient(web_request) + self.add_client(whbatch.handle_batch) web_request.send(self.webhooks_start_resp) def add_mux_endpoint(self, path, key, value, webhooks_start_resp): self.webhooks_start_resp = webhooks_start_resp wh = self.printer.lookup_object('webhooks') wh.register_mux_endpoint(path, key, value, self._add_api_client) -# An "internal webhooks" wrapper for using BatchBulkHelper internally -class InternalDumpClient: - def __init__(self): - self.msgs = [] - self.is_done = False - def get_messages(self): - return self.msgs - def finalize(self): - self.is_done = True - def is_closed(self): - return self.is_done - def send(self, msg): - self.msgs.append(msg) - if len(self.msgs) >= 10000: - # Avoid filling up memory with too many samples - self.finalize() +# A webhooks wrapper for use by BatchBulkHelper +class BatchWebhooksClient: + def __init__(self, web_request): + self.cconn = web_request.get_client_connection() + self.template = web_request.get_dict('response_template', {}) + def handle_batch(self, msg): + if self.cconn.is_closed(): + return False + tmp = dict(self.template) + tmp['params'] = msg + self.cconn.send(tmp) + return True # Helper class to store incoming messages in a queue class BulkDataQueue: diff --git a/klippy/extras/lis2dw.py b/klippy/extras/lis2dw.py index 739c3641..28591c21 100644 --- a/klippy/extras/lis2dw.py +++ b/klippy/extras/lis2dw.py @@ -97,8 +97,9 @@ class LIS2DW: "(e.g. faulty wiring) or a faulty lis2dw chip." % ( reg, val, stored_val)) def start_internal_client(self): - cconn = self.bulk_batch.add_internal_client() - return adxl345.AccelQueryHelper(self.printer, cconn) + aqh = adxl345.AccelQueryHelper(self.printer) + self.batch_bulk.add_client(aqh.handle_batch) + return aqh # Measurement decoding def _extract_samples(self, raw_samples): # Load variables to optimize inner loop below diff --git a/klippy/extras/mpu9250.py b/klippy/extras/mpu9250.py index 82438ca0..c975f989 100644 --- a/klippy/extras/mpu9250.py +++ b/klippy/extras/mpu9250.py @@ -109,8 +109,9 @@ class MPU9250: def set_reg(self, reg, val, minclock=0): self.i2c.i2c_write([reg, val & 0xFF], minclock=minclock) def start_internal_client(self): - cconn = self.batch_bulk.add_internal_client() - return adxl345.AccelQueryHelper(self.printer, cconn) + aqh = adxl345.AccelQueryHelper(self.printer) + self.batch_bulk.add_client(aqh.handle_batch) + return aqh # Measurement decoding def _extract_samples(self, raw_samples): # Load variables to optimize inner loop below