moonraker: manage subscriptions independently for each connection

This allows clients to "unsubscribe"by sending an empty dict.  Each client will receive updates only for subscribed objects.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Arksine 2020-11-09 20:54:00 -05:00
parent 8d1239c316
commit a6913a982a
3 changed files with 71 additions and 40 deletions

View File

@ -51,14 +51,7 @@ class Server:
self.init_handle = None self.init_handle = None
self.init_attempts = 0 self.init_attempts = 0
self.klippy_state = "disconnected" self.klippy_state = "disconnected"
self.subscriptions = {}
# XXX - currently moonraker maintains a superset of all
# subscriptions, the results of which are forwarded to all
# connected websockets. A better implementation would open a
# unique unix domain socket for each websocket client and
# allow Klipper to forward only those subscriptions back to
# correct client.
self.all_subscriptions = {}
# Server/IOLoop # Server/IOLoop
self.server_running = False self.server_running = False
@ -221,6 +214,7 @@ class Server:
for request in self.pending_requests.values(): for request in self.pending_requests.values():
request.notify(ServerError("Klippy Disconnected", 503)) request.notify(ServerError("Klippy Disconnected", 503))
self.pending_requests = {} self.pending_requests = {}
self.subscriptions = {}
logging.info("Klippy Connection Removed") logging.info("Klippy Connection Removed")
self.send_event("server:klippy_disconnect") self.send_event("server:klippy_disconnect")
if self.init_handle is not None: if self.init_handle is not None:
@ -236,8 +230,6 @@ class Server:
# Subscribe to "webhooks" # Subscribe to "webhooks"
# Register "webhooks" subscription # Register "webhooks" subscription
if "webhooks_sub" not in self.init_list: if "webhooks_sub" not in self.init_list:
temp_subs = self.all_subscriptions
self.all_subscriptions = {}
try: try:
await self.klippy_apis.subscribe_objects({'webhooks': None}) await self.klippy_apis.subscribe_objects({'webhooks': None})
except ServerError as e: except ServerError as e:
@ -245,7 +237,6 @@ class Server:
else: else:
logging.info("Webhooks Subscribed") logging.info("Webhooks Subscribed")
self.init_list.append("webhooks_sub") self.init_list.append("webhooks_sub")
self.all_subscriptions.update(temp_subs)
# Subscribe to Gcode Output # Subscribe to Gcode Output
if "gcode_output_sub" not in self.init_list: if "gcode_output_sub" not in self.init_list:
try: try:
@ -318,10 +309,6 @@ class Server:
logging.info( logging.info(
f"Unable to retreive Klipper Object List") f"Unable to retreive Klipper Object List")
return return
# Remove stale objects from the persistent subscription dict
for name in list(self.all_subscriptions.keys()):
if name not in result:
del self.all_subscriptions[name]
req_objs = set(["virtual_sdcard", "display_status", "pause_resume"]) req_objs = set(["virtual_sdcard", "display_status", "pause_resume"])
missing_objs = req_objs - set(result) missing_objs = req_objs - set(result)
if missing_objs: if missing_objs:
@ -360,28 +347,42 @@ class Server:
logging.info("Klippy has shutdown") logging.info("Klippy has shutdown")
self.send_event("server:klippy_shutdown") self.send_event("server:klippy_shutdown")
self.klippy_state = state self.klippy_state = state
self.send_event("server:status_update", status) for conn, sub in self.subscriptions.items():
conn_status = {}
for name, fields in sub.items():
if name in status:
val = status[name]
if fields is None:
conn_status[name] = dict(val)
else:
conn_status[name] = {
k: v for k, v in val.items() if k in fields}
conn.send_status(conn_status)
async def make_request(self, web_request): async def make_request(self, web_request):
# XXX - This adds the "response_template" to a subscription
# request and tracks all subscriptions so that each
# client gets what its requesting. In the future we should
# track subscriptions per client and send clients only
# the data they are asking for.
rpc_method = web_request.get_endpoint() rpc_method = web_request.get_endpoint()
args = web_request.get_args() args = web_request.get_args()
if rpc_method == "objects/subscribe": if rpc_method == "objects/subscribe":
for obj, items in args.get('objects', {}).items(): sub = args.get('objects', {})
if obj in self.all_subscriptions: conn = web_request.get_connection()
pi = self.all_subscriptions[obj] if conn is None:
if items is None or pi is None: raise self.error(
self.all_subscriptions[obj] = None "No connection associated with subscription request")
self.subscriptions[conn] = sub
all_subs = {}
# request superset of all client subscriptions
for sub in self.subscriptions.values():
for obj, items in sub.items():
if obj in all_subs:
pi = all_subs[obj]
if items is None or pi is None:
all_subs[obj] = None
else:
uitems = list(set(pi) | set(items))
all_subs[obj] = uitems
else: else:
uitems = list(set(pi) | set(items)) all_subs[obj] = items
self.all_subscriptions[obj] = uitems args['objects'] = all_subs
else:
self.all_subscriptions[obj] = items
args['objects'] = dict(self.all_subscriptions)
args['response_template'] = {'method': "process_status_update"} args['response_template'] = {'method': "process_status_update"}
# Create a base klippy request # Create a base klippy request
@ -392,6 +393,9 @@ class Server:
result = await base_request.wait() result = await base_request.wait()
return result return result
def remove_subscription(self, conn):
self.subscriptions.pop(conn, None)
async def _stop_server(self): async def _stop_server(self):
self.server_running = False self.server_running = False
for name, plugin in self.plugins.items(): for name, plugin in self.plugins.items():

