From 62e5f85473d78dd3245ec3c29a21af8d4999cbc7 Mon Sep 17 00:00:00 2001 From: Arksine Date: Fri, 7 Aug 2020 17:27:01 -0400 Subject: [PATCH] moonraker: use tornado's iostream wrapper for klippy connection This reduces the amount of code needed to handle the connection and adds more robust error handling. This also prepares moonraker for the eventual transition of Klippy hosting the server socket. Signed-off-by: Eric Callahan --- moonraker/moonraker.py | 130 ++++++++++++++--------------------------- 1 file changed, 44 insertions(+), 86 deletions(-) diff --git a/moonraker/moonraker.py b/moonraker/moonraker.py index 7d6def1..ab624da 100644 --- a/moonraker/moonraker.py +++ b/moonraker/moonraker.py @@ -15,7 +15,7 @@ import errno import tornado import tornado.netutil import confighelper -from tornado import gen +from tornado import gen, iostream from tornado.ioloop import IOLoop, PeriodicCallback from tornado.util import TimeoutError from tornado.locks import Event @@ -48,11 +48,9 @@ class Server: socketfile, backlog=1) self.remove_server_sock = tornado.netutil.add_accept_handler( self.klippy_server_sock, self._handle_klippy_connection) - self.klippy_sock = None - self.is_klippy_connected = False + self.klippy_iostream = None self.is_klippy_ready = False self.moonraker_available = False - self.partial_data = b"" # Server/IOLoop self.server_running = False @@ -60,7 +58,7 @@ class Server: self.register_endpoint = app.register_local_handler self.register_static_file_handler = app.register_static_file_handler self.register_upload_handler = app.register_upload_handler - self.io_loop = IOLoop.current() + self.ioloop = IOLoop.current() self.init_cb = PeriodicCallback(self._initialize, INIT_MS) # Setup remote methods accessable to Klippy. Note that all @@ -138,7 +136,7 @@ class Server: def send_event(self, event, *args): events = self.events.get(event, []) for evt in events: - self.io_loop.spawn_callback(evt, *args) + self.ioloop.spawn_callback(evt, *args) def register_remote_method(self, method_name, cb): if method_name in self.remote_methods: @@ -150,42 +148,30 @@ class Server: # ***** Klippy Connection ***** def _handle_klippy_connection(self, conn, addr): - if self.is_klippy_connected: + if self.klippy_iostream is not None and \ + not self.klippy_iostream.closed(): logging.info("New Connection received while Klippy Connected") - self.close_client_sock() + self.klippy_iostream.close() logging.info("Klippy Connection Established") - self.is_klippy_connected = True - conn.setblocking(0) - self.klippy_sock = conn - self.io_loop.add_handler( - self.klippy_sock.fileno(), self._handle_klippy_data, - IOLoop.READ | IOLoop.ERROR) + self.klippy_iostream = iostream.IOStream(conn) + self.klippy_iostream.set_close_callback( + self._handle_stream_closed) + self.ioloop.spawn_callback( + self._read_klippy_stream, self.klippy_iostream) # begin server iniialization self.init_cb.start() - def _handle_klippy_data(self, fd, events): - if events & IOLoop.ERROR: - self.close_client_sock() - return - try: - data = self.klippy_sock.recv(4096) - except socket.error as e: - # If bad file descriptor allow connection to be - # closed by the data check - if e.errno == errno.EBADF: - data = b'' - else: - return - if data == b'': - # Socket Closed - self.close_client_sock() - return - commands = data.split(b'\x03') - commands[0] = self.partial_data + commands[0] - self.partial_data = commands.pop() - for cmd in commands: + async def _read_klippy_stream(self, stream): + while not stream.closed(): try: - decoded_cmd = json.loads(cmd) + data = await stream.read_until(b'\x03') + except iostream.StreamClosedError as e: + return + except Exception: + logging.exception("Klippy Stream Read Error") + continue + try: + decoded_cmd = json.loads(data[:-1]) method = decoded_cmd.get('method') params = decoded_cmd.get('params', {}) cb = self.remote_methods.get(method) @@ -198,32 +184,22 @@ class Server: "Error processing Klippy Host Response: %s" % (cmd.decode())) - def klippy_send(self, data): - # TODO: need a mutex or lock to make sure that multiple co-routines - # do not attempt to send - if not self.is_klippy_connected: - return False - retries = 10 - data = json.dumps(data).encode() + b"\x03" - while data: - try: - sent = self.klippy_sock.send(data) - except socket.error as e: - if e.errno == errno.EBADF or e.errno == errno.EPIPE \ - or not retries: - sent = 0 - else: - # XXX - Should pause for 1ms here - retries -= 1 - continue - retries = 10 - if sent > 0: - data = data[sent:] - else: - logging.info("Error sending client data, closing socket") - self.close_client_sock() - return False - return True + def _handle_stream_closed(self): + self.is_klippy_ready = False + self.moonraker_available = False + self.init_cb.stop() + for request in self.pending_requests.values(): + request.notify(ServerError("Klippy Disconnected", 503)) + self.pending_requests = {} + logging.info("Klippy Connection Removed") + self.send_event("server:klippy_state_changed", "disconnect") + + async def send_klippy_request(self, request): + data = json.dumps(request.to_dict()).encode() + b"\x03" + try: + await self.klippy_iostream.write(data) + except iostream.StreamClosedError: + request.notify(ServerError("Klippy Host not connected", 503)) async def _initialize(self): await self._request_endpoints() @@ -308,11 +284,8 @@ class Server: def make_request(self, path, method, args): base_request = BaseRequest(path, method, args) self.pending_requests[base_request.id] = base_request - ret = self.klippy_send(base_request.to_dict()) - if not ret: - self.pending_requests.pop(base_request.id, None) - base_request.notify( - ServerError("Klippy Host not connected", 503)) + self.ioloop.spawn_callback( + self.send_klippy_request, base_request) return base_request async def _kill_server(self): @@ -324,29 +297,14 @@ class Server: for plugin in self.plugins: if hasattr(plugin, "close"): await plugin.close() - self.close_client_sock() + if self.klippy_iostream is not None and \ + not self.klippy_iostream.closed(): + self.klippy_iostream.close() self.close_server_sock() if self.server_running: self.server_running = False await self.moonraker_app.close() - self.io_loop.stop() - - def close_client_sock(self): - self.is_klippy_ready = False - self.moonraker_available = False - self.init_cb.stop() - for request in self.pending_requests.values(): - request.notify(ServerError("Klippy Disconnected", 503)) - self.pending_requests = {} - if self.is_klippy_connected: - self.is_klippy_connected = False - logging.info("Klippy Connection Removed") - try: - self.io_loop.remove_handler(self.klippy_sock.fileno()) - self.klippy_sock.close() - except socket.error: - logging.exception("Error Closing Client Socket") - self.send_event("server:klippy_state_changed", "disconnect") + self.ioloop.stop() def close_server_sock(self): try: