reactor: Add support for "completions"

Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
This commit is contained in:
Kevin O'Connor 2019-06-26 14:05:10 -04:00 committed by KevinOConnor
parent e148dbe52a
commit 233da97b19
1 changed files with 46 additions and 9 deletions

View File

@ -7,19 +7,45 @@ import os, select, math, time, Queue
import greenlet import greenlet
import chelper, util import chelper, util
_NOW = 0.
_NEVER = 9999999999999999.
class ReactorTimer: class ReactorTimer:
def __init__(self, callback, waketime): def __init__(self, callback, waketime):
self.callback = callback self.callback = callback
self.waketime = waketime self.waketime = waketime
class ReactorCompletion:
class sentinel: pass
def __init__(self, reactor):
self.reactor = reactor
self.result = self.sentinel
self.waiting = None
def test(self):
return self.result is not self.sentinel
def complete(self, result):
self.result = result
if self.waiting is not None:
self.reactor.update_timer(self.waiting.timer, self.reactor.NOW)
def wait(self, waketime=_NEVER, waketime_result=None):
if self.result is self.sentinel:
self.waiting = greenlet.getcurrent()
self.reactor.pause(waketime)
self.waiting = None
if self.result is self.sentinel:
return waketime_result
return self.result
class ReactorCallback: class ReactorCallback:
def __init__(self, reactor, callback, waketime): def __init__(self, reactor, callback, waketime):
self.reactor = reactor self.reactor = reactor
self.timer = reactor.register_timer(self.invoke, waketime) self.timer = reactor.register_timer(self.invoke, waketime)
self.callback = callback self.callback = callback
self.completion = ReactorCompletion(reactor)
def invoke(self, eventtime): def invoke(self, eventtime):
self.reactor.unregister_timer(self.timer) self.reactor.unregister_timer(self.timer)
self.callback(eventtime) res = self.callback(eventtime)
self.completion.complete(res)
return self.reactor.NEVER return self.reactor.NEVER
class ReactorFileHandler: class ReactorFileHandler:
@ -64,8 +90,8 @@ class ReactorMutex:
self.reactor.update_timer(self.queue[0].timer, self.reactor.NOW) self.reactor.update_timer(self.queue[0].timer, self.reactor.NOW)
class SelectReactor: class SelectReactor:
NOW = 0. NOW = _NOW
NEVER = 9999999999999999. NEVER = _NEVER
def __init__(self): def __init__(self):
# Main code # Main code
self._process = False self._process = False
@ -115,11 +141,22 @@ class SelectReactor:
if eventtime >= self._next_timer: if eventtime >= self._next_timer:
return 0. return 0.
return min(1., max(.001, self._next_timer - self.monotonic())) return min(1., max(.001, self._next_timer - self.monotonic()))
# Callbacks # Callbacks and Completions
def completion(self):
return ReactorCompletion(self)
def register_callback(self, callback, waketime=NOW): def register_callback(self, callback, waketime=NOW):
ReactorCallback(self, callback, waketime) rcb = ReactorCallback(self, callback, waketime)
def register_async_callback(self, callback): return rcb.completion
self._async_queue.put_nowait(callback) # Asynchronous (from another thread) callbacks and completions
def register_async_callback(self, callback, waketime=NOW):
self._async_queue.put_nowait(
(ReactorCallback, (self, callback, waketime)))
try:
os.write(self._pipe_fds[1], '.')
except os.error:
pass
def async_complete(self, completion, result):
self._async_queue.put_nowait((completion.complete, (result,)))
try: try:
os.write(self._pipe_fds[1], '.') os.write(self._pipe_fds[1], '.')
except os.error: except os.error:
@ -131,10 +168,10 @@ class SelectReactor:
pass pass
while 1: while 1:
try: try:
callback = self._async_queue.get_nowait() func, args = self._async_queue.get_nowait()
except Queue.Empty: except Queue.Empty:
break break
ReactorCallback(self, callback, self.NOW) func(*args)
def _setup_async_callbacks(self): def _setup_async_callbacks(self):
self._pipe_fds = os.pipe() self._pipe_fds = os.pipe()
util.set_nonblock(self._pipe_fds[0]) util.set_nonblock(self._pipe_fds[0])