eventloop: improve callback handling

Wrap all callbacks in a coroutine that handles exceptions.  This
should eliminate "task not retreived" errors.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2022-12-24 07:44:06 -05:00
parent 3d3911d4fd
commit d6b1a724a8
No known key found for this signature in database
GPG Key ID: 5A1EB336DFB4C71B
1 changed files with 14 additions and 19 deletions

View File

@ -15,7 +15,6 @@ from typing import (
TYPE_CHECKING, TYPE_CHECKING,
Awaitable, Awaitable,
Callable, Callable,
Coroutine,
Optional, Optional,
Tuple, Tuple,
TypeVar, TypeVar,
@ -67,11 +66,16 @@ class EventLoop:
*args, *args,
**kwargs **kwargs
) -> None: ) -> None:
if inspect.iscoroutinefunction(callback): async def _wrapper():
self.aioloop.create_task(callback(*args, **kwargs)) # type: ignore try:
else: ret = callback(*args, **kwargs)
self.aioloop.call_soon( if inspect.isawaitable(ret):
functools.partial(callback, *args, **kwargs)) await ret
except asyncio.CancelledError:
raise
except Exception:
logging.exception("Error Running Callback")
self.aioloop.create_task(_wrapper())
def delay_callback(self, def delay_callback(self,
delay: float, delay: float,
@ -79,23 +83,14 @@ class EventLoop:
*args, *args,
**kwargs **kwargs
) -> asyncio.TimerHandle: ) -> asyncio.TimerHandle:
if inspect.iscoroutinefunction(callback): return self.aioloop.call_later(
return self.aioloop.call_later( delay, self.register_callback,
delay, self._async_callback, functools.partial(callback, *args, **kwargs)
functools.partial(callback, *args, **kwargs)) )
else:
return self.aioloop.call_later(
delay, functools.partial(callback, *args, **kwargs))
def register_timer(self, callback: TimerCallback): def register_timer(self, callback: TimerCallback):
return FlexTimer(self, callback) return FlexTimer(self, callback)
def _async_callback(self, callback: Callable[[], Coroutine]) -> None:
# This wrapper delays creation of the coroutine object. In the
# event that a callback is cancelled this prevents "coroutine
# was never awaited" warnings in asyncio
self.aioloop.create_task(callback())
def run_in_thread(self, def run_in_thread(self,
callback: Callable[..., _T], callback: Callable[..., _T],
*args *args