View File

@ -23,6 +23,10 @@ class KlippyAPI:
def __init__(self, config): def __init__(self, config):
self.server = config.get_server() self.server = config.get_server()
# Maintain a subscription for all moonraker requests, as
# we do not want to overwrite them
self.host_subscription = {}
# Register GCode Aliases # Register GCode Aliases
self.server.register_endpoint( self.server.register_endpoint(
"/printer/print/pause", ['POST'], self._gcode_pause) "/printer/print/pause", ['POST'], self._gcode_pause)
@ -59,7 +63,7 @@ class KlippyAPI:
async def _send_klippy_request(self, method, params, default=Sentinel): async def _send_klippy_request(self, method, params, default=Sentinel):
try: try:
result = await self.server.make_request( result = await self.server.make_request(
WebRequest(method, params)) WebRequest(method, params, conn=self))
except self.server.error as e: except self.server.error as e:
if default == Sentinel: if default == Sentinel:
raise raise
@ -119,7 +123,17 @@ class KlippyAPI:
return result return result
async def subscribe_objects(self, objects, default=Sentinel): async def subscribe_objects(self, objects, default=Sentinel):
params = {'objects': objects} for obj, items in objects.items():
if obj in self.host_subscription:
prev = self.host_subscription[obj]
if items is None or prev is None:
self.host_subscription[obj] = None
else:
uitems = list(set(prev) | set(items))
self.host_subscription[obj] = uitems
else:
self.host_subscription[obj] = items
params = {'objects': self.host_subscription}
result = await self._send_klippy_request( result = await self._send_klippy_request(
SUBSCRIPTION_ENDPOINT, params, default) SUBSCRIPTION_ENDPOINT, params, default)
if isinstance(result, dict) and 'status' in result: if isinstance(result, dict) and 'status' in result:
@ -138,5 +152,8 @@ class KlippyAPI:
{'response_template': {"method": method_name}, {'response_template': {"method": method_name},
'remote_method': method_name}) 'remote_method': method_name})
def send_status(self, status):
self.server.send_event("server:status_update", status)
def load_plugin(config): def load_plugin(config):
return KlippyAPI(config) return KlippyAPI(config)

View File

@ -172,8 +172,6 @@ class WebsocketManager:
"server:klippy_disconnect", self._handle_klippy_disconnect) "server:klippy_disconnect", self._handle_klippy_disconnect)
self.server.register_event_handler( self.server.register_event_handler(
"server:gcode_response", self._handle_gcode_response) "server:gcode_response", self._handle_gcode_response)
self.server.register_event_handler(
"server:status_update", self._handle_status_update)
self.server.register_event_handler( self.server.register_event_handler(
"file_manager:filelist_changed", self._handle_filelist_changed) "file_manager:filelist_changed", self._handle_filelist_changed)
self.server.register_event_handler( self.server.register_event_handler(
@ -187,9 +185,6 @@ class WebsocketManager:
async def _handle_gcode_response(self, response): async def _handle_gcode_response(self, response):
await self.notify_websockets("gcode_response", response) await self.notify_websockets("gcode_response", response)
async def _handle_status_update(self, status):
await self.notify_websockets("status_update", status)
async def _handle_filelist_changed(self, flist): async def _handle_filelist_changed(self, flist):
await self.notify_websockets("filelist_changed", flist) await self.notify_websockets("filelist_changed", flist)
@ -240,17 +235,17 @@ class WebsocketManager:
async with self.ws_lock: async with self.ws_lock:
old_ws = self.websockets.pop(ws.uid, None) old_ws = self.websockets.pop(ws.uid, None)
if old_ws is not None: if old_ws is not None:
self.server.remove_subscription(old_ws)
logging.info(f"Websocket Removed: {ws.uid}") logging.info(f"Websocket Removed: {ws.uid}")
async def notify_websockets(self, name, data=Sentinel): async def notify_websockets(self, name, data=Sentinel):
msg = {'jsonrpc': "2.0", 'method': "notify_" + name} msg = {'jsonrpc': "2.0", 'method': "notify_" + name}
if data != Sentinel: if data != Sentinel:
msg['params'] = [data] msg['params'] = [data]
notification = json.dumps(msg)
async with self.ws_lock: async with self.ws_lock:
for ws in list(self.websockets.values()): for ws in list(self.websockets.values()):
try: try:
ws.write_message(notification) ws.write_message(msg)
except WebSocketClosedError: except WebSocketClosedError:
self.websockets.pop(ws.uid, None) self.websockets.pop(ws.uid, None)
logging.info(f"Websocket Removed: {ws.uid}") logging.info(f"Websocket Removed: {ws.uid}")
@ -286,6 +281,21 @@ class WebSocket(WebSocketHandler):
except Exception: except Exception:
logging.exception("Websocket Command Error") logging.exception("Websocket Command Error")
def send_status(self, status):
if not status:
return
try:
self.write_message({
'jsonrpc': "2.0",
'method': "notify_status_update",
'params': [status]})
except WebSocketClosedError:
self.websockets.pop(self.uid, None)
logging.info(f"Websocket Removed: {self.uid}")
except Exception:
logging.exception(
f"Error sending data over websocket: {self.uid}")
def on_close(self): def on_close(self):
io_loop = IOLoop.current() io_loop = IOLoop.current()
io_loop.spawn_callback(self.wsm.remove_websocket, self) io_loop.spawn_callback(self.wsm.remove_websocket, self)