From 233da97b19e81ebf5ca0bf62d8a0203b44ed70ea Mon Sep 17 00:00:00 2001 From: Kevin O'Connor Date: Wed, 26 Jun 2019 14:05:10 -0400 Subject: [PATCH] reactor: Add support for "completions" Signed-off-by: Kevin O'Connor --- klippy/reactor.py | 55 +++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 46 insertions(+), 9 deletions(-) diff --git a/klippy/reactor.py b/klippy/reactor.py index 29ffe4f6..164f21b7 100644 --- a/klippy/reactor.py +++ b/klippy/reactor.py @@ -7,19 +7,45 @@ import os, select, math, time, Queue import greenlet import chelper, util +_NOW = 0. +_NEVER = 9999999999999999. + class ReactorTimer: def __init__(self, callback, waketime): self.callback = callback self.waketime = waketime +class ReactorCompletion: + class sentinel: pass + def __init__(self, reactor): + self.reactor = reactor + self.result = self.sentinel + self.waiting = None + def test(self): + return self.result is not self.sentinel + def complete(self, result): + self.result = result + if self.waiting is not None: + self.reactor.update_timer(self.waiting.timer, self.reactor.NOW) + def wait(self, waketime=_NEVER, waketime_result=None): + if self.result is self.sentinel: + self.waiting = greenlet.getcurrent() + self.reactor.pause(waketime) + self.waiting = None + if self.result is self.sentinel: + return waketime_result + return self.result + class ReactorCallback: def __init__(self, reactor, callback, waketime): self.reactor = reactor self.timer = reactor.register_timer(self.invoke, waketime) self.callback = callback + self.completion = ReactorCompletion(reactor) def invoke(self, eventtime): self.reactor.unregister_timer(self.timer) - self.callback(eventtime) + res = self.callback(eventtime) + self.completion.complete(res) return self.reactor.NEVER class ReactorFileHandler: @@ -64,8 +90,8 @@ class ReactorMutex: self.reactor.update_timer(self.queue[0].timer, self.reactor.NOW) class SelectReactor: - NOW = 0. - NEVER = 9999999999999999. + NOW = _NOW + NEVER = _NEVER def __init__(self): # Main code self._process = False @@ -115,11 +141,22 @@ class SelectReactor: if eventtime >= self._next_timer: return 0. return min(1., max(.001, self._next_timer - self.monotonic())) - # Callbacks + # Callbacks and Completions + def completion(self): + return ReactorCompletion(self) def register_callback(self, callback, waketime=NOW): - ReactorCallback(self, callback, waketime) - def register_async_callback(self, callback): - self._async_queue.put_nowait(callback) + rcb = ReactorCallback(self, callback, waketime) + return rcb.completion + # Asynchronous (from another thread) callbacks and completions + def register_async_callback(self, callback, waketime=NOW): + self._async_queue.put_nowait( + (ReactorCallback, (self, callback, waketime))) + try: + os.write(self._pipe_fds[1], '.') + except os.error: + pass + def async_complete(self, completion, result): + self._async_queue.put_nowait((completion.complete, (result,))) try: os.write(self._pipe_fds[1], '.') except os.error: @@ -131,10 +168,10 @@ class SelectReactor: pass while 1: try: - callback = self._async_queue.get_nowait() + func, args = self._async_queue.get_nowait() except Queue.Empty: break - ReactorCallback(self, callback, self.NOW) + func(*args) def _setup_async_callbacks(self): self._pipe_fds = os.pipe() util.set_nonblock(self._pipe_fds[0])