From a6913a982a8f722d58cbfc8b58bd6c8f1557a39a Mon Sep 17 00:00:00 2001 From: Arksine Date: Mon, 9 Nov 2020 20:54:00 -0500 Subject: [PATCH] moonraker: manage subscriptions independently for each connection This allows clients to "unsubscribe"by sending an empty dict. Each client will receive updates only for subscribed objects. Signed-off-by: Eric Callahan --- moonraker/moonraker.py | 66 +++++++++++++++++--------------- moonraker/plugins/klippy_apis.py | 21 +++++++++- moonraker/websockets.py | 24 ++++++++---- 3 files changed, 71 insertions(+), 40 deletions(-) diff --git a/moonraker/moonraker.py b/moonraker/moonraker.py index a332b53..0d700a0 100644 --- a/moonraker/moonraker.py +++ b/moonraker/moonraker.py @@ -51,14 +51,7 @@ class Server: self.init_handle = None self.init_attempts = 0 self.klippy_state = "disconnected" - - # XXX - currently moonraker maintains a superset of all - # subscriptions, the results of which are forwarded to all - # connected websockets. A better implementation would open a - # unique unix domain socket for each websocket client and - # allow Klipper to forward only those subscriptions back to - # correct client. - self.all_subscriptions = {} + self.subscriptions = {} # Server/IOLoop self.server_running = False @@ -221,6 +214,7 @@ class Server: for request in self.pending_requests.values(): request.notify(ServerError("Klippy Disconnected", 503)) self.pending_requests = {} + self.subscriptions = {} logging.info("Klippy Connection Removed") self.send_event("server:klippy_disconnect") if self.init_handle is not None: @@ -236,8 +230,6 @@ class Server: # Subscribe to "webhooks" # Register "webhooks" subscription if "webhooks_sub" not in self.init_list: - temp_subs = self.all_subscriptions - self.all_subscriptions = {} try: await self.klippy_apis.subscribe_objects({'webhooks': None}) except ServerError as e: @@ -245,7 +237,6 @@ class Server: else: logging.info("Webhooks Subscribed") self.init_list.append("webhooks_sub") - self.all_subscriptions.update(temp_subs) # Subscribe to Gcode Output if "gcode_output_sub" not in self.init_list: try: @@ -318,10 +309,6 @@ class Server: logging.info( f"Unable to retreive Klipper Object List") return - # Remove stale objects from the persistent subscription dict - for name in list(self.all_subscriptions.keys()): - if name not in result: - del self.all_subscriptions[name] req_objs = set(["virtual_sdcard", "display_status", "pause_resume"]) missing_objs = req_objs - set(result) if missing_objs: @@ -360,28 +347,42 @@ class Server: logging.info("Klippy has shutdown") self.send_event("server:klippy_shutdown") self.klippy_state = state - self.send_event("server:status_update", status) + for conn, sub in self.subscriptions.items(): + conn_status = {} + for name, fields in sub.items(): + if name in status: + val = status[name] + if fields is None: + conn_status[name] = dict(val) + else: + conn_status[name] = { + k: v for k, v in val.items() if k in fields} + conn.send_status(conn_status) async def make_request(self, web_request): - # XXX - This adds the "response_template" to a subscription - # request and tracks all subscriptions so that each - # client gets what its requesting. In the future we should - # track subscriptions per client and send clients only - # the data they are asking for. rpc_method = web_request.get_endpoint() args = web_request.get_args() if rpc_method == "objects/subscribe": - for obj, items in args.get('objects', {}).items(): - if obj in self.all_subscriptions: - pi = self.all_subscriptions[obj] - if items is None or pi is None: - self.all_subscriptions[obj] = None + sub = args.get('objects', {}) + conn = web_request.get_connection() + if conn is None: + raise self.error( + "No connection associated with subscription request") + self.subscriptions[conn] = sub + all_subs = {} + # request superset of all client subscriptions + for sub in self.subscriptions.values(): + for obj, items in sub.items(): + if obj in all_subs: + pi = all_subs[obj] + if items is None or pi is None: + all_subs[obj] = None + else: + uitems = list(set(pi) | set(items)) + all_subs[obj] = uitems else: - uitems = list(set(pi) | set(items)) - self.all_subscriptions[obj] = uitems - else: - self.all_subscriptions[obj] = items - args['objects'] = dict(self.all_subscriptions) + all_subs[obj] = items + args['objects'] = all_subs args['response_template'] = {'method': "process_status_update"} # Create a base klippy request @@ -392,6 +393,9 @@ class Server: result = await base_request.wait() return result + def remove_subscription(self, conn): + self.subscriptions.pop(conn, None) + async def _stop_server(self): self.server_running = False for name, plugin in self.plugins.items(): diff --git a/moonraker/plugins/klippy_apis.py b/moonraker/plugins/klippy_apis.py index 38106d8..36fcf73 100644 --- a/moonraker/plugins/klippy_apis.py +++ b/moonraker/plugins/klippy_apis.py @@ -23,6 +23,10 @@ class KlippyAPI: def __init__(self, config): self.server = config.get_server() + # Maintain a subscription for all moonraker requests, as + # we do not want to overwrite them + self.host_subscription = {} + # Register GCode Aliases self.server.register_endpoint( "/printer/print/pause", ['POST'], self._gcode_pause) @@ -59,7 +63,7 @@ class KlippyAPI: async def _send_klippy_request(self, method, params, default=Sentinel): try: result = await self.server.make_request( - WebRequest(method, params)) + WebRequest(method, params, conn=self)) except self.server.error as e: if default == Sentinel: raise @@ -119,7 +123,17 @@ class KlippyAPI: return result async def subscribe_objects(self, objects, default=Sentinel): - params = {'objects': objects} + for obj, items in objects.items(): + if obj in self.host_subscription: + prev = self.host_subscription[obj] + if items is None or prev is None: + self.host_subscription[obj] = None + else: + uitems = list(set(prev) | set(items)) + self.host_subscription[obj] = uitems + else: + self.host_subscription[obj] = items + params = {'objects': self.host_subscription} result = await self._send_klippy_request( SUBSCRIPTION_ENDPOINT, params, default) if isinstance(result, dict) and 'status' in result: @@ -138,5 +152,8 @@ class KlippyAPI: {'response_template': {"method": method_name}, 'remote_method': method_name}) + def send_status(self, status): + self.server.send_event("server:status_update", status) + def load_plugin(config): return KlippyAPI(config) diff --git a/moonraker/websockets.py b/moonraker/websockets.py index 77bafc3..c09ddc2 100644 --- a/moonraker/websockets.py +++ b/moonraker/websockets.py @@ -172,8 +172,6 @@ class WebsocketManager: "server:klippy_disconnect", self._handle_klippy_disconnect) self.server.register_event_handler( "server:gcode_response", self._handle_gcode_response) - self.server.register_event_handler( - "server:status_update", self._handle_status_update) self.server.register_event_handler( "file_manager:filelist_changed", self._handle_filelist_changed) self.server.register_event_handler( @@ -187,9 +185,6 @@ class WebsocketManager: async def _handle_gcode_response(self, response): await self.notify_websockets("gcode_response", response) - async def _handle_status_update(self, status): - await self.notify_websockets("status_update", status) - async def _handle_filelist_changed(self, flist): await self.notify_websockets("filelist_changed", flist) @@ -240,17 +235,17 @@ class WebsocketManager: async with self.ws_lock: old_ws = self.websockets.pop(ws.uid, None) if old_ws is not None: + self.server.remove_subscription(old_ws) logging.info(f"Websocket Removed: {ws.uid}") async def notify_websockets(self, name, data=Sentinel): msg = {'jsonrpc': "2.0", 'method': "notify_" + name} if data != Sentinel: msg['params'] = [data] - notification = json.dumps(msg) async with self.ws_lock: for ws in list(self.websockets.values()): try: - ws.write_message(notification) + ws.write_message(msg) except WebSocketClosedError: self.websockets.pop(ws.uid, None) logging.info(f"Websocket Removed: {ws.uid}") @@ -286,6 +281,21 @@ class WebSocket(WebSocketHandler): except Exception: logging.exception("Websocket Command Error") + def send_status(self, status): + if not status: + return + try: + self.write_message({ + 'jsonrpc': "2.0", + 'method': "notify_status_update", + 'params': [status]}) + except WebSocketClosedError: + self.websockets.pop(self.uid, None) + logging.info(f"Websocket Removed: {self.uid}") + except Exception: + logging.exception( + f"Error sending data over websocket: {self.uid}") + def on_close(self): io_loop = IOLoop.current() io_loop.spawn_callback(self.wsm.remove_websocket, self)