diff --git a/klippy/reactor.py b/klippy/reactor.py index 9bce6a9b..76e7d65e 100644 --- a/klippy/reactor.py +++ b/klippy/reactor.py @@ -1,17 +1,27 @@ # File descriptor and timer event helper # -# Copyright (C) 2016,2017 Kevin O'Connor +# Copyright (C) 2016-2018 Kevin O'Connor # # This file may be distributed under the terms of the GNU GPLv3 license. -import select, math, time +import os, select, math, time, Queue import greenlet -import chelper +import chelper, util class ReactorTimer: def __init__(self, callback, waketime): self.callback = callback self.waketime = waketime +class ReactorCallback: + def __init__(self, reactor, callback): + self.reactor = reactor + self.timer = reactor.register_timer(self.invoke, reactor.NOW) + self.callback = callback + def invoke(self, eventtime): + self.reactor.unregister_timer(self.timer) + self.callback(eventtime) + return self.reactor.NEVER + class ReactorFileHandler: def __init__(self, fd, callback): self.fd = fd @@ -28,13 +38,20 @@ class SelectReactor: NOW = 0. NEVER = 9999999999999999. def __init__(self): - self._fds = [] + # Main code + self._process = False + self.monotonic = chelper.get_ffi()[1].get_monotonic + # Timers self._timers = [] self._next_timer = self.NEVER - self._process = False + # Callbacks + self._pipe_fds = None + self._async_queue = Queue.Queue() + # File descriptors + self._fds = [] + # Greenlets self._g_dispatch = None self._greenlets = [] - self.monotonic = chelper.get_ffi()[1].get_monotonic # Timers def _note_time(self, t): nexttime = t.waketime @@ -70,6 +87,36 @@ class SelectReactor: if eventtime >= self._next_timer: return 0. return min(1., max(.001, self._next_timer - self.monotonic())) + # Callbacks + def register_callback(self, callback): + ReactorCallback(self, callback) + def register_async_callback(self, callback): + self._async_queue.put_nowait(callback) + try: + os.write(self._pipe_fds[1], '.') + except os.error: + pass + def _got_pipe_signal(self, eventtime): + try: + os.read(self._pipe_fds[0], 4096) + except os.error: + pass + while 1: + try: + callback = self._async_queue.get_nowait() + except Queue.Empty: + break + ReactorCallback(self, callback) + def _setup_async_callbacks(self): + self._pipe_fds = os.pipe() + util.set_nonblock(self._pipe_fds[0]) + util.set_nonblock(self._pipe_fds[1]) + self.register_fd(self._pipe_fds[0], self._got_pipe_signal) + def __del__(self): + if self._pipe_fds is not None: + os.close(self._pipe_fds[0]) + os.close(self._pipe_fds[1]) + self._pipe_fds = None # Greenlets def _sys_pause(self, waketime): # Pause using system sleep for when reactor not running @@ -91,10 +138,13 @@ class SelectReactor: g.timer = self.register_timer(g.switch, waketime) return g_next.switch() def _end_greenlet(self, g_old): + # Cache this greenlet for later use self._greenlets.append(g_old) self.unregister_timer(g_old.timer) g_old.timer = None + # Switch to existing dispatch self._g_dispatch.switch(self.NEVER) + # This greenlet was reactivated - prepare for main processing loop self._g_dispatch = g_old # File descriptors def register_fd(self, fd, callback): @@ -119,6 +169,8 @@ class SelectReactor: break self._g_dispatch = None def run(self): + if self._pipe_fds is None: + self._setup_async_callbacks() self._process = True g_next = ReactorGreenlet(run=self._dispatch_loop) g_next.switch()