eventloop: add a timer class
Signed-off-by: Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
parent
2216e85fef
commit
69a06dd12a
|
@ -15,12 +15,14 @@ from typing import (
|
|||
Callable,
|
||||
Coroutine,
|
||||
Optional,
|
||||
TypeVar
|
||||
TypeVar,
|
||||
Union
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
_T = TypeVar("_T")
|
||||
FlexCallback = Callable[..., Optional[Awaitable]]
|
||||
TimerCallback = Callable[[float], Union[float, Awaitable[float]]]
|
||||
|
||||
class EventLoop:
|
||||
TimeoutError = asyncio.TimeoutError
|
||||
|
@ -34,6 +36,8 @@ class EventLoop:
|
|||
self.remove_writer = self.aioloop.remove_writer
|
||||
self.get_loop_time = self.aioloop.time
|
||||
self.create_future = self.aioloop.create_future
|
||||
self.create_task = self.aioloop.create_task
|
||||
self.call_at = self.aioloop.call_at
|
||||
|
||||
def register_callback(self,
|
||||
callback: FlexCallback,
|
||||
|
@ -60,6 +64,9 @@ class EventLoop:
|
|||
return self.aioloop.call_later(
|
||||
delay, functools.partial(callback, *args, **kwargs))
|
||||
|
||||
def register_timer(self, callback: TimerCallback):
|
||||
return FlexTimer(self, callback)
|
||||
|
||||
def _async_callback(self, callback: Callable[[], Coroutine]) -> None:
|
||||
# This wrapper delays creation of the coroutine object. In the
|
||||
# event that a callback is cancelled this prevents "coroutine
|
||||
|
@ -81,3 +88,45 @@ class EventLoop:
|
|||
|
||||
def close(self):
|
||||
self.aioloop.close()
|
||||
|
||||
class FlexTimer:
|
||||
def __init__(self,
|
||||
eventloop: EventLoop,
|
||||
callback: TimerCallback
|
||||
) -> None:
|
||||
self.eventloop = eventloop
|
||||
self.callback = callback
|
||||
self.timer_handle: Optional[asyncio.TimerHandle] = None
|
||||
self.running: bool = False
|
||||
|
||||
def start(self, delay: float = 0.):
|
||||
if self.running:
|
||||
return
|
||||
self.running = True
|
||||
call_time = self.eventloop.get_loop_time() + delay
|
||||
self.timer_handle = self.eventloop.call_at(
|
||||
call_time, self._schedule_task)
|
||||
|
||||
def stop(self):
|
||||
if not self.running:
|
||||
return
|
||||
self.running = False
|
||||
if self.timer_handle is not None:
|
||||
self.timer_handle.cancel()
|
||||
self.timer_handle = None
|
||||
|
||||
def _schedule_task(self):
|
||||
self.timer_handle = None
|
||||
self.eventloop.create_task(self._call_wrapper())
|
||||
|
||||
def is_running(self) -> bool:
|
||||
return self.running
|
||||
|
||||
async def _call_wrapper(self):
|
||||
if not self.running:
|
||||
return
|
||||
ret = self.callback(self.eventloop.get_loop_time())
|
||||
if isinstance(ret, Awaitable):
|
||||
ret = await ret
|
||||
if self.running:
|
||||
self.timer_handle = self.eventloop.call_at(ret, self._schedule_task)
|
||||
|
|
Loading…
Reference in New Issue