serialqueue: Improve checking of out-of-order messages

Consider any message block that acks a message never sent as an
out-of-order block and discard it.

Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
This commit is contained in:
Kevin O'Connor 2021-02-01 18:26:08 -05:00
parent 6e79152f47
commit 6a3f4c7ae6
1 changed files with 23 additions and 15 deletions

View File

@ -392,6 +392,7 @@ struct serialqueue {
#define MIN_RTO 0.025 #define MIN_RTO 0.025
#define MAX_RTO 5.000 #define MAX_RTO 5.000
#define MAX_PENDING_BLOCKS 12
#define MIN_REQTIME_DELTA 0.250 #define MIN_REQTIME_DELTA 0.250
#define MIN_BACKGROUND_DELTA 0.005 #define MIN_BACKGROUND_DELTA 0.005
#define IDLE_QUERY_TIME 1.0 #define IDLE_QUERY_TIME 1.0
@ -502,18 +503,21 @@ update_receive_seq(struct serialqueue *sq, double eventtime, uint64_t rseq)
} }
// Process a well formed input message // Process a well formed input message
static void static int
handle_message(struct serialqueue *sq, double eventtime, int len) handle_message(struct serialqueue *sq, double eventtime, int len)
{ {
// Calculate receive sequence number // Calculate receive sequence number
uint64_t rseq = ((sq->receive_seq & ~MESSAGE_SEQ_MASK) uint64_t rseq = ((sq->receive_seq & ~MESSAGE_SEQ_MASK)
| (sq->input_buf[MESSAGE_POS_SEQ] & MESSAGE_SEQ_MASK)); | (sq->input_buf[MESSAGE_POS_SEQ] & MESSAGE_SEQ_MASK));
if (rseq != sq->receive_seq) {
// New sequence number
if (rseq < sq->receive_seq) if (rseq < sq->receive_seq)
rseq += MESSAGE_SEQ_MASK+1; rseq += MESSAGE_SEQ_MASK+1;
if (rseq > sq->send_seq && sq->receive_seq != 1)
if (rseq != sq->receive_seq) // An ack for a message not sent? Out of order message?
// New sequence number return -1;
update_receive_seq(sq, eventtime, rseq); update_receive_seq(sq, eventtime, rseq);
}
// Check for pending messages on notify_queue // Check for pending messages on notify_queue
int must_wake = 0; int must_wake = 0;
@ -553,6 +557,7 @@ handle_message(struct serialqueue *sq, double eventtime, int len)
if (must_wake) if (must_wake)
check_wake_receive(sq); check_wake_receive(sq);
return 0;
} }
// Callback for input activity on the serial fd // Callback for input activity on the serial fd
@ -568,26 +573,29 @@ input_event(struct serialqueue *sq, double eventtime)
} }
sq->input_pos += ret; sq->input_pos += ret;
for (;;) { for (;;) {
ret = check_message(&sq->need_sync, sq->input_buf, sq->input_pos); int len = check_message(&sq->need_sync, sq->input_buf, sq->input_pos);
if (!ret) if (!len)
// Need more data // Need more data
return; return;
if (ret > 0) { if (len > 0) {
// Received a valid message // Received a valid message
pthread_mutex_lock(&sq->lock); pthread_mutex_lock(&sq->lock);
handle_message(sq, eventtime, ret); int ret = handle_message(sq, eventtime, len);
sq->bytes_read += ret; if (ret)
sq->bytes_invalid += len;
else
sq->bytes_read += len;
pthread_mutex_unlock(&sq->lock); pthread_mutex_unlock(&sq->lock);
} else { } else {
// Skip bad data at beginning of input // Skip bad data at beginning of input
ret = -ret; len = -len;
pthread_mutex_lock(&sq->lock); pthread_mutex_lock(&sq->lock);
sq->bytes_invalid += ret; sq->bytes_invalid += len;
pthread_mutex_unlock(&sq->lock); pthread_mutex_unlock(&sq->lock);
} }
sq->input_pos -= ret; sq->input_pos -= len;
if (sq->input_pos) if (sq->input_pos)
memmove(sq->input_buf, &sq->input_buf[ret], sq->input_pos); memmove(sq->input_buf, &sq->input_buf[len], sq->input_pos);
} }
} }
@ -726,7 +734,7 @@ build_and_send_command(struct serialqueue *sq, double eventtime)
static double static double
check_send_command(struct serialqueue *sq, double eventtime) check_send_command(struct serialqueue *sq, double eventtime)
{ {
if (sq->send_seq - sq->receive_seq >= MESSAGE_SEQ_MASK if (sq->send_seq - sq->receive_seq >= MAX_PENDING_BLOCKS
&& sq->receive_seq != (uint64_t)-1) && sq->receive_seq != (uint64_t)-1)
// Need an ack before more messages can be sent // Need an ack before more messages can be sent
return PR_NEVER; return PR_NEVER;