reactor: Fix some corner cases with self._next_timer handling

Make sure to update self._next_timer on greenlet start/stop.

Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
This commit is contained in:
Kevin O'Connor 2019-06-19 09:32:40 -04:00
parent 8cce3cc8e7
commit 077c6f7e5e
1 changed files with 21 additions and 16 deletions

View File

@ -82,21 +82,18 @@ class SelectReactor:
self._g_dispatch = None
self._greenlets = []
# Timers
def _note_time(self, timer_handler):
nexttime = timer_handler.waketime
if nexttime < self._next_timer:
self._next_timer = nexttime
def update_timer(self, timer_handler, nexttime):
timer_handler.waketime = nexttime
self._note_time(timer_handler)
def register_timer(self, callback, waketime = NEVER):
def update_timer(self, timer_handler, waketime):
timer_handler.waketime = waketime
self._next_timer = min(self._next_timer, waketime)
def register_timer(self, callback, waketime=NEVER):
timer_handler = ReactorTimer(callback, waketime)
timers = list(self._timers)
timers.append(timer_handler)
self._timers = timers
self._note_time(timer_handler)
self._next_timer = min(self._next_timer, waketime)
return timer_handler
def unregister_timer(self, timer_handler):
timer_handler.waketime = self.NEVER
timers = list(self._timers)
timers.pop(timers.index(timer_handler))
self._timers = timers
@ -106,18 +103,20 @@ class SelectReactor:
self._next_timer = self.NEVER
g_dispatch = self._g_dispatch
for t in self._timers:
if eventtime >= t.waketime:
waketime = t.waketime
if eventtime >= waketime:
t.waketime = self.NEVER
t.waketime = t.callback(eventtime)
t.waketime = waketime = t.callback(eventtime)
if g_dispatch is not self._g_dispatch:
self._next_timer = min(self._next_timer, waketime)
self._end_greenlet(g_dispatch)
return 0.
self._note_time(t)
self._next_timer = min(self._next_timer, waketime)
if eventtime >= self._next_timer:
return 0.
return min(1., max(.001, self._next_timer - self.monotonic()))
# Callbacks
def register_callback(self, callback, waketime = NOW):
def register_callback(self, callback, waketime=NOW):
ReactorCallback(self, callback, waketime)
def register_async_callback(self, callback):
self._async_queue.put_nowait(callback)
@ -158,22 +157,28 @@ class SelectReactor:
if g is not self._g_dispatch:
if self._g_dispatch is None:
return self._sys_pause(waketime)
# Switch to _check_timers (via g.timer.callback return)
return self._g_dispatch.switch(waketime)
# Pausing the dispatch greenlet - prepare a new greenlet to do dispatch
if self._greenlets:
g_next = self._greenlets.pop()
else:
g_next = ReactorGreenlet(run=self._dispatch_loop)
g_next.parent = g.parent
g.timer = self.register_timer(g.switch, waketime)
return g_next.switch()
self._next_timer = self.NOW
# Switch to _dispatch_loop (via _end_greenlet or direct)
eventtime = g_next.switch()
# This greenlet activated from g.timer.callback (via _check_timers)
return eventtime
def _end_greenlet(self, g_old):
# Cache this greenlet for later use
self._greenlets.append(g_old)
self.unregister_timer(g_old.timer)
g_old.timer = None
# Switch to existing dispatch
# Switch to _check_timers (via g_old.timer.callback return)
self._g_dispatch.switch(self.NEVER)
# This greenlet was reactivated - prepare for main processing loop
# This greenlet reactivated from pause() - return to main dispatch loop
self._g_dispatch = g_old
# Mutexes
def mutex(self, is_locked=False):