klippy_connection: refactor KlippyRequest class

Wrap a Future instead of an Event, as we don't need to notify multiple
waiters. Additionally the future can return responses and raise exceptions
directly.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2023-07-29 14:07:09 -04:00
parent c3697d0656
commit 6467cc6b89
No known key found for this signature in database
GPG Key ID: 5A1EB336DFB4C71B
1 changed files with 29 additions and 31 deletions

View File

@ -189,20 +189,17 @@ class KlippyConnection:
async def _write_request(self, request: KlippyRequest) -> None:
if self.writer is None or self.closing:
self.pending_requests.pop(request.id, None)
request.notify(ServerError("Klippy Host not connected", 503))
request.set_exception(ServerError("Klippy Host not connected", 503))
return
data = json.dumps(request.to_dict()).encode() + b"\x03"
try:
self.writer.write(data)
await self.writer.drain()
except asyncio.CancelledError:
self.pending_requests.pop(request.id, None)
request.notify(ServerError("Klippy Write Request Cancelled", 503))
request.set_exception(ServerError("Klippy Write Request Cancelled", 503))
raise
except Exception:
self.pending_requests.pop(request.id, None)
request.notify(ServerError("Klippy Write Request Error", 503))
request.set_exception(ServerError("Klippy Write Request Error", 503))
if not self.closing:
logging.debug("Klippy Disconnection From _write_request()")
await self.close()
@ -464,10 +461,10 @@ class KlippyConnection:
result = cmd['result']
if not result:
result = "ok"
request.set_result(result)
else:
err = cmd.get('error', "Malformed Klippy Response")
result = ServerError(err, 400)
request.notify(result)
request.set_exception(ServerError(err, 400))
async def _execute_method(self, method_name: str, **kwargs) -> None:
try:
@ -580,10 +577,9 @@ class KlippyConnection:
self.pending_requests[base_request.id] = base_request
self.event_loop.register_callback(self._write_request, base_request)
try:
return await asyncio.wait_for(base_request.wait(), timeout)
except asyncio.TimeoutError:
return await base_request.wait(timeout)
finally:
self.pending_requests.pop(base_request.id, None)
raise self.server.error("Klippy request timed out", 500)
def remove_subscription(self, conn: Subscribable) -> None:
self.subscriptions.pop(conn, None)
@ -639,7 +635,7 @@ class KlippyConnection:
self._state = "disconnected"
self._state_message = "Klippy Disconnected"
for request in self.pending_requests.values():
request.notify(ServerError("Klippy Disconnected", 503))
request.set_exception(ServerError("Klippy Disconnected", 503))
self.pending_requests = {}
self.subscriptions = {}
self._peer_cred = {}
@ -680,33 +676,35 @@ class KlippyRequest:
self.id = id(self)
self.rpc_method = rpc_method
self.params = params
self._event = asyncio.Event()
self.response: Any = None
self._fut = asyncio.get_running_loop().create_future()
async def wait(self) -> Any:
# Log pending requests every 60 seconds
async def wait(self, timeout: Optional[float] = None) -> Any:
start_time = time.time()
to = timeout or 60.
while True:
try:
await asyncio.wait_for(self._event.wait(), 60.)
return await asyncio.wait_for(asyncio.shield(self._fut), to)
except asyncio.TimeoutError:
if timeout is not None:
self._fut.cancel()
raise ServerError("Klippy request timed out", 500) from None
pending_time = time.time() - start_time
logging.info(
f"Request '{self.rpc_method}' pending: "
f"{pending_time:.2f} seconds")
self._event.clear()
continue
break
if isinstance(self.response, ServerError):
raise self.response
return self.response
f"{pending_time:.2f} seconds"
)
def notify(self, response: Any) -> None:
if self._event.is_set():
return
self.response = response
self._event.set()
def set_exception(self, exc: Exception) -> None:
if not self._fut.done():
self._fut.set_exception(exc)
def set_result(self, result: Any) -> None:
if not self._fut.done():
self._fut.set_result(result)
def to_dict(self) -> Dict[str, Any]:
return {'id': self.id, 'method': self.rpc_method,
'params': self.params}
return {
'id': self.id,
'method': self.rpc_method,
'params': self.params
}