diff --git a/moonraker/websockets.py b/moonraker/websockets.py index feeb6c4..9c15f03 100644 --- a/moonraker/websockets.py +++ b/moonraker/websockets.py @@ -8,10 +8,8 @@ from __future__ import annotations import logging import ipaddress import json -import tornado.util -from tornado.ioloop import IOLoop +import asyncio from tornado.websocket import WebSocketHandler, WebSocketClosedError -from tornado.locks import Event from utils import ServerError, SentinelClass # Annotation imports @@ -29,6 +27,7 @@ from typing import ( ) if TYPE_CHECKING: from moonraker import Server + from eventloop import EventLoop from app import APIDefinition, MoonrakerApp import components.authorization _T = TypeVar("_T") @@ -274,7 +273,7 @@ class WebsocketManager(APITransport): self.server = server self.websockets: Dict[int, WebSocket] = {} self.rpc = JsonRPC() - self.closed_event: Optional[Event] = None + self.closed_event: Optional[asyncio.Event] = None self.rpc.register_method("server.websocket.id", self._handle_id_request) @@ -368,12 +367,12 @@ class WebsocketManager(APITransport): async def close(self) -> None: if not self.websockets: return - self.closed_event = Event() + self.closed_event = asyncio.Event() for ws in list(self.websockets.values()): ws.close() try: - await self.closed_event.wait(IOLoop.current().time() + 2.) - except tornado.util.TimeoutError: + await asyncio.wait_for(self.closed_event.wait(), 2.) + except asyncio.TimeoutError: pass self.closed_event = None @@ -381,6 +380,7 @@ class WebSocket(WebSocketHandler, Subscribable): def initialize(self) -> None: app: MoonrakerApp = self.settings['parent'] self.server = app.get_server() + self.event_loop = self.server.get_event_loop() self.wsm = app.get_websocket_manager() self.rpc = self.wsm.rpc self.uid = id(self) @@ -394,8 +394,7 @@ class WebSocket(WebSocketHandler, Subscribable): self.wsm.add_websocket(self) def on_message(self, message: Union[bytes, str]) -> None: - io_loop = IOLoop.current() - io_loop.spawn_callback(self._process_message, message) + self.event_loop.register_callback(self._process_message, message) async def _process_message(self, message: str) -> None: try: @@ -410,7 +409,7 @@ class WebSocket(WebSocketHandler, Subscribable): if self.queue_busy: return self.queue_busy = True - IOLoop.current().spawn_callback(self._process_messages) + self.event_loop.register_callback(self._process_messages) async def _process_messages(self): if self.is_closed: