diff --git a/moonraker/klippy_connection.py b/moonraker/klippy_connection.py index 76f75e5..91cf17c 100644 --- a/moonraker/klippy_connection.py +++ b/moonraker/klippy_connection.py @@ -189,20 +189,17 @@ class KlippyConnection: async def _write_request(self, request: KlippyRequest) -> None: if self.writer is None or self.closing: - self.pending_requests.pop(request.id, None) - request.notify(ServerError("Klippy Host not connected", 503)) + request.set_exception(ServerError("Klippy Host not connected", 503)) return data = json.dumps(request.to_dict()).encode() + b"\x03" try: self.writer.write(data) await self.writer.drain() except asyncio.CancelledError: - self.pending_requests.pop(request.id, None) - request.notify(ServerError("Klippy Write Request Cancelled", 503)) + request.set_exception(ServerError("Klippy Write Request Cancelled", 503)) raise except Exception: - self.pending_requests.pop(request.id, None) - request.notify(ServerError("Klippy Write Request Error", 503)) + request.set_exception(ServerError("Klippy Write Request Error", 503)) if not self.closing: logging.debug("Klippy Disconnection From _write_request()") await self.close() @@ -464,10 +461,10 @@ class KlippyConnection: result = cmd['result'] if not result: result = "ok" + request.set_result(result) else: err = cmd.get('error', "Malformed Klippy Response") - result = ServerError(err, 400) - request.notify(result) + request.set_exception(ServerError(err, 400)) async def _execute_method(self, method_name: str, **kwargs) -> None: try: @@ -580,10 +577,9 @@ class KlippyConnection: self.pending_requests[base_request.id] = base_request self.event_loop.register_callback(self._write_request, base_request) try: - return await asyncio.wait_for(base_request.wait(), timeout) - except asyncio.TimeoutError: + return await base_request.wait(timeout) + finally: self.pending_requests.pop(base_request.id, None) - raise self.server.error("Klippy request timed out", 500) def remove_subscription(self, conn: Subscribable) -> None: self.subscriptions.pop(conn, None) @@ -639,7 +635,7 @@ class KlippyConnection: self._state = "disconnected" self._state_message = "Klippy Disconnected" for request in self.pending_requests.values(): - request.notify(ServerError("Klippy Disconnected", 503)) + request.set_exception(ServerError("Klippy Disconnected", 503)) self.pending_requests = {} self.subscriptions = {} self._peer_cred = {} @@ -680,33 +676,35 @@ class KlippyRequest: self.id = id(self) self.rpc_method = rpc_method self.params = params - self._event = asyncio.Event() - self.response: Any = None + self._fut = asyncio.get_running_loop().create_future() - async def wait(self) -> Any: - # Log pending requests every 60 seconds + async def wait(self, timeout: Optional[float] = None) -> Any: start_time = time.time() + to = timeout or 60. while True: try: - await asyncio.wait_for(self._event.wait(), 60.) + return await asyncio.wait_for(asyncio.shield(self._fut), to) except asyncio.TimeoutError: + if timeout is not None: + self._fut.cancel() + raise ServerError("Klippy request timed out", 500) from None pending_time = time.time() - start_time logging.info( f"Request '{self.rpc_method}' pending: " - f"{pending_time:.2f} seconds") - self._event.clear() - continue - break - if isinstance(self.response, ServerError): - raise self.response - return self.response + f"{pending_time:.2f} seconds" + ) - def notify(self, response: Any) -> None: - if self._event.is_set(): - return - self.response = response - self._event.set() + def set_exception(self, exc: Exception) -> None: + if not self._fut.done(): + self._fut.set_exception(exc) + + def set_result(self, result: Any) -> None: + if not self._fut.done(): + self._fut.set_result(result) def to_dict(self) -> Dict[str, Any]: - return {'id': self.id, 'method': self.rpc_method, - 'params': self.params} + return { + 'id': self.id, + 'method': self.rpc_method, + 'params': self.params + }