moonraker: use tornado's iostream wrapper for klippy connection

This reduces the amount of code needed to handle the connection and adds more robust error handling.   This also prepares moonraker for the eventual transition of Klippy hosting the server socket.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Arksine 2020-08-07 17:27:01 -04:00
parent b92000dd46
commit 62e5f85473
1 changed files with 44 additions and 86 deletions

View File

@ -15,7 +15,7 @@ import errno
import tornado import tornado
import tornado.netutil import tornado.netutil
import confighelper import confighelper
from tornado import gen from tornado import gen, iostream
from tornado.ioloop import IOLoop, PeriodicCallback from tornado.ioloop import IOLoop, PeriodicCallback
from tornado.util import TimeoutError from tornado.util import TimeoutError
from tornado.locks import Event from tornado.locks import Event
@ -48,11 +48,9 @@ class Server:
socketfile, backlog=1) socketfile, backlog=1)
self.remove_server_sock = tornado.netutil.add_accept_handler( self.remove_server_sock = tornado.netutil.add_accept_handler(
self.klippy_server_sock, self._handle_klippy_connection) self.klippy_server_sock, self._handle_klippy_connection)
self.klippy_sock = None self.klippy_iostream = None
self.is_klippy_connected = False
self.is_klippy_ready = False self.is_klippy_ready = False
self.moonraker_available = False self.moonraker_available = False
self.partial_data = b""
# Server/IOLoop # Server/IOLoop
self.server_running = False self.server_running = False
@ -60,7 +58,7 @@ class Server:
self.register_endpoint = app.register_local_handler self.register_endpoint = app.register_local_handler
self.register_static_file_handler = app.register_static_file_handler self.register_static_file_handler = app.register_static_file_handler
self.register_upload_handler = app.register_upload_handler self.register_upload_handler = app.register_upload_handler
self.io_loop = IOLoop.current() self.ioloop = IOLoop.current()
self.init_cb = PeriodicCallback(self._initialize, INIT_MS) self.init_cb = PeriodicCallback(self._initialize, INIT_MS)
# Setup remote methods accessable to Klippy. Note that all # Setup remote methods accessable to Klippy. Note that all
@ -138,7 +136,7 @@ class Server:
def send_event(self, event, *args): def send_event(self, event, *args):
events = self.events.get(event, []) events = self.events.get(event, [])
for evt in events: for evt in events:
self.io_loop.spawn_callback(evt, *args) self.ioloop.spawn_callback(evt, *args)
def register_remote_method(self, method_name, cb): def register_remote_method(self, method_name, cb):
if method_name in self.remote_methods: if method_name in self.remote_methods:
@ -150,42 +148,30 @@ class Server:
# ***** Klippy Connection ***** # ***** Klippy Connection *****
def _handle_klippy_connection(self, conn, addr): def _handle_klippy_connection(self, conn, addr):
if self.is_klippy_connected: if self.klippy_iostream is not None and \
not self.klippy_iostream.closed():
logging.info("New Connection received while Klippy Connected") logging.info("New Connection received while Klippy Connected")
self.close_client_sock() self.klippy_iostream.close()
logging.info("Klippy Connection Established") logging.info("Klippy Connection Established")
self.is_klippy_connected = True self.klippy_iostream = iostream.IOStream(conn)
conn.setblocking(0) self.klippy_iostream.set_close_callback(
self.klippy_sock = conn self._handle_stream_closed)
self.io_loop.add_handler( self.ioloop.spawn_callback(
self.klippy_sock.fileno(), self._handle_klippy_data, self._read_klippy_stream, self.klippy_iostream)
IOLoop.READ | IOLoop.ERROR)
# begin server iniialization # begin server iniialization
self.init_cb.start() self.init_cb.start()
def _handle_klippy_data(self, fd, events): async def _read_klippy_stream(self, stream):
if events & IOLoop.ERROR: while not stream.closed():
self.close_client_sock()
return
try:
data = self.klippy_sock.recv(4096)
except socket.error as e:
# If bad file descriptor allow connection to be
# closed by the data check
if e.errno == errno.EBADF:
data = b''
else:
return
if data == b'':
# Socket Closed
self.close_client_sock()
return
commands = data.split(b'\x03')
commands[0] = self.partial_data + commands[0]
self.partial_data = commands.pop()
for cmd in commands:
try: try:
decoded_cmd = json.loads(cmd) data = await stream.read_until(b'\x03')
except iostream.StreamClosedError as e:
return
except Exception:
logging.exception("Klippy Stream Read Error")
continue
try:
decoded_cmd = json.loads(data[:-1])
method = decoded_cmd.get('method') method = decoded_cmd.get('method')
params = decoded_cmd.get('params', {}) params = decoded_cmd.get('params', {})
cb = self.remote_methods.get(method) cb = self.remote_methods.get(method)
@ -198,32 +184,22 @@ class Server:
"Error processing Klippy Host Response: %s" "Error processing Klippy Host Response: %s"
% (cmd.decode())) % (cmd.decode()))
def klippy_send(self, data): def _handle_stream_closed(self):
# TODO: need a mutex or lock to make sure that multiple co-routines self.is_klippy_ready = False
# do not attempt to send self.moonraker_available = False
if not self.is_klippy_connected: self.init_cb.stop()
return False for request in self.pending_requests.values():
retries = 10 request.notify(ServerError("Klippy Disconnected", 503))
data = json.dumps(data).encode() + b"\x03" self.pending_requests = {}
while data: logging.info("Klippy Connection Removed")
try: self.send_event("server:klippy_state_changed", "disconnect")
sent = self.klippy_sock.send(data)
except socket.error as e: async def send_klippy_request(self, request):
if e.errno == errno.EBADF or e.errno == errno.EPIPE \ data = json.dumps(request.to_dict()).encode() + b"\x03"
or not retries: try:
sent = 0 await self.klippy_iostream.write(data)
else: except iostream.StreamClosedError:
# XXX - Should pause for 1ms here request.notify(ServerError("Klippy Host not connected", 503))
retries -= 1
continue
retries = 10
if sent > 0:
data = data[sent:]
else:
logging.info("Error sending client data, closing socket")
self.close_client_sock()
return False
return True
async def _initialize(self): async def _initialize(self):
await self._request_endpoints() await self._request_endpoints()
@ -308,11 +284,8 @@ class Server:
def make_request(self, path, method, args): def make_request(self, path, method, args):
base_request = BaseRequest(path, method, args) base_request = BaseRequest(path, method, args)
self.pending_requests[base_request.id] = base_request self.pending_requests[base_request.id] = base_request
ret = self.klippy_send(base_request.to_dict()) self.ioloop.spawn_callback(
if not ret: self.send_klippy_request, base_request)
self.pending_requests.pop(base_request.id, None)
base_request.notify(
ServerError("Klippy Host not connected", 503))
return base_request return base_request
async def _kill_server(self): async def _kill_server(self):
@ -324,29 +297,14 @@ class Server:
for plugin in self.plugins: for plugin in self.plugins:
if hasattr(plugin, "close"): if hasattr(plugin, "close"):
await plugin.close() await plugin.close()
self.close_client_sock() if self.klippy_iostream is not None and \
not self.klippy_iostream.closed():
self.klippy_iostream.close()
self.close_server_sock() self.close_server_sock()
if self.server_running: if self.server_running:
self.server_running = False self.server_running = False
await self.moonraker_app.close() await self.moonraker_app.close()
self.io_loop.stop() self.ioloop.stop()
def close_client_sock(self):
self.is_klippy_ready = False
self.moonraker_available = False
self.init_cb.stop()
for request in self.pending_requests.values():
request.notify(ServerError("Klippy Disconnected", 503))
self.pending_requests = {}
if self.is_klippy_connected:
self.is_klippy_connected = False
logging.info("Klippy Connection Removed")
try:
self.io_loop.remove_handler(self.klippy_sock.fileno())
self.klippy_sock.close()
except socket.error:
logging.exception("Error Closing Client Socket")
self.send_event("server:klippy_state_changed", "disconnect")
def close_server_sock(self): def close_server_sock(self):
try: try: