From 2559a2dd5ad4c5e8341aeddb6e5a59967867cbd7 Mon Sep 17 00:00:00 2001 From: Kevin O'Connor Date: Thu, 11 Feb 2021 20:43:36 -0500 Subject: [PATCH] pollreactor: Move C pollreactor code from serialqueue.c to its own file Signed-off-by: Kevin O'Connor --- klippy/chelper/__init__.py | 3 +- klippy/chelper/pollreactor.c | 179 ++++++++++++++++++++++++++++ klippy/chelper/pollreactor.h | 20 ++++ klippy/chelper/serialqueue.c | 225 ++++------------------------------- 4 files changed, 226 insertions(+), 201 deletions(-) create mode 100644 klippy/chelper/pollreactor.c create mode 100644 klippy/chelper/pollreactor.h diff --git a/klippy/chelper/__init__.py b/klippy/chelper/__init__.py index 4d199126..07ee190d 100644 --- a/klippy/chelper/__init__.py +++ b/klippy/chelper/__init__.py @@ -18,6 +18,7 @@ COMPILE_ARGS = ("-Wall -g -O2 -shared -fPIC" SSE_FLAGS = "-mfpmath=sse -msse2" SOURCE_FILES = [ 'pyhelper.c', 'serialqueue.c', 'stepcompress.c', 'itersolve.c', 'trapq.c', + 'pollreactor.c', 'kin_cartesian.c', 'kin_corexy.c', 'kin_corexz.c', 'kin_delta.c', 'kin_polar.c', 'kin_rotary_delta.c', 'kin_winch.c', 'kin_extruder.c', 'kin_shaper.c', @@ -25,7 +26,7 @@ SOURCE_FILES = [ DEST_LIB = "c_helper.so" OTHER_FILES = [ 'list.h', 'serialqueue.h', 'stepcompress.h', 'itersolve.h', 'pyhelper.h', - 'trapq.h', + 'trapq.h', 'pollreactor.h', ] defs_stepcompress = """ diff --git a/klippy/chelper/pollreactor.c b/klippy/chelper/pollreactor.c new file mode 100644 index 00000000..eb12e5a8 --- /dev/null +++ b/klippy/chelper/pollreactor.c @@ -0,0 +1,179 @@ +// Code for dispatching timer and file descriptor events +// +// Copyright (C) 2016-2021 Kevin O'Connor +// +// This file may be distributed under the terms of the GNU GPLv3 license. + +#include // fcntl +#include // ceil +#include // poll +#include // malloc +#include // memset +#include "pollreactor.h" // pollreactor_alloc +#include "pyhelper.h" // report_errno + +struct pollreactor_timer { + double waketime; + double (*callback)(void *data, double eventtime); +}; + +struct pollreactor { + int num_fds, num_timers, must_exit; + void *callback_data; + double next_timer; + struct pollfd *fds; + void (**fd_callbacks)(void *data, double eventtime); + struct pollreactor_timer *timers; +}; + +// Allocate a new 'struct pollreactor' object +struct pollreactor * +pollreactor_alloc(int num_fds, int num_timers, void *callback_data) +{ + struct pollreactor *pr = malloc(sizeof(*pr)); + memset(pr, 0, sizeof(*pr)); + pr->num_fds = num_fds; + pr->num_timers = num_timers; + pr->must_exit = 0; + pr->callback_data = callback_data; + pr->next_timer = PR_NEVER; + pr->fds = malloc(num_fds * sizeof(*pr->fds)); + memset(pr->fds, 0, num_fds * sizeof(*pr->fds)); + pr->fd_callbacks = malloc(num_fds * sizeof(*pr->fd_callbacks)); + memset(pr->fd_callbacks, 0, num_fds * sizeof(*pr->fd_callbacks)); + pr->timers = malloc(num_timers * sizeof(*pr->timers)); + memset(pr->timers, 0, num_timers * sizeof(*pr->timers)); + int i; + for (i=0; itimers[i].waketime = PR_NEVER; + return pr; +} + +// Free resources associated with a 'struct pollreactor' object +void +pollreactor_free(struct pollreactor *pr) +{ + free(pr->fds); + pr->fds = NULL; + free(pr->fd_callbacks); + pr->fd_callbacks = NULL; + free(pr->timers); + pr->timers = NULL; + free(pr); +} + +// Add a callback for when a file descriptor (fd) becomes readable +void +pollreactor_add_fd(struct pollreactor *pr, int pos, int fd, void *callback + , int write_only) +{ + pr->fds[pos].fd = fd; + pr->fds[pos].events = POLLHUP | (write_only ? 0 : POLLIN); + pr->fds[pos].revents = 0; + pr->fd_callbacks[pos] = callback; +} + +// Add a timer callback +void +pollreactor_add_timer(struct pollreactor *pr, int pos, void *callback) +{ + pr->timers[pos].callback = callback; + pr->timers[pos].waketime = PR_NEVER; +} + +// Return the last schedule wake-up time for a timer +double +pollreactor_get_timer(struct pollreactor *pr, int pos) +{ + return pr->timers[pos].waketime; +} + +// Set the wake-up time for a given timer +void +pollreactor_update_timer(struct pollreactor *pr, int pos, double waketime) +{ + pr->timers[pos].waketime = waketime; + if (waketime < pr->next_timer) + pr->next_timer = waketime; +} + +// Internal code to invoke timer callbacks +static int +pollreactor_check_timers(struct pollreactor *pr, double eventtime, int busy) +{ + if (eventtime >= pr->next_timer) { + // Find and run pending timers + pr->next_timer = PR_NEVER; + int i; + for (i=0; inum_timers; i++) { + struct pollreactor_timer *timer = &pr->timers[i]; + double t = timer->waketime; + if (eventtime >= t) { + busy = 1; + t = timer->callback(pr->callback_data, eventtime); + timer->waketime = t; + } + if (t < pr->next_timer) + pr->next_timer = t; + } + } + if (busy) + return 0; + // Calculate sleep duration + double timeout = ceil((pr->next_timer - eventtime) * 1000.); + return timeout < 1. ? 1 : (timeout > 1000. ? 1000 : (int)timeout); +} + +// Repeatedly check for timer and fd events and invoke their callbacks +void +pollreactor_run(struct pollreactor *pr) +{ + double eventtime = get_monotonic(); + int busy = 1; + while (! pr->must_exit) { + int timeout = pollreactor_check_timers(pr, eventtime, busy); + busy = 0; + int ret = poll(pr->fds, pr->num_fds, timeout); + eventtime = get_monotonic(); + if (ret > 0) { + busy = 1; + int i; + for (i=0; inum_fds; i++) + if (pr->fds[i].revents) + pr->fd_callbacks[i](pr->callback_data, eventtime); + } else if (ret < 0) { + report_errno("poll", ret); + pr->must_exit = 1; + } + } +} + +// Request that a currently running pollreactor_run() loop exit +void +pollreactor_do_exit(struct pollreactor *pr) +{ + pr->must_exit = 1; +} + +// Check if a pollreactor_run() loop has been requested to exit +int +pollreactor_is_exit(struct pollreactor *pr) +{ + return pr->must_exit; +} + +int +fd_set_non_blocking(int fd) +{ + int flags = fcntl(fd, F_GETFL); + if (flags < 0) { + report_errno("fcntl getfl", flags); + return -1; + } + int ret = fcntl(fd, F_SETFL, flags | O_NONBLOCK); + if (ret < 0) { + report_errno("fcntl setfl", flags); + return -1; + } + return 0; +} diff --git a/klippy/chelper/pollreactor.h b/klippy/chelper/pollreactor.h new file mode 100644 index 00000000..97d7d3a9 --- /dev/null +++ b/klippy/chelper/pollreactor.h @@ -0,0 +1,20 @@ +#ifndef POLLREACTOR_H +#define POLLREACTOR_H + +#define PR_NOW 0. +#define PR_NEVER 9999999999999999. + +struct pollreactor *pollreactor_alloc(int num_fds, int num_timers + , void *callback_data); +void pollreactor_free(struct pollreactor *pr); +void pollreactor_add_fd(struct pollreactor *pr, int pos, int fd, void *callback + , int write_only); +void pollreactor_add_timer(struct pollreactor *pr, int pos, void *callback); +double pollreactor_get_timer(struct pollreactor *pr, int pos); +void pollreactor_update_timer(struct pollreactor *pr, int pos, double waketime); +void pollreactor_run(struct pollreactor *pr); +void pollreactor_do_exit(struct pollreactor *pr); +int pollreactor_is_exit(struct pollreactor *pr); +int fd_set_non_blocking(int fd); + +#endif // pollreactor.h diff --git a/klippy/chelper/serialqueue.c b/klippy/chelper/serialqueue.c index 77d547bb..38f65199 100644 --- a/klippy/chelper/serialqueue.c +++ b/klippy/chelper/serialqueue.c @@ -12,10 +12,8 @@ // clock times, prioritizes commands, and handles retransmissions. A // background thread is launched to do this work and minimize latency. -#include // fcntl #include // // struct can_frame -#include // ceil -#include // poll +#include // fabs #include // pthread_mutex_lock #include // offsetof #include // uint64_t @@ -26,184 +24,11 @@ #include // pipe #include "compiler.h" // __visible #include "list.h" // list_add_tail +#include "pollreactor.h" // pollreactor_alloc #include "pyhelper.h" // get_monotonic #include "serialqueue.h" // struct queue_message -/**************************************************************** - * Poll reactor - ****************************************************************/ - -// The 'poll reactor' code is a mechanism for dispatching timer and -// file descriptor events. - -#define PR_NOW 0. -#define PR_NEVER 9999999999999999. - -struct pollreactor_timer { - double waketime; - double (*callback)(void *data, double eventtime); -}; - -struct pollreactor { - int num_fds, num_timers, must_exit; - void *callback_data; - double next_timer; - struct pollfd *fds; - void (**fd_callbacks)(void *data, double eventtime); - struct pollreactor_timer *timers; -}; - -// Allocate a new 'struct pollreactor' object -static void -pollreactor_setup(struct pollreactor *pr, int num_fds, int num_timers - , void *callback_data) -{ - pr->num_fds = num_fds; - pr->num_timers = num_timers; - pr->must_exit = 0; - pr->callback_data = callback_data; - pr->next_timer = PR_NEVER; - pr->fds = malloc(num_fds * sizeof(*pr->fds)); - memset(pr->fds, 0, num_fds * sizeof(*pr->fds)); - pr->fd_callbacks = malloc(num_fds * sizeof(*pr->fd_callbacks)); - memset(pr->fd_callbacks, 0, num_fds * sizeof(*pr->fd_callbacks)); - pr->timers = malloc(num_timers * sizeof(*pr->timers)); - memset(pr->timers, 0, num_timers * sizeof(*pr->timers)); - int i; - for (i=0; itimers[i].waketime = PR_NEVER; -} - -// Free resources associated with a 'struct pollreactor' object -static void -pollreactor_free(struct pollreactor *pr) -{ - free(pr->fds); - pr->fds = NULL; - free(pr->fd_callbacks); - pr->fd_callbacks = NULL; - free(pr->timers); - pr->timers = NULL; -} - -// Add a callback for when a file descriptor (fd) becomes readable -static void -pollreactor_add_fd(struct pollreactor *pr, int pos, int fd, void *callback - , int write_only) -{ - pr->fds[pos].fd = fd; - pr->fds[pos].events = POLLHUP | (write_only ? 0 : POLLIN); - pr->fds[pos].revents = 0; - pr->fd_callbacks[pos] = callback; -} - -// Add a timer callback -static void -pollreactor_add_timer(struct pollreactor *pr, int pos, void *callback) -{ - pr->timers[pos].callback = callback; - pr->timers[pos].waketime = PR_NEVER; -} - -// Return the last schedule wake-up time for a timer -static double -pollreactor_get_timer(struct pollreactor *pr, int pos) -{ - return pr->timers[pos].waketime; -} - -// Set the wake-up time for a given timer -static void -pollreactor_update_timer(struct pollreactor *pr, int pos, double waketime) -{ - pr->timers[pos].waketime = waketime; - if (waketime < pr->next_timer) - pr->next_timer = waketime; -} - -// Internal code to invoke timer callbacks -static int -pollreactor_check_timers(struct pollreactor *pr, double eventtime, int busy) -{ - if (eventtime >= pr->next_timer) { - // Find and run pending timers - pr->next_timer = PR_NEVER; - int i; - for (i=0; inum_timers; i++) { - struct pollreactor_timer *timer = &pr->timers[i]; - double t = timer->waketime; - if (eventtime >= t) { - busy = 1; - t = timer->callback(pr->callback_data, eventtime); - timer->waketime = t; - } - if (t < pr->next_timer) - pr->next_timer = t; - } - } - if (busy) - return 0; - // Calculate sleep duration - double timeout = ceil((pr->next_timer - eventtime) * 1000.); - return timeout < 1. ? 1 : (timeout > 1000. ? 1000 : (int)timeout); -} - -// Repeatedly check for timer and fd events and invoke their callbacks -static void -pollreactor_run(struct pollreactor *pr) -{ - double eventtime = get_monotonic(); - int busy = 1; - while (! pr->must_exit) { - int timeout = pollreactor_check_timers(pr, eventtime, busy); - busy = 0; - int ret = poll(pr->fds, pr->num_fds, timeout); - eventtime = get_monotonic(); - if (ret > 0) { - busy = 1; - int i; - for (i=0; inum_fds; i++) - if (pr->fds[i].revents) - pr->fd_callbacks[i](pr->callback_data, eventtime); - } else if (ret < 0) { - report_errno("poll", ret); - pr->must_exit = 1; - } - } -} - -// Request that a currently running pollreactor_run() loop exit -static void -pollreactor_do_exit(struct pollreactor *pr) -{ - pr->must_exit = 1; -} - -// Check if a pollreactor_run() loop has been requested to exit -static int -pollreactor_is_exit(struct pollreactor *pr) -{ - return pr->must_exit; -} - -static int -set_non_blocking(int fd) -{ - int flags = fcntl(fd, F_GETFL); - if (flags < 0) { - report_errno("fcntl getfl", flags); - return -1; - } - int ret = fcntl(fd, F_SETFL, flags | O_NONBLOCK); - if (ret < 0) { - report_errno("fcntl setfl", flags); - return -1; - } - return 0; -} - - /**************************************************************** * Serial protocol helpers ****************************************************************/ @@ -354,7 +179,7 @@ message_queue_free(struct list_head *root) struct serialqueue { // Input reading - struct pollreactor pr; + struct pollreactor *pr; int serial_fd, serial_fd_type, client_id; int pipe_fds[2]; uint8_t input_buf[4096]; @@ -479,7 +304,7 @@ update_receive_seq(struct serialqueue *sq, double eventtime, uint64_t rseq) } } sq->receive_seq = rseq; - pollreactor_update_timer(&sq->pr, SQPT_COMMAND, PR_NOW); + pollreactor_update_timer(sq->pr, SQPT_COMMAND, PR_NOW); // Update retransmit info if (sq->rtt_sample_seq && rseq > sq->rtt_sample_seq @@ -504,12 +329,12 @@ update_receive_seq(struct serialqueue *sq, double eventtime, uint64_t rseq) sq->rtt_sample_seq = 0; } if (list_empty(&sq->sent_queue)) { - pollreactor_update_timer(&sq->pr, SQPT_RETRANSMIT, PR_NEVER); + pollreactor_update_timer(sq->pr, SQPT_RETRANSMIT, PR_NEVER); } else { struct queue_message *sent = list_first_entry( &sq->sent_queue, struct queue_message, node); double nr = eventtime + sq->rto + sent->len * sq->baud_adjust; - pollreactor_update_timer(&sq->pr, SQPT_RETRANSMIT, nr); + pollreactor_update_timer(sq->pr, SQPT_RETRANSMIT, nr); } } @@ -554,7 +379,7 @@ handle_message(struct serialqueue *sq, double eventtime, int len) sq->last_ack_seq = rseq; else if (rseq > sq->ignore_nak_seq && !list_empty(&sq->sent_queue)) // Duplicate Ack is a Nak - do fast retransmit - pollreactor_update_timer(&sq->pr, SQPT_RETRANSMIT, PR_NOW); + pollreactor_update_timer(sq->pr, SQPT_RETRANSMIT, PR_NOW); } else { // Data message - add to receive queue struct queue_message *qm = message_fill(sq->input_buf, len); @@ -580,7 +405,7 @@ input_event(struct serialqueue *sq, double eventtime) int ret = read(sq->serial_fd, &cf, sizeof(cf)); if (ret <= 0) { report_errno("can read", ret); - pollreactor_do_exit(&sq->pr); + pollreactor_do_exit(sq->pr); return; } if (cf.can_id != sq->client_id + 1) @@ -595,7 +420,7 @@ input_event(struct serialqueue *sq, double eventtime) report_errno("read", ret); else errorf("Got EOF when reading from device"); - pollreactor_do_exit(&sq->pr); + pollreactor_do_exit(sq->pr); return; } sq->input_pos += ret; @@ -635,7 +460,7 @@ kick_event(struct serialqueue *sq, double eventtime) int ret = read(sq->pipe_fds[0], dummy, sizeof(dummy)); if (ret < 0) report_errno("pipe read", ret); - pollreactor_update_timer(&sq->pr, SQPT_COMMAND, PR_NOW); + pollreactor_update_timer(sq->pr, SQPT_COMMAND, PR_NOW); } static void @@ -691,7 +516,7 @@ retransmit_event(struct serialqueue *sq, double eventtime) sq->bytes_retransmit += buflen; // Update rto - if (pollreactor_get_timer(&sq->pr, SQPT_RETRANSMIT) == PR_NOW) { + if (pollreactor_get_timer(sq->pr, SQPT_RETRANSMIT) == PR_NOW) { // Retransmit due to nak sq->ignore_nak_seq = sq->receive_seq; if (sq->receive_seq < sq->retransmit_seq) @@ -771,7 +596,7 @@ build_and_send_command(struct serialqueue *sq, uint8_t *buf, double eventtime) out->sent_time = eventtime; out->receive_time = sq->idle_time; if (list_empty(&sq->sent_queue)) - pollreactor_update_timer(&sq->pr, SQPT_RETRANSMIT + pollreactor_update_timer(sq->pr, SQPT_RETRANSMIT , sq->idle_time + sq->rto); if (!sq->rtt_sample_seq) sq->rtt_sample_seq = sq->send_seq; @@ -886,7 +711,7 @@ static void * background_thread(void *data) { struct serialqueue *sq = data; - pollreactor_run(&sq->pr); + pollreactor_run(sq->pr); pthread_mutex_lock(&sq->lock); check_wake_receive(sq); @@ -910,15 +735,15 @@ serialqueue_alloc(int serial_fd, char serial_fd_type, int client_id) goto fail; // Reactor setup - pollreactor_setup(&sq->pr, SQPF_NUM, SQPT_NUM, sq); - pollreactor_add_fd(&sq->pr, SQPF_SERIAL, serial_fd, input_event + sq->pr = pollreactor_alloc(SQPF_NUM, SQPT_NUM, sq); + pollreactor_add_fd(sq->pr, SQPF_SERIAL, serial_fd, input_event , serial_fd_type==SQT_DEBUGFILE); - pollreactor_add_fd(&sq->pr, SQPF_PIPE, sq->pipe_fds[0], kick_event, 0); - pollreactor_add_timer(&sq->pr, SQPT_RETRANSMIT, retransmit_event); - pollreactor_add_timer(&sq->pr, SQPT_COMMAND, command_event); - set_non_blocking(serial_fd); - set_non_blocking(sq->pipe_fds[0]); - set_non_blocking(sq->pipe_fds[1]); + pollreactor_add_fd(sq->pr, SQPF_PIPE, sq->pipe_fds[0], kick_event, 0); + pollreactor_add_timer(sq->pr, SQPT_RETRANSMIT, retransmit_event); + pollreactor_add_timer(sq->pr, SQPT_COMMAND, command_event); + fd_set_non_blocking(serial_fd); + fd_set_non_blocking(sq->pipe_fds[0]); + fd_set_non_blocking(sq->pipe_fds[1]); // Retransmit setup sq->send_seq = 1; @@ -966,7 +791,7 @@ fail: void __visible serialqueue_exit(struct serialqueue *sq) { - pollreactor_do_exit(&sq->pr); + pollreactor_do_exit(sq->pr); kick_bg_thread(sq); int ret = pthread_join(sq->tid, NULL); if (ret) @@ -979,7 +804,7 @@ serialqueue_free(struct serialqueue *sq) { if (!sq) return; - if (!pollreactor_is_exit(&sq->pr)) + if (!pollreactor_is_exit(sq->pr)) serialqueue_exit(sq); pthread_mutex_lock(&sq->lock); message_queue_free(&sq->sent_queue); @@ -995,7 +820,7 @@ serialqueue_free(struct serialqueue *sq) message_queue_free(&cq->stalled_queue); } pthread_mutex_unlock(&sq->lock); - pollreactor_free(&sq->pr); + pollreactor_free(sq->pr); free(sq); } @@ -1085,7 +910,7 @@ serialqueue_pull(struct serialqueue *sq, struct pull_queue_message *pqm) pthread_mutex_lock(&sq->lock); // Wait for message to be available while (list_empty(&sq->receive_queue)) { - if (pollreactor_is_exit(&sq->pr)) + if (pollreactor_is_exit(sq->pr)) goto exit; sq->receive_waiting = 1; int ret = pthread_cond_wait(&sq->cond, &sq->lock);