serialqueue: Batch multiple message blocks in a single write()

Some communication protocols are more efficient if fewer write() calls
are invoked.  If multiple message blocks can be sent at the same time
then batch them into a single write() call.

Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
This commit is contained in:
Kevin O'Connor 2021-02-12 13:04:26 -05:00
parent 730ef9d347
commit c5968a0830
1 changed files with 38 additions and 29 deletions

View File

@ -1,6 +1,6 @@
// Serial port command queuing // Serial port command queuing
// //
// Copyright (C) 2016-2020 Kevin O'Connor <kevin@koconnor.net> // Copyright (C) 2016-2021 Kevin O'Connor <kevin@koconnor.net>
// //
// This file may be distributed under the terms of the GNU GPLv3 license. // This file may be distributed under the terms of the GNU GPLv3 license.
@ -627,7 +627,7 @@ retransmit_event(struct serialqueue *sq, double eventtime)
pthread_mutex_lock(&sq->lock); pthread_mutex_lock(&sq->lock);
// Retransmit all pending messages // Retransmit all pending messages
uint8_t buf[MESSAGE_MAX * MESSAGE_SEQ_MASK + 1]; uint8_t buf[MESSAGE_MAX * MAX_PENDING_BLOCKS + 1];
int buflen = 0, first_buflen = 0; int buflen = 0, first_buflen = 0;
buf[buflen++] = MESSAGE_SYNC; buf[buflen++] = MESSAGE_SYNC;
struct queue_message *qm; struct queue_message *qm;
@ -665,13 +665,11 @@ retransmit_event(struct serialqueue *sq, double eventtime)
return waketime; return waketime;
} }
// Construct a block of data and send to the serial port // Construct a block of data to be sent to the serial port
static void static int
build_and_send_command(struct serialqueue *sq, double eventtime) build_and_send_command(struct serialqueue *sq, uint8_t *buf, double eventtime)
{ {
struct queue_message *out = message_alloc(); int len = MESSAGE_HEADER_SIZE;
out->len = MESSAGE_HEADER_SIZE;
while (sq->ready_bytes) { while (sq->ready_bytes) {
// Find highest priority message (message with lowest req_clock) // Find highest priority message (message with lowest req_clock)
uint64_t min_clock = MAX_CLOCK; uint64_t min_clock = MAX_CLOCK;
@ -689,13 +687,13 @@ build_and_send_command(struct serialqueue *sq, double eventtime)
} }
} }
// Append message to outgoing command // Append message to outgoing command
if (out->len + qm->len > sizeof(out->msg) - MESSAGE_TRAILER_SIZE) if (len + qm->len > MESSAGE_MAX - MESSAGE_TRAILER_SIZE)
break; break;
list_del(&qm->node); list_del(&qm->node);
if (list_empty(&cq->ready_queue) && list_empty(&cq->stalled_queue)) if (list_empty(&cq->ready_queue) && list_empty(&cq->stalled_queue))
list_del(&cq->node); list_del(&cq->node);
memcpy(&out->msg[out->len], qm->msg, qm->len); memcpy(&buf[len], qm->msg, qm->len);
out->len += qm->len; len += qm->len;
sq->ready_bytes -= qm->len; sq->ready_bytes -= qm->len;
if (qm->notify_id) { if (qm->notify_id) {
// Message requires notification - add to notify list // Message requires notification - add to notify list
@ -707,23 +705,21 @@ build_and_send_command(struct serialqueue *sq, double eventtime)
} }
// Fill header / trailer // Fill header / trailer
out->len += MESSAGE_TRAILER_SIZE; len += MESSAGE_TRAILER_SIZE;
out->msg[MESSAGE_POS_LEN] = out->len; buf[MESSAGE_POS_LEN] = len;
out->msg[MESSAGE_POS_SEQ] = (MESSAGE_DEST buf[MESSAGE_POS_SEQ] = MESSAGE_DEST | (sq->send_seq & MESSAGE_SEQ_MASK);
| (sq->send_seq & MESSAGE_SEQ_MASK)); uint16_t crc = crc16_ccitt(buf, len - MESSAGE_TRAILER_SIZE);
uint16_t crc = crc16_ccitt(out->msg, out->len - MESSAGE_TRAILER_SIZE); buf[len - MESSAGE_TRAILER_CRC] = crc >> 8;
out->msg[out->len - MESSAGE_TRAILER_CRC] = crc >> 8; buf[len - MESSAGE_TRAILER_CRC+1] = crc & 0xff;
out->msg[out->len - MESSAGE_TRAILER_CRC+1] = crc & 0xff; buf[len - MESSAGE_TRAILER_SYNC] = MESSAGE_SYNC;
out->msg[out->len - MESSAGE_TRAILER_SYNC] = MESSAGE_SYNC;
// Send message // Store message block
int ret = write(sq->serial_fd, out->msg, out->len);
if (ret < 0)
report_errno("write", ret);
sq->bytes_write += out->len;
if (eventtime > sq->idle_time) if (eventtime > sq->idle_time)
sq->idle_time = eventtime; sq->idle_time = eventtime;
sq->idle_time += out->len * sq->baud_adjust; sq->idle_time += len * sq->baud_adjust;
struct queue_message *out = message_alloc();
memcpy(out->msg, buf, len);
out->len = len;
out->sent_time = eventtime; out->sent_time = eventtime;
out->receive_time = sq->idle_time; out->receive_time = sq->idle_time;
if (list_empty(&sq->sent_queue)) if (list_empty(&sq->sent_queue))
@ -732,8 +728,9 @@ build_and_send_command(struct serialqueue *sq, double eventtime)
if (!sq->rtt_sample_seq) if (!sq->rtt_sample_seq)
sq->rtt_sample_seq = sq->send_seq; sq->rtt_sample_seq = sq->send_seq;
sq->send_seq++; sq->send_seq++;
sq->need_ack_bytes += out->len; sq->need_ack_bytes += len;
list_add_tail(&out->node, &sq->sent_queue); list_add_tail(&out->node, &sq->sent_queue);
return len;
} }
// Determine the time the next serial data should be sent // Determine the time the next serial data should be sent
@ -815,12 +812,24 @@ static double
command_event(struct serialqueue *sq, double eventtime) command_event(struct serialqueue *sq, double eventtime)
{ {
pthread_mutex_lock(&sq->lock); pthread_mutex_lock(&sq->lock);
uint8_t buf[MESSAGE_MAX * MAX_PENDING_BLOCKS];
int buflen = 0;
double waketime; double waketime;
for (;;) { for (;;) {
waketime = check_send_command(sq, eventtime); waketime = check_send_command(sq, eventtime);
if (waketime != PR_NOW) if (waketime != PR_NOW || buflen + MESSAGE_MAX > sizeof(buf)) {
break; if (buflen) {
build_and_send_command(sq, eventtime); // Write message blocks
int ret = write(sq->serial_fd, buf, buflen);
if (ret < 0)
report_errno("write", ret);
sq->bytes_write += buflen;
buflen = 0;
}
if (waketime != PR_NOW)
break;
}
buflen += build_and_send_command(sq, &buf[buflen], eventtime);
} }
pthread_mutex_unlock(&sq->lock); pthread_mutex_unlock(&sq->lock);
return waketime; return waketime;