diff --git a/moonraker/klippy_connection.py b/moonraker/klippy_connection.py index 91cf17c..298ef45 100644 --- a/moonraker/klippy_connection.py +++ b/moonraker/klippy_connection.py @@ -39,6 +39,7 @@ if TYPE_CHECKING: from .components.job_state import JobState from .components.database import MoonrakerDatabase as Database FlexCallback = Callable[..., Optional[Coroutine]] + Subscription = Dict[str, Optional[List[str]]] # These endpoints are reserved for klippy/moonraker communication only and are # not exposed via http or the websocket @@ -77,7 +78,8 @@ class KlippyConnection: self.init_attempts: int = 0 self._state: str = "disconnected" self._state_message: str = "Klippy Disconnected" - self.subscriptions: Dict[Subscribable, Dict[str, Any]] = {} + self.subscriptions: Dict[Subscribable, Subscription] = {} + self.subscription_cache: Dict[str, Dict[str, Any]] = {} # Setup remote methods accessable to Klippy. Note that all # registered remote methods should be of the notification type, # they do not return a response to Klippy after execution @@ -477,10 +479,11 @@ class KlippyConnection: def _process_gcode_response(self, response: str) -> None: self.server.send_event("server:gcode_response", response) - def _process_status_update(self, - eventtime: float, - status: Dict[str, Any] - ) -> None: + def _process_status_update( + self, eventtime: float, status: Dict[str, Dict[str, Any]] + ) -> None: + for field, item in status.items(): + self.subscription_cache.setdefault(field, {}).update(item) if 'webhooks' in status: wh: Dict[str, str] = status['webhooks'] if "state_message" in wh: @@ -488,7 +491,11 @@ class KlippyConnection: # XXX - process other states (startup, ready, error, etc)? if "state" in wh: state = wh["state"] - if state == "shutdown" and not self._klippy_initializing: + if ( + state == "shutdown" and + not self._klippy_initializing and + self._state != "shutdown" + ): # If the shutdown state is received during initialization # defer the event, the init routine will handle it. logging.info("Klippy has shutdown") @@ -519,30 +526,35 @@ class KlippyConnection: "klippy_connection:gcode_received", script) return await self._request_standard(web_request) - async def _request_subscripton(self, - web_request: WebRequest - ) -> Dict[str, Any]: + async def _request_subscripton(self, web_request: WebRequest) -> Dict[str, Any]: async with self.subscription_lock: args = web_request.get_args() conn = web_request.get_subscribable() - - # Build the subscription request from a superset of all client - # subscriptions - sub = args.get('objects', {}) if conn is None: raise self.server.error( - "No connection associated with subscription request") - self.subscriptions[conn] = sub - all_subs: Dict[str, Any] = {} - # request superset of all client subscriptions + "No connection associated with subscription request" + ) + requested_sub: Subscription = args.get('objects', {}) + if self.server.is_verbose_enabled() and "configfile" in requested_sub: + cfg_sub = requested_sub["configfile"] + if ( + cfg_sub is None or "config" in cfg_sub or "settings" in cfg_sub + ): + logging.debug( + f"Detected 'configfile: {cfg_sub}' subscription. The " + "'config' and 'status' fields in this object do not change " + "and substantially increase cache size." + ) + all_subs: Subscription = dict(requested_sub) + # Build the subscription request from a superset of all client subscriptions for sub in self.subscriptions.values(): for obj, items in sub.items(): if obj in all_subs: - pi = all_subs[obj] - if items is None or pi is None: + prev_items = all_subs[obj] + if items is None or prev_items is None: all_subs[obj] = None else: - uitems = list(set(pi) | set(items)) + uitems = list(set(prev_items) | set(items)) all_subs[obj] = uitems else: all_subs[obj] = items @@ -552,19 +564,42 @@ class KlippyConnection: result = await self._request_standard(web_request, 20.0) # prune the status response - pruned_status = {} - all_status: Dict[str, Any] = result['status'] - sub = self.subscriptions.get(conn, {}) + pruned_status: Dict[str, Dict[str, Any]] = {} + status_diff: Dict[str, Dict[str, Any]] = {} + all_status: Dict[str, Dict[str, Any]] = result['status'] for obj, fields in all_status.items(): - if obj in sub: - valid_fields = sub[obj] + # Diff the current cache, then update the cache + if obj in self.subscription_cache: + cached_status = self.subscription_cache[obj] + for field_name, value in fields.items(): + if field_name not in cached_status: + continue + if value != cached_status[field_name]: + status_diff.setdefault(obj, {})[field_name] = value + self.subscription_cache[obj] = fields + # Prune Response + if obj in requested_sub: + valid_fields = requested_sub[obj] if valid_fields is None: pruned_status[obj] = fields else: pruned_status[obj] = { k: v for k, v in fields.items() if k in valid_fields } + if status_diff: + # The response to the status request contains changed data, so it + # is necessary to manually push the status update to existing + # subscribers + logging.debug( + f"Detected status difference during subscription: {status_diff}" + ) + self._process_status_update(result["eventtime"], status_diff) + for obj_name in list(self.subscription_cache.keys()): + # Prune the cache to match the current status response + if obj_name not in all_status: + del self.subscription_cache[obj_name] result['status'] = pruned_status + self.subscriptions[conn] = requested_sub return result async def _request_standard( @@ -597,6 +632,9 @@ class KlippyConnection: stats = job_state.get_last_stats() return stats.get("state", "") == "printing" + def get_subscription_cache(self) -> Dict[str, Dict[str, Any]]: + return self.subscription_cache + async def rollover_log(self) -> None: if "unit_name" not in self._service_info: raise self.server.error( @@ -638,6 +676,7 @@ class KlippyConnection: request.set_exception(ServerError("Klippy Disconnected", 503)) self.pending_requests = {} self.subscriptions = {} + self.subscription_cache.clear() self._peer_cred = {} self._missing_reqs.clear() logging.info("Klippy Connection Removed")