moonraker: replace calls to ioloop with eventloop wrapper

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2021-07-09 19:59:10 -04:00
parent 07cb64a191
commit d224536552
1 changed files with 45 additions and 46 deletions

View File

@ -19,10 +19,8 @@ import signal
import confighelper import confighelper
import utils import utils
import asyncio import asyncio
from tornado import iostream, gen from tornado import iostream
from tornado.ioloop import IOLoop from eventloop import EventLoop
from tornado.util import TimeoutError
from tornado.locks import Event
from app import MoonrakerApp from app import MoonrakerApp
from utils import ServerError, SentinelClass from utils import ServerError, SentinelClass
@ -61,8 +59,10 @@ class Server:
error = ServerError error = ServerError
def __init__(self, def __init__(self,
args: Dict[str, Any], args: Dict[str, Any],
file_logger: Optional[utils.MoonrakerLoggingHandler] file_logger: Optional[utils.MoonrakerLoggingHandler],
event_loop: EventLoop
) -> None: ) -> None:
self.event_loop = event_loop
self.file_logger = file_logger self.file_logger = file_logger
self.app_args = args self.app_args = args
self.config = config = confighelper.get_configuration(self, args) self.config = config = confighelper.get_configuration(self, args)
@ -86,18 +86,18 @@ class Server:
self.klippy_address: str = config.get( self.klippy_address: str = config.get(
'klippy_uds_address', "/tmp/klippy_uds") 'klippy_uds_address', "/tmp/klippy_uds")
self.klippy_connection = KlippyConnection( self.klippy_connection = KlippyConnection(
self.process_command, self.on_connection_closed) self.process_command, self.on_connection_closed, event_loop)
self.klippy_info: Dict[str, Any] = {} self.klippy_info: Dict[str, Any] = {}
self.init_list: List[str] = [] self.init_list: List[str] = []
self.init_handle: Optional[object] = None self.init_handle: Optional[asyncio.Handle] = None
self.init_attempts: int = 0 self.init_attempts: int = 0
self.klippy_state: str = "disconnected" self.klippy_state: str = "disconnected"
self.klippy_disconnect_evt: Optional[Event] = None self.klippy_disconnect_evt: Optional[asyncio.Event] = None
self.subscriptions: Dict[Subscribable, Dict[str, Any]] = {} self.subscriptions: Dict[Subscribable, Dict[str, Any]] = {}
self.failed_components: List[str] = [] self.failed_components: List[str] = []
self.warnings: List[str] = [] self.warnings: List[str] = []
# Server/IOLoop # Tornado Application/Server
self.server_running: bool = False self.server_running: bool = False
self.moonraker_app = app = MoonrakerApp(config) self.moonraker_app = app = MoonrakerApp(config)
self.register_endpoint = app.register_local_handler self.register_endpoint = app.register_local_handler
@ -105,7 +105,6 @@ class Server:
self.register_upload_handler = app.register_upload_handler self.register_upload_handler = app.register_upload_handler
self.get_websocket_manager = app.get_websocket_manager self.get_websocket_manager = app.get_websocket_manager
self.register_api_transport = app.register_api_transport self.register_api_transport = app.register_api_transport
self.ioloop = IOLoop.current()
self.register_endpoint( self.register_endpoint(
"/server/info", ['GET'], self._handle_info_request) "/server/info", ['GET'], self._handle_info_request)
@ -142,6 +141,9 @@ class Server:
def get_app_args(self) -> Dict[str, Any]: def get_app_args(self) -> Dict[str, Any]:
return dict(self.app_args) return dict(self.app_args)
def get_event_loop(self) -> EventLoop:
return self.event_loop
def start(self) -> None: def start(self) -> None:
hostname, hostport = self.get_host_info() hostname, hostport = self.get_host_info()
logging.info( logging.info(
@ -149,13 +151,9 @@ class Server:
f"Hostname: {hostname}") f"Hostname: {hostname}")
self.moonraker_app.listen(self.host, self.port, self.ssl_port) self.moonraker_app.listen(self.host, self.port, self.ssl_port)
self.server_running = True self.server_running = True
self.ioloop.spawn_callback(self._init_signals) self.event_loop.add_signal_handler(
self.ioloop.spawn_callback(self._connect_klippy)
def _init_signals(self) -> None:
aioloop = asyncio.get_event_loop()
aioloop.add_signal_handler(
signal.SIGTERM, self._handle_term_signal) signal.SIGTERM, self._handle_term_signal)
self.event_loop.register_callback(self._connect_klippy)
def add_log_rollover_item(self, name: str, item: str, def add_log_rollover_item(self, name: str, item: str,
log: bool = True) -> None: log: bool = True) -> None:
@ -238,7 +236,7 @@ class Server:
def send_event(self, event: str, *args) -> None: def send_event(self, event: str, *args) -> None:
events = self.events.get(event, []) events = self.events.get(event, [])
for evt in events: for evt in events:
self.ioloop.spawn_callback(evt, *args) self.event_loop.register_callback(evt, *args)
def register_remote_method(self, def register_remote_method(self,
method_name: str, method_name: str,
@ -270,10 +268,10 @@ class Server:
return return
ret = await self.klippy_connection.connect(self.klippy_address) ret = await self.klippy_connection.connect(self.klippy_address)
if not ret: if not ret:
self.ioloop.call_later(.25, self._connect_klippy) # type: ignore self.event_loop.delay_callback(.25, self._connect_klippy)
return return
# begin server iniialization # begin server iniialization
self.ioloop.spawn_callback(self._initialize) self.event_loop.register_callback(self._initialize)
def process_command(self, cmd: Dict[str, Any]) -> None: def process_command(self, cmd: Dict[str, Any]) -> None:
method = cmd.get('method', None) method = cmd.get('method', None)
@ -281,13 +279,14 @@ class Server:
# This is a remote method called from klippy # This is a remote method called from klippy
if method in self.remote_methods: if method in self.remote_methods:
params = cmd.get('params', {}) params = cmd.get('params', {})
self.ioloop.spawn_callback( self.event_loop.register_callback(
self._execute_method, method, **params) self._execute_method, method, **params)
else: else:
logging.info(f"Unknown method received: {method}") logging.info(f"Unknown method received: {method}")
return return
# This is a response to a request, process # This is a response to a request, process
req_id = cmd.get('id', None) req_id = cmd.get('id', None)
request: Optional[BaseRequest]
request = self.pending_requests.pop(req_id, None) request = self.pending_requests.pop(req_id, None)
if request is None: if request is None:
logging.info( logging.info(
@ -321,9 +320,10 @@ class Server:
logging.info("Klippy Connection Removed") logging.info("Klippy Connection Removed")
self.send_event("server:klippy_disconnect") self.send_event("server:klippy_disconnect")
if self.init_handle is not None: if self.init_handle is not None:
self.ioloop.remove_timeout(self.init_handle) self.init_handle.cancel()
self.init_handle = None
if self.server_running: if self.server_running:
self.ioloop.call_later(.25, self._connect_klippy) # type: ignore self.event_loop.delay_callback(.25, self._connect_klippy)
if self.klippy_disconnect_evt is not None: if self.klippy_disconnect_evt is not None:
self.klippy_disconnect_evt.set() self.klippy_disconnect_evt.set()
@ -360,8 +360,8 @@ class Server:
self.init_handle = None self.init_handle = None
else: else:
self.init_attempts += 1 self.init_attempts += 1
self.init_handle = self.ioloop.call_later( self.init_handle = self.event_loop.delay_callback(
INIT_TIME, self._initialize) # type: ignore INIT_TIME, self._initialize)
async def _request_endpoints(self) -> None: async def _request_endpoints(self) -> None:
result = await self.klippy_apis.list_endpoints(default=None) result = await self.klippy_apis.list_endpoints(default=None)
@ -528,7 +528,7 @@ class Server:
# Create a base klippy request # Create a base klippy request
base_request = BaseRequest(rpc_method, args) base_request = BaseRequest(rpc_method, args)
self.pending_requests[base_request.id] = base_request self.pending_requests[base_request.id] = base_request
self.ioloop.spawn_callback( self.event_loop.register_callback(
self.klippy_connection.send_request, base_request) self.klippy_connection.send_request, base_request)
return await base_request.wait() return await base_request.wait()
@ -537,7 +537,7 @@ class Server:
def _handle_term_signal(self) -> None: def _handle_term_signal(self) -> None:
logging.info(f"Exiting with signal SIGTERM") logging.info(f"Exiting with signal SIGTERM")
self.ioloop.spawn_callback(self._stop_server, "terminate") self.event_loop.register_callback(self._stop_server, "terminate")
async def _stop_server(self, exit_reason: str = "restart") -> None: async def _stop_server(self, exit_reason: str = "restart") -> None:
self.server_running = False self.server_running = False
@ -555,7 +555,7 @@ class Server:
# Sleep for 100ms to allow connected websockets to write out # Sleep for 100ms to allow connected websockets to write out
# remaining data # remaining data
await gen.sleep(.1) await asyncio.sleep(.1)
try: try:
await self.moonraker_app.close() await self.moonraker_app.close()
except Exception: except Exception:
@ -564,10 +564,10 @@ class Server:
# Disconnect from Klippy # Disconnect from Klippy
try: try:
if self.klippy_connection.is_connected(): if self.klippy_connection.is_connected():
self.klippy_disconnect_evt = Event() self.klippy_disconnect_evt = asyncio.Event()
self.klippy_connection.close() self.klippy_connection.close()
timeout = time.time() + 2. await asyncio.wait_for(
await self.klippy_disconnect_evt.wait(timeout) self.klippy_disconnect_evt.wait(), 2.)
self.klippy_disconnect_evt = None self.klippy_disconnect_evt = None
except Exception: except Exception:
logging.exception("Klippy Disconnect Error") logging.exception("Klippy Disconnect Error")
@ -585,12 +585,11 @@ class Server:
f"Error executing 'close()' for component: {name}") f"Error executing 'close()' for component: {name}")
self.exit_reason = exit_reason self.exit_reason = exit_reason
aioloop = asyncio.get_event_loop() self.event_loop.remove_signal_handler(signal.SIGTERM)
aioloop.remove_signal_handler(signal.SIGTERM) self.event_loop.stop()
self.ioloop.stop()
async def _handle_server_restart(self, web_request: WebRequest) -> str: async def _handle_server_restart(self, web_request: WebRequest) -> str:
self.ioloop.spawn_callback(self._stop_server) self.event_loop.register_callback(self._stop_server)
return "ok" return "ok"
async def _handle_info_request(self, async def _handle_info_request(self,
@ -622,12 +621,13 @@ class Server:
class KlippyConnection: class KlippyConnection:
def __init__(self, def __init__(self,
on_recd: Callable[[dict], None], on_recd: Callable[[dict], None],
on_close: Callable[[], None] on_close: Callable[[], None],
event_loop: EventLoop
) -> None: ) -> None:
self.ioloop = IOLoop.current()
self.iostream: Optional[iostream.IOStream] = None self.iostream: Optional[iostream.IOStream] = None
self.on_recd = on_recd self.on_recd = on_recd
self.on_close = on_close self.on_close = on_close
self.event_loop = event_loop
async def connect(self, address: str) -> bool: async def connect(self, address: str) -> bool:
ksock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) ksock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
@ -639,7 +639,7 @@ class KlippyConnection:
logging.info("Klippy Connection Established") logging.info("Klippy Connection Established")
self.iostream = kstream self.iostream = kstream
self.iostream.set_close_callback(self.on_close) self.iostream.set_close_callback(self.on_close)
self.ioloop.spawn_callback(self._read_stream, self.iostream) self.event_loop.register_callback(self._read_stream, self.iostream)
return True return True
async def _read_stream(self, stream: iostream.IOStream) -> None: async def _read_stream(self, stream: iostream.IOStream) -> None:
@ -682,17 +682,16 @@ class BaseRequest:
self.id = id(self) self.id = id(self)
self.rpc_method = rpc_method self.rpc_method = rpc_method
self.params = params self.params = params
self._event = Event() self._event = asyncio.Event()
self.response: Any = None self.response: Any = None
async def wait(self) -> Any: async def wait(self) -> Any:
# Log pending requests every 60 seconds # Log pending requests every 60 seconds
start_time = time.time() start_time = time.time()
while True: while True:
timeout = time.time() + 60.
try: try:
await self._event.wait(timeout=timeout) await asyncio.wait_for(self._event.wait(), 60.)
except TimeoutError: except asyncio.TimeoutError:
pending_time = time.time() - start_time pending_time = time.time() - start_time
logging.info( logging.info(
f"Request '{self.rpc_method}' pending: " f"Request '{self.rpc_method}' pending: "
@ -747,19 +746,19 @@ def main() -> None:
ql.stop() ql.stop()
exit(1) exit(1)
# Start IOLoop and Server # Start asyncio event loop and server
io_loop = IOLoop.current() event_loop = EventLoop()
estatus = 0 estatus = 0
while True: while True:
try: try:
server = Server(app_args, file_logger) server = Server(app_args, file_logger, event_loop)
except Exception: except Exception:
logging.exception("Moonraker Error") logging.exception("Moonraker Error")
estatus = 1 estatus = 1
break break
try: try:
server.start() server.start()
io_loop.start() event_loop.start()
except Exception: except Exception:
logging.exception("Server Running Error") logging.exception("Server Running Error")
estatus = 1 estatus = 1
@ -770,7 +769,7 @@ def main() -> None:
# it is ok to use a blocking sleep here # it is ok to use a blocking sleep here
time.sleep(.5) time.sleep(.5)
logging.info("Attempting Server Restart...") logging.info("Attempting Server Restart...")
io_loop.close(True) event_loop.close()
logging.info("Server Shutdown") logging.info("Server Shutdown")
ql.stop() ql.stop()
exit(estatus) exit(estatus)