From d4316d9878e70ac4f87e59ffda24dbb4bfb4014b Mon Sep 17 00:00:00 2001 From: Eric Callahan Date: Fri, 19 Jan 2024 15:08:59 -0500 Subject: [PATCH] spoolman: refactor tracking Use a python dict to act as a queue for reporting used filament per spool. This eliminates the need for locks and resolves potential issues with spool changes when the Spoolman service is not available. In addition, add support for tracking multiple tools Signed-off-by: Eric Callahan --- moonraker/components/spoolman.py | 164 +++++++++++++++---------------- 1 file changed, 79 insertions(+), 85 deletions(-) diff --git a/moonraker/components/spoolman.py b/moonraker/components/spoolman.py index 24f3492..d708ac8 100644 --- a/moonraker/components/spoolman.py +++ b/moonraker/components/spoolman.py @@ -10,7 +10,7 @@ import logging import re import contextlib import tornado.websocket as tornado_ws -from ..common import RequestType, Sentinel +from ..common import RequestType from ..utils import json_wrapper as jsonw from typing import ( TYPE_CHECKING, @@ -32,7 +32,6 @@ if TYPE_CHECKING: DB_NAMESPACE = "moonraker" ACTIVE_SPOOL_KEY = "spoolman.spool_id" -CONNECTION_ERROR_LOG_TIME = 60. class SpoolManager: def __init__(self, config: ConfigHelper): @@ -40,19 +39,18 @@ class SpoolManager: self.eventloop = self.server.get_event_loop() self._get_spoolman_urls(config) self.sync_rate_seconds = config.getint("sync_rate", default=5, minval=1) - self.last_sync_time = 0. - self.extruded_lock = asyncio.Lock() + self.report_timer = self.eventloop.register_timer(self.report_extrusion) + self.pending_reports: Dict[int, float] = {} self.spoolman_ws: Optional[WebSocketClientConnection] = None self.connection_task: Optional[asyncio.Task] = None self.spool_check_task: Optional[asyncio.Task] = None - self.spool_lock = asyncio.Lock() self.ws_connected: bool = False self.reconnect_delay: float = 2. self.is_closing: bool = False self.spool_id: Optional[int] = None - self.extruded: float = 0 self._error_logged: bool = False self._highest_epos: float = 0 + self._current_extruder: str = "extruder" self.klippy_apis: APIComp = self.server.lookup_component("klippy_apis") self.http_client: HttpClient = self.server.lookup_component("http_client") self.database: MoonrakerDatabase = self.server.lookup_component("database") @@ -130,6 +128,7 @@ class SpoolManager: else: self.ws_connected = True self._error_logged = False + self.report_timer.start() logging.info("Connected to Spoolman Spool Manager") if self.spool_id is not None: self._cancel_spool_check_task() @@ -148,6 +147,7 @@ class SpoolManager: if isinstance(message, str): self._decode_message(message) elif message is None: + self.report_timer.stop() self.ws_connected = False cur_time = self.eventloop.get_loop_time() ping_time: float = cur_time - self._last_ping_received @@ -169,7 +169,8 @@ class SpoolManager: if self.spool_id is not None and event.get("type") == "deleted": payload: Dict[str, Any] = event.get("payload", {}) if payload.get("id") == self.spool_id: - self.eventloop.create_task(self.set_active_spool(Sentinel.MISSING)) + self.pending_reports.pop(self.spool_id, None) + self.set_active_spool(None) def _cancel_spool_check_task(self) -> None: if self.spool_check_task is None or self.spool_check_task.done(): @@ -184,7 +185,8 @@ class SpoolManager: ) if response.status_code == 404: logging.info(f"Spool ID {self.spool_id} not found, setting to None") - await self.set_active_spool(Sentinel.MISSING) + self.pending_reports.pop(self.spool_id, None) + self.set_active_spool(None) elif response.has_error(): err_msg = self._get_response_error(response) logging.info(f"Attempt to check spool status failed: {err_msg}") @@ -198,11 +200,14 @@ class SpoolManager: def _on_ws_ping(self, data: bytes = b"") -> None: self._last_ping_received = self.eventloop.get_loop_time() - async def _handle_klippy_ready(self): + async def _handle_klippy_ready(self) -> None: + result: Dict[str, Dict[str, Any]] result = await self.klippy_apis.subscribe_objects( - {"toolhead": ["position"]}, self._handle_status_update, {} + {"toolhead": ["position", "extruder"]}, self._handle_status_update, {} ) - initial_e_pos = self._eposition_from_status(result) + toolhead = result.get("toolhead", {}) + self._current_extruder = toolhead.get("extruder", "extruder") + initial_e_pos = toolhead.get("position", [None]*4)[3] logging.debug(f"Initial epos: {initial_e_pos}") if initial_e_pos is not None: self._highest_epos = initial_e_pos @@ -217,93 +222,81 @@ class SpoolManager: err_msg += f", Spoolman message: {msg}" return err_msg - def _eposition_from_status(self, status: Dict[str, Any]) -> Optional[float]: - position = status.get("toolhead", {}).get("position", []) - return position[3] if len(position) > 3 else None - - async def _handle_status_update(self, status: Dict[str, Any], _: float) -> None: - epos = self._eposition_from_status(status) - if epos and epos > self._highest_epos: - async with self.extruded_lock: - self.extruded += epos - self._highest_epos - self._highest_epos = epos - - now = self.eventloop.get_loop_time() - difference = now - self.last_sync_time - if difference > self.sync_rate_seconds: - self.last_sync_time = now - logging.debug("Sync period elapsed, tracking usage") - await self.track_filament_usage() - - async def set_active_spool(self, spool_id: Union[int, Sentinel, None]) -> None: - async with self.spool_lock: - deleted_spool = False - if spool_id is Sentinel.MISSING: - spool_id = None - deleted_spool = True - if self.spool_id == spool_id: - logging.info(f"Spool ID already set to: {spool_id}") - return - # Store the current spool usage before switching, unless it has been deleted - if not deleted_spool: - if self.spool_id is not None: - await self.track_filament_usage() - elif spool_id is not None: - # No need to track, just reset extrusion - async with self.extruded_lock: - self.extruded = 0 - self.spool_id = spool_id - self.database.insert_item(DB_NAMESPACE, ACTIVE_SPOOL_KEY, spool_id) - self.server.send_event( - "spoolman:active_spool_set", {"spool_id": spool_id} - ) - logging.info(f"Setting active spool to: {spool_id}") - - async def track_filament_usage(self): - spool_id = self.spool_id - if spool_id is None: - logging.debug("No active spool, skipping tracking") + def _handle_status_update(self, status: Dict[str, Any], _: float) -> None: + toolhead: Optional[Dict[str, Any]] = status.get("toolhead") + if toolhead is None: return - async with self.extruded_lock: - if self.extruded > 0 and self.ws_connected: - used_length = self.extruded + epos: float = toolhead.get("position", [0, 0, 0, self._highest_epos])[3] + extr = toolhead.get("extruder", self._current_extruder) + if extr != self._current_extruder: + self._highest_epos = epos + self._current_extruder = extr + elif epos > self._highest_epos: + if self.spool_id is not None: + self._add_extrusion(self.spool_id, epos - self._highest_epos) + self._highest_epos = epos - logging.debug( - f"Sending spool usage: " - f"ID: {spool_id}, " - f"Length: {used_length:.3f}mm, " - ) + def _add_extrusion(self, spool_id: int, used_length: float) -> None: + if spool_id in self.pending_reports: + self.pending_reports[spool_id] += used_length + else: + self.pending_reports[spool_id] = used_length - response = await self.http_client.request( - method="PUT", - url=f"{self.spoolman_url}/v1/spool/{spool_id}/use", - body={ - "use_length": used_length, - }, - ) - if response.has_error(): - if response.status_code == 404: - self._error_logged = False - logging.info( - f"Spool ID {self.spool_id} not found, setting to None" - ) - coro = self.set_active_spool(Sentinel.MISSING) - self.eventloop.create_task(coro) - elif not self._error_logged: + def set_active_spool(self, spool_id: Union[int, None]) -> None: + assert spool_id is None or isinstance(spool_id, int) + if self.spool_id == spool_id: + logging.info(f"Spool ID already set to: {spool_id}") + return + self.spool_id = spool_id + self.database.insert_item(DB_NAMESPACE, ACTIVE_SPOOL_KEY, spool_id) + self.server.send_event( + "spoolman:active_spool_set", {"spool_id": spool_id} + ) + logging.info(f"Setting active spool to: {spool_id}") + + async def report_extrusion(self, eventtime: float) -> float: + if not self.ws_connected: + return eventtime + self.sync_rate_seconds + pending_reports = self.pending_reports + self.pending_reports = {} + for spool_id, used_length in pending_reports.items(): + if not self.ws_connected: + self._add_extrusion(spool_id, used_length) + continue + logging.debug( + f"Sending spool usage: ID: {spool_id}, Length: {used_length:.3f}mm" + ) + response = await self.http_client.request( + method="PUT", + url=f"{self.spoolman_url}/v1/spool/{spool_id}/use", + body={"use_length": used_length} + ) + if response.has_error(): + if response.status_code == 404: + # Since the spool is deleted we can remove any pending reports + # added while waiting for the request + self.pending_reports.pop(spool_id, None) + if spool_id == self.spool_id: + logging.info(f"Spool ID {spool_id} not found, setting to None") + self.set_active_spool(None) + else: + if not self._error_logged: error_msg = self._get_response_error(response) self._error_logged = True logging.info( f"Failed to update extrusion for spool id {spool_id}, " f"received {error_msg}" ) - return - self._error_logged = False - self.extruded = 0 + # Add missed reports back to pending reports for the next cycle + self._add_extrusion(spool_id, used_length) + continue + self._error_logged = False + return self.eventloop.get_loop_time() + self.sync_rate_seconds async def _handle_spool_id_request(self, web_request: WebRequest): if web_request.get_request_type() == RequestType.POST: spool_id = web_request.get_int("spool_id", None) - await self.set_active_spool(spool_id) + self.set_active_spool(spool_id) # For GET requests we will simply return the spool_id return {"spool_id": self.spool_id} @@ -362,6 +355,7 @@ class SpoolManager: async def close(self): self.is_closing = True + self.report_timer.stop() if self.spoolman_ws is not None: self.spoolman_ws.close(1001, "Moonraker Shutdown") self._cancel_spool_check_task()