websockets: guarantee subscription removal

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2023-02-03 17:54:01 -05:00
parent 1be19be747
commit a413f4e4cd
No known key found for this signature in database
GPG Key ID: 5A1EB336DFB4C71B
2 changed files with 7 additions and 3 deletions

View File

@ -25,6 +25,7 @@ if TYPE_CHECKING:
from moonraker import Server from moonraker import Server
from confighelper import ConfigHelper from confighelper import ConfigHelper
from websockets import WebRequest from websockets import WebRequest
from klippy_connection import KlippyConnection as Klippy
UNIX_BUFFER_LIMIT = 20 * 1024 * 1024 UNIX_BUFFER_LIMIT = 20 * 1024 * 1024
@ -202,6 +203,8 @@ class UnixSocketClient(BaseSocketClient):
if self.is_closed: if self.is_closed:
return return
self.is_closed = True self.is_closed = True
kconn: Klippy = self.server.lookup_component("klippy_connection")
kconn.remove_subscription(self)
if not self.writer.is_closing(): if not self.writer.is_closing():
self.writer.close() self.writer.close()
try: try:

View File

@ -349,7 +349,6 @@ class APITransport:
class WebsocketManager(APITransport): class WebsocketManager(APITransport):
def __init__(self, server: Server) -> None: def __init__(self, server: Server) -> None:
self.server = server self.server = server
self.klippy: Klippy = server.lookup_component("klippy_connection")
self.clients: Dict[int, BaseSocketClient] = {} self.clients: Dict[int, BaseSocketClient] = {}
self.rpc = JsonRPC(server) self.rpc = JsonRPC(server)
self.closed_event: Optional[asyncio.Event] = None self.closed_event: Optional[asyncio.Event] = None
@ -376,11 +375,12 @@ class WebsocketManager(APITransport):
self.server.register_event_handler(event_name, notify_handler) self.server.register_event_handler(event_name, notify_handler)
def register_api_handler(self, api_def: APIDefinition) -> None: def register_api_handler(self, api_def: APIDefinition) -> None:
klippy: Klippy = self.server.lookup_component("klippy_connection")
if api_def.callback is None: if api_def.callback is None:
# Remote API, uses RPC to reach out to Klippy # Remote API, uses RPC to reach out to Klippy
ws_method = api_def.jrpc_methods[0] ws_method = api_def.jrpc_methods[0]
rpc_cb = self._generate_callback( rpc_cb = self._generate_callback(
api_def.endpoint, "", self.klippy.request api_def.endpoint, "", klippy.request
) )
self.rpc.register_method(ws_method, rpc_cb) self.rpc.register_method(ws_method, rpc_cb)
else: else:
@ -514,7 +514,6 @@ class WebsocketManager(APITransport):
def remove_client(self, sc: BaseSocketClient) -> None: def remove_client(self, sc: BaseSocketClient) -> None:
old_sc = self.clients.pop(sc.uid, None) old_sc = self.clients.pop(sc.uid, None)
if old_sc is not None: if old_sc is not None:
self.klippy.remove_subscription(old_sc)
self.server.send_event("websockets:client_removed", sc) self.server.send_event("websockets:client_removed", sc)
logging.debug(f"Websocket Removed: {sc.uid}") logging.debug(f"Websocket Removed: {sc.uid}")
if self.closed_event is not None and not self.clients: if self.closed_event is not None and not self.clients:
@ -759,6 +758,8 @@ class WebSocket(WebSocketHandler, BaseSocketClient):
def on_close(self) -> None: def on_close(self) -> None:
self.is_closed = True self.is_closed = True
self.__class__.connection_count -= 1 self.__class__.connection_count -= 1
kconn: Klippy = self.server.lookup_component("klippy_connection")
kconn.remove_subscription(self)
self.message_buf = [] self.message_buf = []
now = self.eventloop.get_loop_time() now = self.eventloop.get_loop_time()
pong_elapsed = now - self.last_pong_time pong_elapsed = now - self.last_pong_time