serialqueue: Add "fast reader" support

Add ability to run C code directly from the low-level socket reading
thread.  This enables host based low-latency handlers.

Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
This commit is contained in:
Kevin O'Connor 2021-02-10 11:09:47 -05:00
parent 620f77ddb7
commit c53e8c7d4a
2 changed files with 69 additions and 11 deletions

View File

@ -64,6 +64,9 @@ struct serialqueue {
struct list_head notify_queue; struct list_head notify_queue;
// Received messages // Received messages
struct list_head receive_queue; struct list_head receive_queue;
// Fastreader support
pthread_mutex_t fast_reader_dispatch_lock;
struct list_head fast_readers;
// Debugging // Debugging
struct list_head old_sent, old_receive; struct list_head old_sent, old_receive;
// Stats // Stats
@ -195,9 +198,11 @@ update_receive_seq(struct serialqueue *sq, double eventtime, uint64_t rseq)
} }
// Process a well formed input message // Process a well formed input message
static int static void
handle_message(struct serialqueue *sq, double eventtime, int len) handle_message(struct serialqueue *sq, double eventtime, int len)
{ {
pthread_mutex_lock(&sq->lock);
// Calculate receive sequence number // Calculate receive sequence number
uint64_t rseq = ((sq->receive_seq & ~MESSAGE_SEQ_MASK) uint64_t rseq = ((sq->receive_seq & ~MESSAGE_SEQ_MASK)
| (sq->input_buf[MESSAGE_POS_SEQ] & MESSAGE_SEQ_MASK)); | (sq->input_buf[MESSAGE_POS_SEQ] & MESSAGE_SEQ_MASK));
@ -205,11 +210,15 @@ handle_message(struct serialqueue *sq, double eventtime, int len)
// New sequence number // New sequence number
if (rseq < sq->receive_seq) if (rseq < sq->receive_seq)
rseq += MESSAGE_SEQ_MASK+1; rseq += MESSAGE_SEQ_MASK+1;
if (rseq > sq->send_seq && sq->receive_seq != 1) if (rseq > sq->send_seq && sq->receive_seq != 1) {
// An ack for a message not sent? Out of order message? // An ack for a message not sent? Out of order message?
return -1; sq->bytes_invalid += len;
pthread_mutex_unlock(&sq->lock);
return;
}
update_receive_seq(sq, eventtime, rseq); update_receive_seq(sq, eventtime, rseq);
} }
sq->bytes_read += len;
// Check for pending messages on notify_queue // Check for pending messages on notify_queue
int must_wake = 0; int must_wake = 0;
@ -247,9 +256,26 @@ handle_message(struct serialqueue *sq, double eventtime, int len)
must_wake = 1; must_wake = 1;
} }
// Check fast readers
struct fastreader *fr;
list_for_each_entry(fr, &sq->fast_readers, node) {
if (len < fr->prefix_len + MESSAGE_MIN
|| memcmp(&sq->input_buf[MESSAGE_HEADER_SIZE]
, fr->prefix, fr->prefix_len) != 0)
continue;
// Release main lock and invoke callback
pthread_mutex_lock(&sq->fast_reader_dispatch_lock);
if (must_wake)
check_wake_receive(sq);
pthread_mutex_unlock(&sq->lock);
fr->func(fr, sq->input_buf, len);
pthread_mutex_unlock(&sq->fast_reader_dispatch_lock);
return;
}
if (must_wake) if (must_wake)
check_wake_receive(sq); check_wake_receive(sq);
return 0; pthread_mutex_unlock(&sq->lock);
} }
// Callback for input activity on the serial fd // Callback for input activity on the serial fd
@ -288,13 +314,7 @@ input_event(struct serialqueue *sq, double eventtime)
return; return;
if (len > 0) { if (len > 0) {
// Received a valid message // Received a valid message
pthread_mutex_lock(&sq->lock); handle_message(sq, eventtime, len);
int ret = handle_message(sq, eventtime, len);
if (ret)
sq->bytes_invalid += len;
else
sq->bytes_read += len;
pthread_mutex_unlock(&sq->lock);
} else { } else {
// Skip bad data at beginning of input // Skip bad data at beginning of input
len = -len; len = -len;
@ -614,6 +634,7 @@ serialqueue_alloc(int serial_fd, char serial_fd_type, int client_id)
list_init(&sq->sent_queue); list_init(&sq->sent_queue);
list_init(&sq->receive_queue); list_init(&sq->receive_queue);
list_init(&sq->notify_queue); list_init(&sq->notify_queue);
list_init(&sq->fast_readers);
// Debugging // Debugging
list_init(&sq->old_sent); list_init(&sq->old_sent);
@ -626,6 +647,9 @@ serialqueue_alloc(int serial_fd, char serial_fd_type, int client_id)
if (ret) if (ret)
goto fail; goto fail;
ret = pthread_cond_init(&sq->cond, NULL); ret = pthread_cond_init(&sq->cond, NULL);
if (ret)
goto fail;
ret = pthread_mutex_init(&sq->fast_reader_dispatch_lock, NULL);
if (ret) if (ret)
goto fail; goto fail;
ret = pthread_create(&sq->tid, NULL, background_thread, sq); ret = pthread_create(&sq->tid, NULL, background_thread, sq);
@ -700,6 +724,27 @@ serialqueue_free_commandqueue(struct command_queue *cq)
free(cq); free(cq);
} }
// Add a low-latency message handler
void
serialqueue_add_fastreader(struct serialqueue *sq, struct fastreader *fr)
{
pthread_mutex_lock(&sq->lock);
list_add_tail(&fr->node, &sq->fast_readers);
pthread_mutex_unlock(&sq->lock);
}
// Remove a previously registered low-latency message handler
void
serialqueue_rm_fastreader(struct serialqueue *sq, struct fastreader *fr)
{
pthread_mutex_lock(&sq->lock);
list_del(&fr->node);
pthread_mutex_unlock(&sq->lock);
pthread_mutex_lock(&sq->fast_reader_dispatch_lock); // XXX - goofy locking
pthread_mutex_unlock(&sq->fast_reader_dispatch_lock);
}
// Add a batch of messages to the given command_queue // Add a batch of messages to the given command_queue
void void
serialqueue_send_batch(struct serialqueue *sq, struct command_queue *cq serialqueue_send_batch(struct serialqueue *sq, struct command_queue *cq

View File

@ -1,12 +1,23 @@
#ifndef SERIALQUEUE_H #ifndef SERIALQUEUE_H
#define SERIALQUEUE_H #define SERIALQUEUE_H
#include <stdint.h> // uint8_t
#include "list.h" // struct list_head #include "list.h" // struct list_head
#include "msgblock.h" // MESSAGE_MAX #include "msgblock.h" // MESSAGE_MAX
#define MAX_CLOCK 0x7fffffffffffffffLL #define MAX_CLOCK 0x7fffffffffffffffLL
#define BACKGROUND_PRIORITY_CLOCK 0x7fffffff00000000LL #define BACKGROUND_PRIORITY_CLOCK 0x7fffffff00000000LL
struct fastreader;
typedef void (*fastreader_cb)(struct fastreader *fr, uint8_t *data, int len);
struct fastreader {
struct list_node node;
fastreader_cb func;
int prefix_len;
uint8_t prefix[MESSAGE_MAX];
};
struct pull_queue_message { struct pull_queue_message {
uint8_t msg[MESSAGE_MAX]; uint8_t msg[MESSAGE_MAX];
int len; int len;
@ -21,6 +32,8 @@ void serialqueue_exit(struct serialqueue *sq);
void serialqueue_free(struct serialqueue *sq); void serialqueue_free(struct serialqueue *sq);
struct command_queue *serialqueue_alloc_commandqueue(void); struct command_queue *serialqueue_alloc_commandqueue(void);
void serialqueue_free_commandqueue(struct command_queue *cq); void serialqueue_free_commandqueue(struct command_queue *cq);
void serialqueue_add_fastreader(struct serialqueue *sq, struct fastreader *fr);
void serialqueue_rm_fastreader(struct serialqueue *sq, struct fastreader *fr);
void serialqueue_send_batch(struct serialqueue *sq, struct command_queue *cq void serialqueue_send_batch(struct serialqueue *sq, struct command_queue *cq
, struct list_head *msgs); , struct list_head *msgs);
void serialqueue_send(struct serialqueue *sq, struct command_queue *cq void serialqueue_send(struct serialqueue *sq, struct command_queue *cq