diff --git a/moonraker/moonraker.py b/moonraker/moonraker.py index 7d6def1..ab624da 100644 --- a/moonraker/moonraker.py +++ b/moonraker/moonraker.py @@ -15,7 +15,7 @@ import errno import tornado import tornado.netutil import confighelper -from tornado import gen +from tornado import gen, iostream from tornado.ioloop import IOLoop, PeriodicCallback from tornado.util import TimeoutError from tornado.locks import Event @@ -48,11 +48,9 @@ class Server: socketfile, backlog=1) self.remove_server_sock = tornado.netutil.add_accept_handler( self.klippy_server_sock, self._handle_klippy_connection) - self.klippy_sock = None - self.is_klippy_connected = False + self.klippy_iostream = None self.is_klippy_ready = False self.moonraker_available = False - self.partial_data = b"" # Server/IOLoop self.server_running = False @@ -60,7 +58,7 @@ class Server: self.register_endpoint = app.register_local_handler self.register_static_file_handler = app.register_static_file_handler self.register_upload_handler = app.register_upload_handler - self.io_loop = IOLoop.current() + self.ioloop = IOLoop.current() self.init_cb = PeriodicCallback(self._initialize, INIT_MS) # Setup remote methods accessable to Klippy. Note that all @@ -138,7 +136,7 @@ class Server: def send_event(self, event, *args): events = self.events.get(event, []) for evt in events: - self.io_loop.spawn_callback(evt, *args) + self.ioloop.spawn_callback(evt, *args) def register_remote_method(self, method_name, cb): if method_name in self.remote_methods: @@ -150,42 +148,30 @@ class Server: # ***** Klippy Connection ***** def _handle_klippy_connection(self, conn, addr): - if self.is_klippy_connected: + if self.klippy_iostream is not None and \ + not self.klippy_iostream.closed(): logging.info("New Connection received while Klippy Connected") - self.close_client_sock() + self.klippy_iostream.close() logging.info("Klippy Connection Established") - self.is_klippy_connected = True - conn.setblocking(0) - self.klippy_sock = conn - self.io_loop.add_handler( - self.klippy_sock.fileno(), self._handle_klippy_data, - IOLoop.READ | IOLoop.ERROR) + self.klippy_iostream = iostream.IOStream(conn) + self.klippy_iostream.set_close_callback( + self._handle_stream_closed) + self.ioloop.spawn_callback( + self._read_klippy_stream, self.klippy_iostream) # begin server iniialization self.init_cb.start() - def _handle_klippy_data(self, fd, events): - if events & IOLoop.ERROR: - self.close_client_sock() - return - try: - data = self.klippy_sock.recv(4096) - except socket.error as e: - # If bad file descriptor allow connection to be - # closed by the data check - if e.errno == errno.EBADF: - data = b'' - else: - return - if data == b'': - # Socket Closed - self.close_client_sock() - return - commands = data.split(b'\x03') - commands[0] = self.partial_data + commands[0] - self.partial_data = commands.pop() - for cmd in commands: + async def _read_klippy_stream(self, stream): + while not stream.closed(): try: - decoded_cmd = json.loads(cmd) + data = await stream.read_until(b'\x03') + except iostream.StreamClosedError as e: + return + except Exception: + logging.exception("Klippy Stream Read Error") + continue + try: + decoded_cmd = json.loads(data[:-1]) method = decoded_cmd.get('method') params = decoded_cmd.get('params', {}) cb = self.remote_methods.get(method) @@ -198,32 +184,22 @@ class Server: "Error processing Klippy Host Response: %s" % (cmd.decode())) - def klippy_send(self, data): - # TODO: need a mutex or lock to make sure that multiple co-routines - # do not attempt to send - if not self.is_klippy_connected: - return False - retries = 10 - data = json.dumps(data).encode() + b"\x03" - while data: - try: - sent = self.klippy_sock.send(data) - except socket.error as e: - if e.errno == errno.EBADF or e.errno == errno.EPIPE \ - or not retries: - sent = 0 - else: - # XXX - Should pause for 1ms here - retries -= 1 - continue - retries = 10 - if sent > 0: - data = data[sent:] - else: - logging.info("Error sending client data, closing socket") - self.close_client_sock() - return False - return True + def _handle_stream_closed(self): + self.is_klippy_ready = False + self.moonraker_available = False + self.init_cb.stop() + for request in self.pending_requests.values(): + request.notify(ServerError("Klippy Disconnected", 503)) + self.pending_requests = {} + logging.info("Klippy Connection Removed") + self.send_event("server:klippy_state_changed", "disconnect") + + async def send_klippy_request(self, request): + data = json.dumps(request.to_dict()).encode() + b"\x03" + try: + await self.klippy_iostream.write(data) + except iostream.StreamClosedError: + request.notify(ServerError("Klippy Host not connected", 503)) async def _initialize(self): await self._request_endpoints() @@ -308,11 +284,8 @@ class Server: def make_request(self, path, method, args): base_request = BaseRequest(path, method, args) self.pending_requests[base_request.id] = base_request - ret = self.klippy_send(base_request.to_dict()) - if not ret: - self.pending_requests.pop(base_request.id, None) - base_request.notify( - ServerError("Klippy Host not connected", 503)) + self.ioloop.spawn_callback( + self.send_klippy_request, base_request) return base_request async def _kill_server(self): @@ -324,29 +297,14 @@ class Server: for plugin in self.plugins: if hasattr(plugin, "close"): await plugin.close() - self.close_client_sock() + if self.klippy_iostream is not None and \ + not self.klippy_iostream.closed(): + self.klippy_iostream.close() self.close_server_sock() if self.server_running: self.server_running = False await self.moonraker_app.close() - self.io_loop.stop() - - def close_client_sock(self): - self.is_klippy_ready = False - self.moonraker_available = False - self.init_cb.stop() - for request in self.pending_requests.values(): - request.notify(ServerError("Klippy Disconnected", 503)) - self.pending_requests = {} - if self.is_klippy_connected: - self.is_klippy_connected = False - logging.info("Klippy Connection Removed") - try: - self.io_loop.remove_handler(self.klippy_sock.fileno()) - self.klippy_sock.close() - except socket.error: - logging.exception("Error Closing Client Socket") - self.send_event("server:klippy_state_changed", "disconnect") + self.ioloop.stop() def close_server_sock(self): try: