From 7e85ac97c9c32ceb0df45d5d10e3346a899df76d Mon Sep 17 00:00:00 2001 From: Eric Callahan Date: Sun, 30 Jul 2023 11:19:21 -0400 Subject: [PATCH] klippy_connection: cache subscription data It is possible that a subscripition request can occur between after Klipper updates a field's status, but before it pushes a status update to the connection. The result is a race condiiton where the response to the subscription request contains the lastest state but it is not propagated to currently connected clients. This is resolved by caching subscripiton data, diffing it with the response to the subscription request, and manually pushing the diff. Signed-off-by: Eric Callahan --- moonraker/klippy_connection.py | 89 ++++++++++++++++++++++++---------- 1 file changed, 64 insertions(+), 25 deletions(-) diff --git a/moonraker/klippy_connection.py b/moonraker/klippy_connection.py index 91cf17c..298ef45 100644 --- a/moonraker/klippy_connection.py +++ b/moonraker/klippy_connection.py @@ -39,6 +39,7 @@ if TYPE_CHECKING: from .components.job_state import JobState from .components.database import MoonrakerDatabase as Database FlexCallback = Callable[..., Optional[Coroutine]] + Subscription = Dict[str, Optional[List[str]]] # These endpoints are reserved for klippy/moonraker communication only and are # not exposed via http or the websocket @@ -77,7 +78,8 @@ class KlippyConnection: self.init_attempts: int = 0 self._state: str = "disconnected" self._state_message: str = "Klippy Disconnected" - self.subscriptions: Dict[Subscribable, Dict[str, Any]] = {} + self.subscriptions: Dict[Subscribable, Subscription] = {} + self.subscription_cache: Dict[str, Dict[str, Any]] = {} # Setup remote methods accessable to Klippy. Note that all # registered remote methods should be of the notification type, # they do not return a response to Klippy after execution @@ -477,10 +479,11 @@ class KlippyConnection: def _process_gcode_response(self, response: str) -> None: self.server.send_event("server:gcode_response", response) - def _process_status_update(self, - eventtime: float, - status: Dict[str, Any] - ) -> None: + def _process_status_update( + self, eventtime: float, status: Dict[str, Dict[str, Any]] + ) -> None: + for field, item in status.items(): + self.subscription_cache.setdefault(field, {}).update(item) if 'webhooks' in status: wh: Dict[str, str] = status['webhooks'] if "state_message" in wh: @@ -488,7 +491,11 @@ class KlippyConnection: # XXX - process other states (startup, ready, error, etc)? if "state" in wh: state = wh["state"] - if state == "shutdown" and not self._klippy_initializing: + if ( + state == "shutdown" and + not self._klippy_initializing and + self._state != "shutdown" + ): # If the shutdown state is received during initialization # defer the event, the init routine will handle it. logging.info("Klippy has shutdown") @@ -519,30 +526,35 @@ class KlippyConnection: "klippy_connection:gcode_received", script) return await self._request_standard(web_request) - async def _request_subscripton(self, - web_request: WebRequest - ) -> Dict[str, Any]: + async def _request_subscripton(self, web_request: WebRequest) -> Dict[str, Any]: async with self.subscription_lock: args = web_request.get_args() conn = web_request.get_subscribable() - - # Build the subscription request from a superset of all client - # subscriptions - sub = args.get('objects', {}) if conn is None: raise self.server.error( - "No connection associated with subscription request") - self.subscriptions[conn] = sub - all_subs: Dict[str, Any] = {} - # request superset of all client subscriptions + "No connection associated with subscription request" + ) + requested_sub: Subscription = args.get('objects', {}) + if self.server.is_verbose_enabled() and "configfile" in requested_sub: + cfg_sub = requested_sub["configfile"] + if ( + cfg_sub is None or "config" in cfg_sub or "settings" in cfg_sub + ): + logging.debug( + f"Detected 'configfile: {cfg_sub}' subscription. The " + "'config' and 'status' fields in this object do not change " + "and substantially increase cache size." + ) + all_subs: Subscription = dict(requested_sub) + # Build the subscription request from a superset of all client subscriptions for sub in self.subscriptions.values(): for obj, items in sub.items(): if obj in all_subs: - pi = all_subs[obj] - if items is None or pi is None: + prev_items = all_subs[obj] + if items is None or prev_items is None: all_subs[obj] = None else: - uitems = list(set(pi) | set(items)) + uitems = list(set(prev_items) | set(items)) all_subs[obj] = uitems else: all_subs[obj] = items @@ -552,19 +564,42 @@ class KlippyConnection: result = await self._request_standard(web_request, 20.0) # prune the status response - pruned_status = {} - all_status: Dict[str, Any] = result['status'] - sub = self.subscriptions.get(conn, {}) + pruned_status: Dict[str, Dict[str, Any]] = {} + status_diff: Dict[str, Dict[str, Any]] = {} + all_status: Dict[str, Dict[str, Any]] = result['status'] for obj, fields in all_status.items(): - if obj in sub: - valid_fields = sub[obj] + # Diff the current cache, then update the cache + if obj in self.subscription_cache: + cached_status = self.subscription_cache[obj] + for field_name, value in fields.items(): + if field_name not in cached_status: + continue + if value != cached_status[field_name]: + status_diff.setdefault(obj, {})[field_name] = value + self.subscription_cache[obj] = fields + # Prune Response + if obj in requested_sub: + valid_fields = requested_sub[obj] if valid_fields is None: pruned_status[obj] = fields else: pruned_status[obj] = { k: v for k, v in fields.items() if k in valid_fields } + if status_diff: + # The response to the status request contains changed data, so it + # is necessary to manually push the status update to existing + # subscribers + logging.debug( + f"Detected status difference during subscription: {status_diff}" + ) + self._process_status_update(result["eventtime"], status_diff) + for obj_name in list(self.subscription_cache.keys()): + # Prune the cache to match the current status response + if obj_name not in all_status: + del self.subscription_cache[obj_name] result['status'] = pruned_status + self.subscriptions[conn] = requested_sub return result async def _request_standard( @@ -597,6 +632,9 @@ class KlippyConnection: stats = job_state.get_last_stats() return stats.get("state", "") == "printing" + def get_subscription_cache(self) -> Dict[str, Dict[str, Any]]: + return self.subscription_cache + async def rollover_log(self) -> None: if "unit_name" not in self._service_info: raise self.server.error( @@ -638,6 +676,7 @@ class KlippyConnection: request.set_exception(ServerError("Klippy Disconnected", 503)) self.pending_requests = {} self.subscriptions = {} + self.subscription_cache.clear() self._peer_cred = {} self._missing_reqs.clear() logging.info("Klippy Connection Removed")