pollreactor: Move C pollreactor code from serialqueue.c to its own file
Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
This commit is contained in:
parent
05c2d51a12
commit
2559a2dd5a
|
@ -18,6 +18,7 @@ COMPILE_ARGS = ("-Wall -g -O2 -shared -fPIC"
|
||||||
SSE_FLAGS = "-mfpmath=sse -msse2"
|
SSE_FLAGS = "-mfpmath=sse -msse2"
|
||||||
SOURCE_FILES = [
|
SOURCE_FILES = [
|
||||||
'pyhelper.c', 'serialqueue.c', 'stepcompress.c', 'itersolve.c', 'trapq.c',
|
'pyhelper.c', 'serialqueue.c', 'stepcompress.c', 'itersolve.c', 'trapq.c',
|
||||||
|
'pollreactor.c',
|
||||||
'kin_cartesian.c', 'kin_corexy.c', 'kin_corexz.c', 'kin_delta.c',
|
'kin_cartesian.c', 'kin_corexy.c', 'kin_corexz.c', 'kin_delta.c',
|
||||||
'kin_polar.c', 'kin_rotary_delta.c', 'kin_winch.c', 'kin_extruder.c',
|
'kin_polar.c', 'kin_rotary_delta.c', 'kin_winch.c', 'kin_extruder.c',
|
||||||
'kin_shaper.c',
|
'kin_shaper.c',
|
||||||
|
@ -25,7 +26,7 @@ SOURCE_FILES = [
|
||||||
DEST_LIB = "c_helper.so"
|
DEST_LIB = "c_helper.so"
|
||||||
OTHER_FILES = [
|
OTHER_FILES = [
|
||||||
'list.h', 'serialqueue.h', 'stepcompress.h', 'itersolve.h', 'pyhelper.h',
|
'list.h', 'serialqueue.h', 'stepcompress.h', 'itersolve.h', 'pyhelper.h',
|
||||||
'trapq.h',
|
'trapq.h', 'pollreactor.h',
|
||||||
]
|
]
|
||||||
|
|
||||||
defs_stepcompress = """
|
defs_stepcompress = """
|
||||||
|
|
|
@ -0,0 +1,179 @@
|
||||||
|
// Code for dispatching timer and file descriptor events
|
||||||
|
//
|
||||||
|
// Copyright (C) 2016-2021 Kevin O'Connor <kevin@koconnor.net>
|
||||||
|
//
|
||||||
|
// This file may be distributed under the terms of the GNU GPLv3 license.
|
||||||
|
|
||||||
|
#include <fcntl.h> // fcntl
|
||||||
|
#include <math.h> // ceil
|
||||||
|
#include <poll.h> // poll
|
||||||
|
#include <stdlib.h> // malloc
|
||||||
|
#include <string.h> // memset
|
||||||
|
#include "pollreactor.h" // pollreactor_alloc
|
||||||
|
#include "pyhelper.h" // report_errno
|
||||||
|
|
||||||
|
struct pollreactor_timer {
|
||||||
|
double waketime;
|
||||||
|
double (*callback)(void *data, double eventtime);
|
||||||
|
};
|
||||||
|
|
||||||
|
struct pollreactor {
|
||||||
|
int num_fds, num_timers, must_exit;
|
||||||
|
void *callback_data;
|
||||||
|
double next_timer;
|
||||||
|
struct pollfd *fds;
|
||||||
|
void (**fd_callbacks)(void *data, double eventtime);
|
||||||
|
struct pollreactor_timer *timers;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Allocate a new 'struct pollreactor' object
|
||||||
|
struct pollreactor *
|
||||||
|
pollreactor_alloc(int num_fds, int num_timers, void *callback_data)
|
||||||
|
{
|
||||||
|
struct pollreactor *pr = malloc(sizeof(*pr));
|
||||||
|
memset(pr, 0, sizeof(*pr));
|
||||||
|
pr->num_fds = num_fds;
|
||||||
|
pr->num_timers = num_timers;
|
||||||
|
pr->must_exit = 0;
|
||||||
|
pr->callback_data = callback_data;
|
||||||
|
pr->next_timer = PR_NEVER;
|
||||||
|
pr->fds = malloc(num_fds * sizeof(*pr->fds));
|
||||||
|
memset(pr->fds, 0, num_fds * sizeof(*pr->fds));
|
||||||
|
pr->fd_callbacks = malloc(num_fds * sizeof(*pr->fd_callbacks));
|
||||||
|
memset(pr->fd_callbacks, 0, num_fds * sizeof(*pr->fd_callbacks));
|
||||||
|
pr->timers = malloc(num_timers * sizeof(*pr->timers));
|
||||||
|
memset(pr->timers, 0, num_timers * sizeof(*pr->timers));
|
||||||
|
int i;
|
||||||
|
for (i=0; i<num_timers; i++)
|
||||||
|
pr->timers[i].waketime = PR_NEVER;
|
||||||
|
return pr;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Free resources associated with a 'struct pollreactor' object
|
||||||
|
void
|
||||||
|
pollreactor_free(struct pollreactor *pr)
|
||||||
|
{
|
||||||
|
free(pr->fds);
|
||||||
|
pr->fds = NULL;
|
||||||
|
free(pr->fd_callbacks);
|
||||||
|
pr->fd_callbacks = NULL;
|
||||||
|
free(pr->timers);
|
||||||
|
pr->timers = NULL;
|
||||||
|
free(pr);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add a callback for when a file descriptor (fd) becomes readable
|
||||||
|
void
|
||||||
|
pollreactor_add_fd(struct pollreactor *pr, int pos, int fd, void *callback
|
||||||
|
, int write_only)
|
||||||
|
{
|
||||||
|
pr->fds[pos].fd = fd;
|
||||||
|
pr->fds[pos].events = POLLHUP | (write_only ? 0 : POLLIN);
|
||||||
|
pr->fds[pos].revents = 0;
|
||||||
|
pr->fd_callbacks[pos] = callback;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add a timer callback
|
||||||
|
void
|
||||||
|
pollreactor_add_timer(struct pollreactor *pr, int pos, void *callback)
|
||||||
|
{
|
||||||
|
pr->timers[pos].callback = callback;
|
||||||
|
pr->timers[pos].waketime = PR_NEVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return the last schedule wake-up time for a timer
|
||||||
|
double
|
||||||
|
pollreactor_get_timer(struct pollreactor *pr, int pos)
|
||||||
|
{
|
||||||
|
return pr->timers[pos].waketime;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set the wake-up time for a given timer
|
||||||
|
void
|
||||||
|
pollreactor_update_timer(struct pollreactor *pr, int pos, double waketime)
|
||||||
|
{
|
||||||
|
pr->timers[pos].waketime = waketime;
|
||||||
|
if (waketime < pr->next_timer)
|
||||||
|
pr->next_timer = waketime;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Internal code to invoke timer callbacks
|
||||||
|
static int
|
||||||
|
pollreactor_check_timers(struct pollreactor *pr, double eventtime, int busy)
|
||||||
|
{
|
||||||
|
if (eventtime >= pr->next_timer) {
|
||||||
|
// Find and run pending timers
|
||||||
|
pr->next_timer = PR_NEVER;
|
||||||
|
int i;
|
||||||
|
for (i=0; i<pr->num_timers; i++) {
|
||||||
|
struct pollreactor_timer *timer = &pr->timers[i];
|
||||||
|
double t = timer->waketime;
|
||||||
|
if (eventtime >= t) {
|
||||||
|
busy = 1;
|
||||||
|
t = timer->callback(pr->callback_data, eventtime);
|
||||||
|
timer->waketime = t;
|
||||||
|
}
|
||||||
|
if (t < pr->next_timer)
|
||||||
|
pr->next_timer = t;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (busy)
|
||||||
|
return 0;
|
||||||
|
// Calculate sleep duration
|
||||||
|
double timeout = ceil((pr->next_timer - eventtime) * 1000.);
|
||||||
|
return timeout < 1. ? 1 : (timeout > 1000. ? 1000 : (int)timeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Repeatedly check for timer and fd events and invoke their callbacks
|
||||||
|
void
|
||||||
|
pollreactor_run(struct pollreactor *pr)
|
||||||
|
{
|
||||||
|
double eventtime = get_monotonic();
|
||||||
|
int busy = 1;
|
||||||
|
while (! pr->must_exit) {
|
||||||
|
int timeout = pollreactor_check_timers(pr, eventtime, busy);
|
||||||
|
busy = 0;
|
||||||
|
int ret = poll(pr->fds, pr->num_fds, timeout);
|
||||||
|
eventtime = get_monotonic();
|
||||||
|
if (ret > 0) {
|
||||||
|
busy = 1;
|
||||||
|
int i;
|
||||||
|
for (i=0; i<pr->num_fds; i++)
|
||||||
|
if (pr->fds[i].revents)
|
||||||
|
pr->fd_callbacks[i](pr->callback_data, eventtime);
|
||||||
|
} else if (ret < 0) {
|
||||||
|
report_errno("poll", ret);
|
||||||
|
pr->must_exit = 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Request that a currently running pollreactor_run() loop exit
|
||||||
|
void
|
||||||
|
pollreactor_do_exit(struct pollreactor *pr)
|
||||||
|
{
|
||||||
|
pr->must_exit = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if a pollreactor_run() loop has been requested to exit
|
||||||
|
int
|
||||||
|
pollreactor_is_exit(struct pollreactor *pr)
|
||||||
|
{
|
||||||
|
return pr->must_exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
int
|
||||||
|
fd_set_non_blocking(int fd)
|
||||||
|
{
|
||||||
|
int flags = fcntl(fd, F_GETFL);
|
||||||
|
if (flags < 0) {
|
||||||
|
report_errno("fcntl getfl", flags);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
int ret = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
|
||||||
|
if (ret < 0) {
|
||||||
|
report_errno("fcntl setfl", flags);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -0,0 +1,20 @@
|
||||||
|
#ifndef POLLREACTOR_H
|
||||||
|
#define POLLREACTOR_H
|
||||||
|
|
||||||
|
#define PR_NOW 0.
|
||||||
|
#define PR_NEVER 9999999999999999.
|
||||||
|
|
||||||
|
struct pollreactor *pollreactor_alloc(int num_fds, int num_timers
|
||||||
|
, void *callback_data);
|
||||||
|
void pollreactor_free(struct pollreactor *pr);
|
||||||
|
void pollreactor_add_fd(struct pollreactor *pr, int pos, int fd, void *callback
|
||||||
|
, int write_only);
|
||||||
|
void pollreactor_add_timer(struct pollreactor *pr, int pos, void *callback);
|
||||||
|
double pollreactor_get_timer(struct pollreactor *pr, int pos);
|
||||||
|
void pollreactor_update_timer(struct pollreactor *pr, int pos, double waketime);
|
||||||
|
void pollreactor_run(struct pollreactor *pr);
|
||||||
|
void pollreactor_do_exit(struct pollreactor *pr);
|
||||||
|
int pollreactor_is_exit(struct pollreactor *pr);
|
||||||
|
int fd_set_non_blocking(int fd);
|
||||||
|
|
||||||
|
#endif // pollreactor.h
|
|
@ -12,10 +12,8 @@
|
||||||
// clock times, prioritizes commands, and handles retransmissions. A
|
// clock times, prioritizes commands, and handles retransmissions. A
|
||||||
// background thread is launched to do this work and minimize latency.
|
// background thread is launched to do this work and minimize latency.
|
||||||
|
|
||||||
#include <fcntl.h> // fcntl
|
|
||||||
#include <linux/can.h> // // struct can_frame
|
#include <linux/can.h> // // struct can_frame
|
||||||
#include <math.h> // ceil
|
#include <math.h> // fabs
|
||||||
#include <poll.h> // poll
|
|
||||||
#include <pthread.h> // pthread_mutex_lock
|
#include <pthread.h> // pthread_mutex_lock
|
||||||
#include <stddef.h> // offsetof
|
#include <stddef.h> // offsetof
|
||||||
#include <stdint.h> // uint64_t
|
#include <stdint.h> // uint64_t
|
||||||
|
@ -26,184 +24,11 @@
|
||||||
#include <unistd.h> // pipe
|
#include <unistd.h> // pipe
|
||||||
#include "compiler.h" // __visible
|
#include "compiler.h" // __visible
|
||||||
#include "list.h" // list_add_tail
|
#include "list.h" // list_add_tail
|
||||||
|
#include "pollreactor.h" // pollreactor_alloc
|
||||||
#include "pyhelper.h" // get_monotonic
|
#include "pyhelper.h" // get_monotonic
|
||||||
#include "serialqueue.h" // struct queue_message
|
#include "serialqueue.h" // struct queue_message
|
||||||
|
|
||||||
|
|
||||||
/****************************************************************
|
|
||||||
* Poll reactor
|
|
||||||
****************************************************************/
|
|
||||||
|
|
||||||
// The 'poll reactor' code is a mechanism for dispatching timer and
|
|
||||||
// file descriptor events.
|
|
||||||
|
|
||||||
#define PR_NOW 0.
|
|
||||||
#define PR_NEVER 9999999999999999.
|
|
||||||
|
|
||||||
struct pollreactor_timer {
|
|
||||||
double waketime;
|
|
||||||
double (*callback)(void *data, double eventtime);
|
|
||||||
};
|
|
||||||
|
|
||||||
struct pollreactor {
|
|
||||||
int num_fds, num_timers, must_exit;
|
|
||||||
void *callback_data;
|
|
||||||
double next_timer;
|
|
||||||
struct pollfd *fds;
|
|
||||||
void (**fd_callbacks)(void *data, double eventtime);
|
|
||||||
struct pollreactor_timer *timers;
|
|
||||||
};
|
|
||||||
|
|
||||||
// Allocate a new 'struct pollreactor' object
|
|
||||||
static void
|
|
||||||
pollreactor_setup(struct pollreactor *pr, int num_fds, int num_timers
|
|
||||||
, void *callback_data)
|
|
||||||
{
|
|
||||||
pr->num_fds = num_fds;
|
|
||||||
pr->num_timers = num_timers;
|
|
||||||
pr->must_exit = 0;
|
|
||||||
pr->callback_data = callback_data;
|
|
||||||
pr->next_timer = PR_NEVER;
|
|
||||||
pr->fds = malloc(num_fds * sizeof(*pr->fds));
|
|
||||||
memset(pr->fds, 0, num_fds * sizeof(*pr->fds));
|
|
||||||
pr->fd_callbacks = malloc(num_fds * sizeof(*pr->fd_callbacks));
|
|
||||||
memset(pr->fd_callbacks, 0, num_fds * sizeof(*pr->fd_callbacks));
|
|
||||||
pr->timers = malloc(num_timers * sizeof(*pr->timers));
|
|
||||||
memset(pr->timers, 0, num_timers * sizeof(*pr->timers));
|
|
||||||
int i;
|
|
||||||
for (i=0; i<num_timers; i++)
|
|
||||||
pr->timers[i].waketime = PR_NEVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Free resources associated with a 'struct pollreactor' object
|
|
||||||
static void
|
|
||||||
pollreactor_free(struct pollreactor *pr)
|
|
||||||
{
|
|
||||||
free(pr->fds);
|
|
||||||
pr->fds = NULL;
|
|
||||||
free(pr->fd_callbacks);
|
|
||||||
pr->fd_callbacks = NULL;
|
|
||||||
free(pr->timers);
|
|
||||||
pr->timers = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add a callback for when a file descriptor (fd) becomes readable
|
|
||||||
static void
|
|
||||||
pollreactor_add_fd(struct pollreactor *pr, int pos, int fd, void *callback
|
|
||||||
, int write_only)
|
|
||||||
{
|
|
||||||
pr->fds[pos].fd = fd;
|
|
||||||
pr->fds[pos].events = POLLHUP | (write_only ? 0 : POLLIN);
|
|
||||||
pr->fds[pos].revents = 0;
|
|
||||||
pr->fd_callbacks[pos] = callback;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add a timer callback
|
|
||||||
static void
|
|
||||||
pollreactor_add_timer(struct pollreactor *pr, int pos, void *callback)
|
|
||||||
{
|
|
||||||
pr->timers[pos].callback = callback;
|
|
||||||
pr->timers[pos].waketime = PR_NEVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Return the last schedule wake-up time for a timer
|
|
||||||
static double
|
|
||||||
pollreactor_get_timer(struct pollreactor *pr, int pos)
|
|
||||||
{
|
|
||||||
return pr->timers[pos].waketime;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Set the wake-up time for a given timer
|
|
||||||
static void
|
|
||||||
pollreactor_update_timer(struct pollreactor *pr, int pos, double waketime)
|
|
||||||
{
|
|
||||||
pr->timers[pos].waketime = waketime;
|
|
||||||
if (waketime < pr->next_timer)
|
|
||||||
pr->next_timer = waketime;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Internal code to invoke timer callbacks
|
|
||||||
static int
|
|
||||||
pollreactor_check_timers(struct pollreactor *pr, double eventtime, int busy)
|
|
||||||
{
|
|
||||||
if (eventtime >= pr->next_timer) {
|
|
||||||
// Find and run pending timers
|
|
||||||
pr->next_timer = PR_NEVER;
|
|
||||||
int i;
|
|
||||||
for (i=0; i<pr->num_timers; i++) {
|
|
||||||
struct pollreactor_timer *timer = &pr->timers[i];
|
|
||||||
double t = timer->waketime;
|
|
||||||
if (eventtime >= t) {
|
|
||||||
busy = 1;
|
|
||||||
t = timer->callback(pr->callback_data, eventtime);
|
|
||||||
timer->waketime = t;
|
|
||||||
}
|
|
||||||
if (t < pr->next_timer)
|
|
||||||
pr->next_timer = t;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (busy)
|
|
||||||
return 0;
|
|
||||||
// Calculate sleep duration
|
|
||||||
double timeout = ceil((pr->next_timer - eventtime) * 1000.);
|
|
||||||
return timeout < 1. ? 1 : (timeout > 1000. ? 1000 : (int)timeout);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Repeatedly check for timer and fd events and invoke their callbacks
|
|
||||||
static void
|
|
||||||
pollreactor_run(struct pollreactor *pr)
|
|
||||||
{
|
|
||||||
double eventtime = get_monotonic();
|
|
||||||
int busy = 1;
|
|
||||||
while (! pr->must_exit) {
|
|
||||||
int timeout = pollreactor_check_timers(pr, eventtime, busy);
|
|
||||||
busy = 0;
|
|
||||||
int ret = poll(pr->fds, pr->num_fds, timeout);
|
|
||||||
eventtime = get_monotonic();
|
|
||||||
if (ret > 0) {
|
|
||||||
busy = 1;
|
|
||||||
int i;
|
|
||||||
for (i=0; i<pr->num_fds; i++)
|
|
||||||
if (pr->fds[i].revents)
|
|
||||||
pr->fd_callbacks[i](pr->callback_data, eventtime);
|
|
||||||
} else if (ret < 0) {
|
|
||||||
report_errno("poll", ret);
|
|
||||||
pr->must_exit = 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Request that a currently running pollreactor_run() loop exit
|
|
||||||
static void
|
|
||||||
pollreactor_do_exit(struct pollreactor *pr)
|
|
||||||
{
|
|
||||||
pr->must_exit = 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if a pollreactor_run() loop has been requested to exit
|
|
||||||
static int
|
|
||||||
pollreactor_is_exit(struct pollreactor *pr)
|
|
||||||
{
|
|
||||||
return pr->must_exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int
|
|
||||||
set_non_blocking(int fd)
|
|
||||||
{
|
|
||||||
int flags = fcntl(fd, F_GETFL);
|
|
||||||
if (flags < 0) {
|
|
||||||
report_errno("fcntl getfl", flags);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
int ret = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
|
|
||||||
if (ret < 0) {
|
|
||||||
report_errno("fcntl setfl", flags);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/****************************************************************
|
/****************************************************************
|
||||||
* Serial protocol helpers
|
* Serial protocol helpers
|
||||||
****************************************************************/
|
****************************************************************/
|
||||||
|
@ -354,7 +179,7 @@ message_queue_free(struct list_head *root)
|
||||||
|
|
||||||
struct serialqueue {
|
struct serialqueue {
|
||||||
// Input reading
|
// Input reading
|
||||||
struct pollreactor pr;
|
struct pollreactor *pr;
|
||||||
int serial_fd, serial_fd_type, client_id;
|
int serial_fd, serial_fd_type, client_id;
|
||||||
int pipe_fds[2];
|
int pipe_fds[2];
|
||||||
uint8_t input_buf[4096];
|
uint8_t input_buf[4096];
|
||||||
|
@ -479,7 +304,7 @@ update_receive_seq(struct serialqueue *sq, double eventtime, uint64_t rseq)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
sq->receive_seq = rseq;
|
sq->receive_seq = rseq;
|
||||||
pollreactor_update_timer(&sq->pr, SQPT_COMMAND, PR_NOW);
|
pollreactor_update_timer(sq->pr, SQPT_COMMAND, PR_NOW);
|
||||||
|
|
||||||
// Update retransmit info
|
// Update retransmit info
|
||||||
if (sq->rtt_sample_seq && rseq > sq->rtt_sample_seq
|
if (sq->rtt_sample_seq && rseq > sq->rtt_sample_seq
|
||||||
|
@ -504,12 +329,12 @@ update_receive_seq(struct serialqueue *sq, double eventtime, uint64_t rseq)
|
||||||
sq->rtt_sample_seq = 0;
|
sq->rtt_sample_seq = 0;
|
||||||
}
|
}
|
||||||
if (list_empty(&sq->sent_queue)) {
|
if (list_empty(&sq->sent_queue)) {
|
||||||
pollreactor_update_timer(&sq->pr, SQPT_RETRANSMIT, PR_NEVER);
|
pollreactor_update_timer(sq->pr, SQPT_RETRANSMIT, PR_NEVER);
|
||||||
} else {
|
} else {
|
||||||
struct queue_message *sent = list_first_entry(
|
struct queue_message *sent = list_first_entry(
|
||||||
&sq->sent_queue, struct queue_message, node);
|
&sq->sent_queue, struct queue_message, node);
|
||||||
double nr = eventtime + sq->rto + sent->len * sq->baud_adjust;
|
double nr = eventtime + sq->rto + sent->len * sq->baud_adjust;
|
||||||
pollreactor_update_timer(&sq->pr, SQPT_RETRANSMIT, nr);
|
pollreactor_update_timer(sq->pr, SQPT_RETRANSMIT, nr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -554,7 +379,7 @@ handle_message(struct serialqueue *sq, double eventtime, int len)
|
||||||
sq->last_ack_seq = rseq;
|
sq->last_ack_seq = rseq;
|
||||||
else if (rseq > sq->ignore_nak_seq && !list_empty(&sq->sent_queue))
|
else if (rseq > sq->ignore_nak_seq && !list_empty(&sq->sent_queue))
|
||||||
// Duplicate Ack is a Nak - do fast retransmit
|
// Duplicate Ack is a Nak - do fast retransmit
|
||||||
pollreactor_update_timer(&sq->pr, SQPT_RETRANSMIT, PR_NOW);
|
pollreactor_update_timer(sq->pr, SQPT_RETRANSMIT, PR_NOW);
|
||||||
} else {
|
} else {
|
||||||
// Data message - add to receive queue
|
// Data message - add to receive queue
|
||||||
struct queue_message *qm = message_fill(sq->input_buf, len);
|
struct queue_message *qm = message_fill(sq->input_buf, len);
|
||||||
|
@ -580,7 +405,7 @@ input_event(struct serialqueue *sq, double eventtime)
|
||||||
int ret = read(sq->serial_fd, &cf, sizeof(cf));
|
int ret = read(sq->serial_fd, &cf, sizeof(cf));
|
||||||
if (ret <= 0) {
|
if (ret <= 0) {
|
||||||
report_errno("can read", ret);
|
report_errno("can read", ret);
|
||||||
pollreactor_do_exit(&sq->pr);
|
pollreactor_do_exit(sq->pr);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (cf.can_id != sq->client_id + 1)
|
if (cf.can_id != sq->client_id + 1)
|
||||||
|
@ -595,7 +420,7 @@ input_event(struct serialqueue *sq, double eventtime)
|
||||||
report_errno("read", ret);
|
report_errno("read", ret);
|
||||||
else
|
else
|
||||||
errorf("Got EOF when reading from device");
|
errorf("Got EOF when reading from device");
|
||||||
pollreactor_do_exit(&sq->pr);
|
pollreactor_do_exit(sq->pr);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
sq->input_pos += ret;
|
sq->input_pos += ret;
|
||||||
|
@ -635,7 +460,7 @@ kick_event(struct serialqueue *sq, double eventtime)
|
||||||
int ret = read(sq->pipe_fds[0], dummy, sizeof(dummy));
|
int ret = read(sq->pipe_fds[0], dummy, sizeof(dummy));
|
||||||
if (ret < 0)
|
if (ret < 0)
|
||||||
report_errno("pipe read", ret);
|
report_errno("pipe read", ret);
|
||||||
pollreactor_update_timer(&sq->pr, SQPT_COMMAND, PR_NOW);
|
pollreactor_update_timer(sq->pr, SQPT_COMMAND, PR_NOW);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
|
@ -691,7 +516,7 @@ retransmit_event(struct serialqueue *sq, double eventtime)
|
||||||
sq->bytes_retransmit += buflen;
|
sq->bytes_retransmit += buflen;
|
||||||
|
|
||||||
// Update rto
|
// Update rto
|
||||||
if (pollreactor_get_timer(&sq->pr, SQPT_RETRANSMIT) == PR_NOW) {
|
if (pollreactor_get_timer(sq->pr, SQPT_RETRANSMIT) == PR_NOW) {
|
||||||
// Retransmit due to nak
|
// Retransmit due to nak
|
||||||
sq->ignore_nak_seq = sq->receive_seq;
|
sq->ignore_nak_seq = sq->receive_seq;
|
||||||
if (sq->receive_seq < sq->retransmit_seq)
|
if (sq->receive_seq < sq->retransmit_seq)
|
||||||
|
@ -771,7 +596,7 @@ build_and_send_command(struct serialqueue *sq, uint8_t *buf, double eventtime)
|
||||||
out->sent_time = eventtime;
|
out->sent_time = eventtime;
|
||||||
out->receive_time = sq->idle_time;
|
out->receive_time = sq->idle_time;
|
||||||
if (list_empty(&sq->sent_queue))
|
if (list_empty(&sq->sent_queue))
|
||||||
pollreactor_update_timer(&sq->pr, SQPT_RETRANSMIT
|
pollreactor_update_timer(sq->pr, SQPT_RETRANSMIT
|
||||||
, sq->idle_time + sq->rto);
|
, sq->idle_time + sq->rto);
|
||||||
if (!sq->rtt_sample_seq)
|
if (!sq->rtt_sample_seq)
|
||||||
sq->rtt_sample_seq = sq->send_seq;
|
sq->rtt_sample_seq = sq->send_seq;
|
||||||
|
@ -886,7 +711,7 @@ static void *
|
||||||
background_thread(void *data)
|
background_thread(void *data)
|
||||||
{
|
{
|
||||||
struct serialqueue *sq = data;
|
struct serialqueue *sq = data;
|
||||||
pollreactor_run(&sq->pr);
|
pollreactor_run(sq->pr);
|
||||||
|
|
||||||
pthread_mutex_lock(&sq->lock);
|
pthread_mutex_lock(&sq->lock);
|
||||||
check_wake_receive(sq);
|
check_wake_receive(sq);
|
||||||
|
@ -910,15 +735,15 @@ serialqueue_alloc(int serial_fd, char serial_fd_type, int client_id)
|
||||||
goto fail;
|
goto fail;
|
||||||
|
|
||||||
// Reactor setup
|
// Reactor setup
|
||||||
pollreactor_setup(&sq->pr, SQPF_NUM, SQPT_NUM, sq);
|
sq->pr = pollreactor_alloc(SQPF_NUM, SQPT_NUM, sq);
|
||||||
pollreactor_add_fd(&sq->pr, SQPF_SERIAL, serial_fd, input_event
|
pollreactor_add_fd(sq->pr, SQPF_SERIAL, serial_fd, input_event
|
||||||
, serial_fd_type==SQT_DEBUGFILE);
|
, serial_fd_type==SQT_DEBUGFILE);
|
||||||
pollreactor_add_fd(&sq->pr, SQPF_PIPE, sq->pipe_fds[0], kick_event, 0);
|
pollreactor_add_fd(sq->pr, SQPF_PIPE, sq->pipe_fds[0], kick_event, 0);
|
||||||
pollreactor_add_timer(&sq->pr, SQPT_RETRANSMIT, retransmit_event);
|
pollreactor_add_timer(sq->pr, SQPT_RETRANSMIT, retransmit_event);
|
||||||
pollreactor_add_timer(&sq->pr, SQPT_COMMAND, command_event);
|
pollreactor_add_timer(sq->pr, SQPT_COMMAND, command_event);
|
||||||
set_non_blocking(serial_fd);
|
fd_set_non_blocking(serial_fd);
|
||||||
set_non_blocking(sq->pipe_fds[0]);
|
fd_set_non_blocking(sq->pipe_fds[0]);
|
||||||
set_non_blocking(sq->pipe_fds[1]);
|
fd_set_non_blocking(sq->pipe_fds[1]);
|
||||||
|
|
||||||
// Retransmit setup
|
// Retransmit setup
|
||||||
sq->send_seq = 1;
|
sq->send_seq = 1;
|
||||||
|
@ -966,7 +791,7 @@ fail:
|
||||||
void __visible
|
void __visible
|
||||||
serialqueue_exit(struct serialqueue *sq)
|
serialqueue_exit(struct serialqueue *sq)
|
||||||
{
|
{
|
||||||
pollreactor_do_exit(&sq->pr);
|
pollreactor_do_exit(sq->pr);
|
||||||
kick_bg_thread(sq);
|
kick_bg_thread(sq);
|
||||||
int ret = pthread_join(sq->tid, NULL);
|
int ret = pthread_join(sq->tid, NULL);
|
||||||
if (ret)
|
if (ret)
|
||||||
|
@ -979,7 +804,7 @@ serialqueue_free(struct serialqueue *sq)
|
||||||
{
|
{
|
||||||
if (!sq)
|
if (!sq)
|
||||||
return;
|
return;
|
||||||
if (!pollreactor_is_exit(&sq->pr))
|
if (!pollreactor_is_exit(sq->pr))
|
||||||
serialqueue_exit(sq);
|
serialqueue_exit(sq);
|
||||||
pthread_mutex_lock(&sq->lock);
|
pthread_mutex_lock(&sq->lock);
|
||||||
message_queue_free(&sq->sent_queue);
|
message_queue_free(&sq->sent_queue);
|
||||||
|
@ -995,7 +820,7 @@ serialqueue_free(struct serialqueue *sq)
|
||||||
message_queue_free(&cq->stalled_queue);
|
message_queue_free(&cq->stalled_queue);
|
||||||
}
|
}
|
||||||
pthread_mutex_unlock(&sq->lock);
|
pthread_mutex_unlock(&sq->lock);
|
||||||
pollreactor_free(&sq->pr);
|
pollreactor_free(sq->pr);
|
||||||
free(sq);
|
free(sq);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1085,7 +910,7 @@ serialqueue_pull(struct serialqueue *sq, struct pull_queue_message *pqm)
|
||||||
pthread_mutex_lock(&sq->lock);
|
pthread_mutex_lock(&sq->lock);
|
||||||
// Wait for message to be available
|
// Wait for message to be available
|
||||||
while (list_empty(&sq->receive_queue)) {
|
while (list_empty(&sq->receive_queue)) {
|
||||||
if (pollreactor_is_exit(&sq->pr))
|
if (pollreactor_is_exit(sq->pr))
|
||||||
goto exit;
|
goto exit;
|
||||||
sq->receive_waiting = 1;
|
sq->receive_waiting = 1;
|
||||||
int ret = pthread_cond_wait(&sq->cond, &sq->lock);
|
int ret = pthread_cond_wait(&sq->cond, &sq->lock);
|
||||||
|
|
Loading…
Reference in New Issue