spoolman: refactor tracking

Use a python dict to act as a queue for reporting used filament
per spool.  This eliminates the need for locks and resolves
potential issues with spool changes when the Spoolman
service is not available.

In addition, add support for tracking multiple tools

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2024-01-19 15:08:59 -05:00
parent 045075c396
commit d4316d9878
No known key found for this signature in database
GPG Key ID: 5A1EB336DFB4C71B
1 changed files with 79 additions and 85 deletions

View File

@ -10,7 +10,7 @@ import logging
import re
import contextlib
import tornado.websocket as tornado_ws
from ..common import RequestType, Sentinel
from ..common import RequestType
from ..utils import json_wrapper as jsonw
from typing import (
TYPE_CHECKING,
@ -32,7 +32,6 @@ if TYPE_CHECKING:
DB_NAMESPACE = "moonraker"
ACTIVE_SPOOL_KEY = "spoolman.spool_id"
CONNECTION_ERROR_LOG_TIME = 60.
class SpoolManager:
def __init__(self, config: ConfigHelper):
@ -40,19 +39,18 @@ class SpoolManager:
self.eventloop = self.server.get_event_loop()
self._get_spoolman_urls(config)
self.sync_rate_seconds = config.getint("sync_rate", default=5, minval=1)
self.last_sync_time = 0.
self.extruded_lock = asyncio.Lock()
self.report_timer = self.eventloop.register_timer(self.report_extrusion)
self.pending_reports: Dict[int, float] = {}
self.spoolman_ws: Optional[WebSocketClientConnection] = None
self.connection_task: Optional[asyncio.Task] = None
self.spool_check_task: Optional[asyncio.Task] = None
self.spool_lock = asyncio.Lock()
self.ws_connected: bool = False
self.reconnect_delay: float = 2.
self.is_closing: bool = False
self.spool_id: Optional[int] = None
self.extruded: float = 0
self._error_logged: bool = False
self._highest_epos: float = 0
self._current_extruder: str = "extruder"
self.klippy_apis: APIComp = self.server.lookup_component("klippy_apis")
self.http_client: HttpClient = self.server.lookup_component("http_client")
self.database: MoonrakerDatabase = self.server.lookup_component("database")
@ -130,6 +128,7 @@ class SpoolManager:
else:
self.ws_connected = True
self._error_logged = False
self.report_timer.start()
logging.info("Connected to Spoolman Spool Manager")
if self.spool_id is not None:
self._cancel_spool_check_task()
@ -148,6 +147,7 @@ class SpoolManager:
if isinstance(message, str):
self._decode_message(message)
elif message is None:
self.report_timer.stop()
self.ws_connected = False
cur_time = self.eventloop.get_loop_time()
ping_time: float = cur_time - self._last_ping_received
@ -169,7 +169,8 @@ class SpoolManager:
if self.spool_id is not None and event.get("type") == "deleted":
payload: Dict[str, Any] = event.get("payload", {})
if payload.get("id") == self.spool_id:
self.eventloop.create_task(self.set_active_spool(Sentinel.MISSING))
self.pending_reports.pop(self.spool_id, None)
self.set_active_spool(None)
def _cancel_spool_check_task(self) -> None:
if self.spool_check_task is None or self.spool_check_task.done():
@ -184,7 +185,8 @@ class SpoolManager:
)
if response.status_code == 404:
logging.info(f"Spool ID {self.spool_id} not found, setting to None")
await self.set_active_spool(Sentinel.MISSING)
self.pending_reports.pop(self.spool_id, None)
self.set_active_spool(None)
elif response.has_error():
err_msg = self._get_response_error(response)
logging.info(f"Attempt to check spool status failed: {err_msg}")
@ -198,11 +200,14 @@ class SpoolManager:
def _on_ws_ping(self, data: bytes = b"") -> None:
self._last_ping_received = self.eventloop.get_loop_time()
async def _handle_klippy_ready(self):
async def _handle_klippy_ready(self) -> None:
result: Dict[str, Dict[str, Any]]
result = await self.klippy_apis.subscribe_objects(
{"toolhead": ["position"]}, self._handle_status_update, {}
{"toolhead": ["position", "extruder"]}, self._handle_status_update, {}
)
initial_e_pos = self._eposition_from_status(result)
toolhead = result.get("toolhead", {})
self._current_extruder = toolhead.get("extruder", "extruder")
initial_e_pos = toolhead.get("position", [None]*4)[3]
logging.debug(f"Initial epos: {initial_e_pos}")
if initial_e_pos is not None:
self._highest_epos = initial_e_pos
@ -217,41 +222,31 @@ class SpoolManager:
err_msg += f", Spoolman message: {msg}"
return err_msg
def _eposition_from_status(self, status: Dict[str, Any]) -> Optional[float]:
position = status.get("toolhead", {}).get("position", [])
return position[3] if len(position) > 3 else None
async def _handle_status_update(self, status: Dict[str, Any], _: float) -> None:
epos = self._eposition_from_status(status)
if epos and epos > self._highest_epos:
async with self.extruded_lock:
self.extruded += epos - self._highest_epos
def _handle_status_update(self, status: Dict[str, Any], _: float) -> None:
toolhead: Optional[Dict[str, Any]] = status.get("toolhead")
if toolhead is None:
return
epos: float = toolhead.get("position", [0, 0, 0, self._highest_epos])[3]
extr = toolhead.get("extruder", self._current_extruder)
if extr != self._current_extruder:
self._highest_epos = epos
self._current_extruder = extr
elif epos > self._highest_epos:
if self.spool_id is not None:
self._add_extrusion(self.spool_id, epos - self._highest_epos)
self._highest_epos = epos
now = self.eventloop.get_loop_time()
difference = now - self.last_sync_time
if difference > self.sync_rate_seconds:
self.last_sync_time = now
logging.debug("Sync period elapsed, tracking usage")
await self.track_filament_usage()
def _add_extrusion(self, spool_id: int, used_length: float) -> None:
if spool_id in self.pending_reports:
self.pending_reports[spool_id] += used_length
else:
self.pending_reports[spool_id] = used_length
async def set_active_spool(self, spool_id: Union[int, Sentinel, None]) -> None:
async with self.spool_lock:
deleted_spool = False
if spool_id is Sentinel.MISSING:
spool_id = None
deleted_spool = True
def set_active_spool(self, spool_id: Union[int, None]) -> None:
assert spool_id is None or isinstance(spool_id, int)
if self.spool_id == spool_id:
logging.info(f"Spool ID already set to: {spool_id}")
return
# Store the current spool usage before switching, unless it has been deleted
if not deleted_spool:
if self.spool_id is not None:
await self.track_filament_usage()
elif spool_id is not None:
# No need to track, just reset extrusion
async with self.extruded_lock:
self.extruded = 0
self.spool_id = spool_id
self.database.insert_item(DB_NAMESPACE, ACTIVE_SPOOL_KEY, spool_id)
self.server.send_event(
@ -259,51 +254,49 @@ class SpoolManager:
)
logging.info(f"Setting active spool to: {spool_id}")
async def track_filament_usage(self):
spool_id = self.spool_id
if spool_id is None:
logging.debug("No active spool, skipping tracking")
return
async with self.extruded_lock:
if self.extruded > 0 and self.ws_connected:
used_length = self.extruded
async def report_extrusion(self, eventtime: float) -> float:
if not self.ws_connected:
return eventtime + self.sync_rate_seconds
pending_reports = self.pending_reports
self.pending_reports = {}
for spool_id, used_length in pending_reports.items():
if not self.ws_connected:
self._add_extrusion(spool_id, used_length)
continue
logging.debug(
f"Sending spool usage: "
f"ID: {spool_id}, "
f"Length: {used_length:.3f}mm, "
f"Sending spool usage: ID: {spool_id}, Length: {used_length:.3f}mm"
)
response = await self.http_client.request(
method="PUT",
url=f"{self.spoolman_url}/v1/spool/{spool_id}/use",
body={
"use_length": used_length,
},
body={"use_length": used_length}
)
if response.has_error():
if response.status_code == 404:
self._error_logged = False
logging.info(
f"Spool ID {self.spool_id} not found, setting to None"
)
coro = self.set_active_spool(Sentinel.MISSING)
self.eventloop.create_task(coro)
elif not self._error_logged:
# Since the spool is deleted we can remove any pending reports
# added while waiting for the request
self.pending_reports.pop(spool_id, None)
if spool_id == self.spool_id:
logging.info(f"Spool ID {spool_id} not found, setting to None")
self.set_active_spool(None)
else:
if not self._error_logged:
error_msg = self._get_response_error(response)
self._error_logged = True
logging.info(
f"Failed to update extrusion for spool id {spool_id}, "
f"received {error_msg}"
)
return
# Add missed reports back to pending reports for the next cycle
self._add_extrusion(spool_id, used_length)
continue
self._error_logged = False
self.extruded = 0
return self.eventloop.get_loop_time() + self.sync_rate_seconds
async def _handle_spool_id_request(self, web_request: WebRequest):
if web_request.get_request_type() == RequestType.POST:
spool_id = web_request.get_int("spool_id", None)
await self.set_active_spool(spool_id)
self.set_active_spool(spool_id)
# For GET requests we will simply return the spool_id
return {"spool_id": self.spool_id}
@ -362,6 +355,7 @@ class SpoolManager:
async def close(self):
self.is_closing = True
self.report_timer.stop()
if self.spoolman_ws is not None:
self.spoolman_ws.close(1001, "Moonraker Shutdown")
self._cancel_spool_check_task()