diff --git a/moonraker/moonraker.py b/moonraker/moonraker.py index fd81aed..307c522 100755 --- a/moonraker/moonraker.py +++ b/moonraker/moonraker.py @@ -20,7 +20,6 @@ import signal import confighelper import utils import asyncio -from tornado import iostream from tornado.httpclient import AsyncHTTPClient from eventloop import EventLoop from app import MoonrakerApp @@ -50,6 +49,7 @@ if TYPE_CHECKING: INIT_TIME = .25 LOG_ATTEMPT_INTERVAL = int(2. / INIT_TIME + .5) MAX_LOG_ATTEMPTS = 10 * LOG_ATTEMPT_INTERVAL +UNIX_BUFFER_LIMIT = 2 * 1024 * 1024 CORE_COMPONENTS = [ 'database', 'file_manager', 'klippy_apis', 'machine', @@ -637,7 +637,7 @@ class Server: try: if self.klippy_connection.is_connected(): self.klippy_disconnect_evt = asyncio.Event() - self.klippy_connection.close() + await self.klippy_connection.close() await asyncio.wait_for( self.klippy_disconnect_evt.wait(), 2.) self.klippy_disconnect_evt = None @@ -696,7 +696,9 @@ class KlippyConnection: on_close: Callable[[], None], event_loop: EventLoop ) -> None: - self.iostream: Optional[iostream.IOStream] = None + self.writer: Optional[asyncio.StreamWriter] = None + self.closed: bool = True + self.close_mutex: asyncio.Lock = asyncio.Lock() self.on_recd = on_recd self.on_close = on_close self.event_loop = event_loop @@ -714,24 +716,23 @@ class KlippyConnection: self.log_no_access = False return False self.log_no_access = True - ksock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - kstream = iostream.IOStream(ksock) try: - await kstream.connect(address) - except iostream.StreamClosedError: + reader, writer = await asyncio.open_unix_connection( + address, limit=UNIX_BUFFER_LIMIT) + except Exception: return False logging.info("Klippy Connection Established") - self.iostream = kstream - self.iostream.set_close_callback(self.on_close) - self.event_loop.register_callback(self._read_stream, self.iostream) + self.closed = False + self.writer = writer + self.event_loop.register_callback(self._read_stream, reader) return True - async def _read_stream(self, stream: iostream.IOStream) -> None: - while not stream.closed(): + async def _read_stream(self, reader: asyncio.StreamReader) -> None: + while not reader.at_eof(): try: - data = await stream.read_until(b'\x03') - except iostream.StreamClosedError as e: - return + data = await reader.readuntil(b'\x03') + except asyncio.IncompleteReadError: + break except Exception: logging.exception("Klippy Stream Read Error") continue @@ -741,24 +742,33 @@ class KlippyConnection: except Exception: logging.exception( f"Error processing Klippy Host Response: {data.decode()}") + await self.close() async def send_request(self, request: BaseRequest) -> None: - if self.iostream is None: + if self.writer is None: request.notify(ServerError("Klippy Host not connected", 503)) return data = json.dumps(request.to_dict()).encode() + b"\x03" try: - await self.iostream.write(data) - except iostream.StreamClosedError: + self.writer.write(data) + await self.writer.drain() + except Exception: request.notify(ServerError("Klippy Host not connected", 503)) + await self.close() def is_connected(self) -> bool: - return self.iostream is not None and not self.iostream.closed() + return not self.closed - def close(self) -> None: - if self.iostream is not None and \ - not self.iostream.closed(): - self.iostream.close() + async def close(self) -> None: + async with self.close_mutex: + if self.writer is not None: + self.writer.close() + await self.writer.wait_closed() + self.writer = None + if not self.closed: + self.closed = True + self.on_close() + self.closing = False # Basic WebRequest class, easily converted to dict for json encoding class BaseRequest: