websockets: replace references to ioloop with event loop

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2021-07-10 08:48:15 -04:00
parent 8a3ff7a54a
commit 4430d564fe
1 changed files with 9 additions and 10 deletions

View File

@ -8,10 +8,8 @@ from __future__ import annotations
import logging import logging
import ipaddress import ipaddress
import json import json
import tornado.util import asyncio
from tornado.ioloop import IOLoop
from tornado.websocket import WebSocketHandler, WebSocketClosedError from tornado.websocket import WebSocketHandler, WebSocketClosedError
from tornado.locks import Event
from utils import ServerError, SentinelClass from utils import ServerError, SentinelClass
# Annotation imports # Annotation imports
@ -29,6 +27,7 @@ from typing import (
) )
if TYPE_CHECKING: if TYPE_CHECKING:
from moonraker import Server from moonraker import Server
from eventloop import EventLoop
from app import APIDefinition, MoonrakerApp from app import APIDefinition, MoonrakerApp
import components.authorization import components.authorization
_T = TypeVar("_T") _T = TypeVar("_T")
@ -274,7 +273,7 @@ class WebsocketManager(APITransport):
self.server = server self.server = server
self.websockets: Dict[int, WebSocket] = {} self.websockets: Dict[int, WebSocket] = {}
self.rpc = JsonRPC() self.rpc = JsonRPC()
self.closed_event: Optional[Event] = None self.closed_event: Optional[asyncio.Event] = None
self.rpc.register_method("server.websocket.id", self._handle_id_request) self.rpc.register_method("server.websocket.id", self._handle_id_request)
@ -368,12 +367,12 @@ class WebsocketManager(APITransport):
async def close(self) -> None: async def close(self) -> None:
if not self.websockets: if not self.websockets:
return return
self.closed_event = Event() self.closed_event = asyncio.Event()
for ws in list(self.websockets.values()): for ws in list(self.websockets.values()):
ws.close() ws.close()
try: try:
await self.closed_event.wait(IOLoop.current().time() + 2.) await asyncio.wait_for(self.closed_event.wait(), 2.)
except tornado.util.TimeoutError: except asyncio.TimeoutError:
pass pass
self.closed_event = None self.closed_event = None
@ -381,6 +380,7 @@ class WebSocket(WebSocketHandler, Subscribable):
def initialize(self) -> None: def initialize(self) -> None:
app: MoonrakerApp = self.settings['parent'] app: MoonrakerApp = self.settings['parent']
self.server = app.get_server() self.server = app.get_server()
self.event_loop = self.server.get_event_loop()
self.wsm = app.get_websocket_manager() self.wsm = app.get_websocket_manager()
self.rpc = self.wsm.rpc self.rpc = self.wsm.rpc
self.uid = id(self) self.uid = id(self)
@ -394,8 +394,7 @@ class WebSocket(WebSocketHandler, Subscribable):
self.wsm.add_websocket(self) self.wsm.add_websocket(self)
def on_message(self, message: Union[bytes, str]) -> None: def on_message(self, message: Union[bytes, str]) -> None:
io_loop = IOLoop.current() self.event_loop.register_callback(self._process_message, message)
io_loop.spawn_callback(self._process_message, message)
async def _process_message(self, message: str) -> None: async def _process_message(self, message: str) -> None:
try: try:
@ -410,7 +409,7 @@ class WebSocket(WebSocketHandler, Subscribable):
if self.queue_busy: if self.queue_busy:
return return
self.queue_busy = True self.queue_busy = True
IOLoop.current().spawn_callback(self._process_messages) self.event_loop.register_callback(self._process_messages)
async def _process_messages(self): async def _process_messages(self):
if self.is_closed: if self.is_closed: