klippy_connection: cache subscription data

It is possible that a subscripition request can occur between
after Klipper updates a field's status, but before it pushes
a status update to the connection.  The result is a race
condiiton where the response to the subscription request
contains the lastest state but it is not propagated to
currently connected clients.

This is resolved by caching subscripiton data, diffing it
with the response to the subscription request, and
manually pushing the diff.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2023-07-30 11:19:21 -04:00
parent 8ad19e6054
commit 7e85ac97c9
No known key found for this signature in database
GPG Key ID: 5A1EB336DFB4C71B
1 changed files with 64 additions and 25 deletions

View File

@ -39,6 +39,7 @@ if TYPE_CHECKING:
from .components.job_state import JobState
from .components.database import MoonrakerDatabase as Database
FlexCallback = Callable[..., Optional[Coroutine]]
Subscription = Dict[str, Optional[List[str]]]
# These endpoints are reserved for klippy/moonraker communication only and are
# not exposed via http or the websocket
@ -77,7 +78,8 @@ class KlippyConnection:
self.init_attempts: int = 0
self._state: str = "disconnected"
self._state_message: str = "Klippy Disconnected"
self.subscriptions: Dict[Subscribable, Dict[str, Any]] = {}
self.subscriptions: Dict[Subscribable, Subscription] = {}
self.subscription_cache: Dict[str, Dict[str, Any]] = {}
# Setup remote methods accessable to Klippy. Note that all
# registered remote methods should be of the notification type,
# they do not return a response to Klippy after execution
@ -477,10 +479,11 @@ class KlippyConnection:
def _process_gcode_response(self, response: str) -> None:
self.server.send_event("server:gcode_response", response)
def _process_status_update(self,
eventtime: float,
status: Dict[str, Any]
def _process_status_update(
self, eventtime: float, status: Dict[str, Dict[str, Any]]
) -> None:
for field, item in status.items():
self.subscription_cache.setdefault(field, {}).update(item)
if 'webhooks' in status:
wh: Dict[str, str] = status['webhooks']
if "state_message" in wh:
@ -488,7 +491,11 @@ class KlippyConnection:
# XXX - process other states (startup, ready, error, etc)?
if "state" in wh:
state = wh["state"]
if state == "shutdown" and not self._klippy_initializing:
if (
state == "shutdown" and
not self._klippy_initializing and
self._state != "shutdown"
):
# If the shutdown state is received during initialization
# defer the event, the init routine will handle it.
logging.info("Klippy has shutdown")
@ -519,30 +526,35 @@ class KlippyConnection:
"klippy_connection:gcode_received", script)
return await self._request_standard(web_request)
async def _request_subscripton(self,
web_request: WebRequest
) -> Dict[str, Any]:
async def _request_subscripton(self, web_request: WebRequest) -> Dict[str, Any]:
async with self.subscription_lock:
args = web_request.get_args()
conn = web_request.get_subscribable()
# Build the subscription request from a superset of all client
# subscriptions
sub = args.get('objects', {})
if conn is None:
raise self.server.error(
"No connection associated with subscription request")
self.subscriptions[conn] = sub
all_subs: Dict[str, Any] = {}
# request superset of all client subscriptions
"No connection associated with subscription request"
)
requested_sub: Subscription = args.get('objects', {})
if self.server.is_verbose_enabled() and "configfile" in requested_sub:
cfg_sub = requested_sub["configfile"]
if (
cfg_sub is None or "config" in cfg_sub or "settings" in cfg_sub
):
logging.debug(
f"Detected 'configfile: {cfg_sub}' subscription. The "
"'config' and 'status' fields in this object do not change "
"and substantially increase cache size."
)
all_subs: Subscription = dict(requested_sub)
# Build the subscription request from a superset of all client subscriptions
for sub in self.subscriptions.values():
for obj, items in sub.items():
if obj in all_subs:
pi = all_subs[obj]
if items is None or pi is None:
prev_items = all_subs[obj]
if items is None or prev_items is None:
all_subs[obj] = None
else:
uitems = list(set(pi) | set(items))
uitems = list(set(prev_items) | set(items))
all_subs[obj] = uitems
else:
all_subs[obj] = items
@ -552,19 +564,42 @@ class KlippyConnection:
result = await self._request_standard(web_request, 20.0)
# prune the status response
pruned_status = {}
all_status: Dict[str, Any] = result['status']
sub = self.subscriptions.get(conn, {})
pruned_status: Dict[str, Dict[str, Any]] = {}
status_diff: Dict[str, Dict[str, Any]] = {}
all_status: Dict[str, Dict[str, Any]] = result['status']
for obj, fields in all_status.items():
if obj in sub:
valid_fields = sub[obj]
# Diff the current cache, then update the cache
if obj in self.subscription_cache:
cached_status = self.subscription_cache[obj]
for field_name, value in fields.items():
if field_name not in cached_status:
continue
if value != cached_status[field_name]:
status_diff.setdefault(obj, {})[field_name] = value
self.subscription_cache[obj] = fields
# Prune Response
if obj in requested_sub:
valid_fields = requested_sub[obj]
if valid_fields is None:
pruned_status[obj] = fields
else:
pruned_status[obj] = {
k: v for k, v in fields.items() if k in valid_fields
}
if status_diff:
# The response to the status request contains changed data, so it
# is necessary to manually push the status update to existing
# subscribers
logging.debug(
f"Detected status difference during subscription: {status_diff}"
)
self._process_status_update(result["eventtime"], status_diff)
for obj_name in list(self.subscription_cache.keys()):
# Prune the cache to match the current status response
if obj_name not in all_status:
del self.subscription_cache[obj_name]
result['status'] = pruned_status
self.subscriptions[conn] = requested_sub
return result
async def _request_standard(
@ -597,6 +632,9 @@ class KlippyConnection:
stats = job_state.get_last_stats()
return stats.get("state", "") == "printing"
def get_subscription_cache(self) -> Dict[str, Dict[str, Any]]:
return self.subscription_cache
async def rollover_log(self) -> None:
if "unit_name" not in self._service_info:
raise self.server.error(
@ -638,6 +676,7 @@ class KlippyConnection:
request.set_exception(ServerError("Klippy Disconnected", 503))
self.pending_requests = {}
self.subscriptions = {}
self.subscription_cache.clear()
self._peer_cred = {}
self._missing_reqs.clear()
logging.info("Klippy Connection Removed")