serialqueue: Limit message transmission to available receive buffer size
If the mcu is using a traditional serial port, then only send a new message block if there is space available in the mcu receive buffer. This should make it significantly less likely that high load on the mcu will result in retransmits. Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
This commit is contained in:
parent
0728c1a8be
commit
d798fae20b
|
@ -59,8 +59,11 @@ defs_serialqueue = """
|
||||||
void serialqueue_encode_and_send(struct serialqueue *sq
|
void serialqueue_encode_and_send(struct serialqueue *sq
|
||||||
, struct command_queue *cq, uint32_t *data, int len
|
, struct command_queue *cq, uint32_t *data, int len
|
||||||
, uint64_t min_clock, uint64_t req_clock);
|
, uint64_t min_clock, uint64_t req_clock);
|
||||||
void serialqueue_pull(struct serialqueue *sq, struct pull_queue_message *pqm);
|
void serialqueue_pull(struct serialqueue *sq
|
||||||
|
, struct pull_queue_message *pqm);
|
||||||
void serialqueue_set_baud_adjust(struct serialqueue *sq, double baud_adjust);
|
void serialqueue_set_baud_adjust(struct serialqueue *sq, double baud_adjust);
|
||||||
|
void serialqueue_set_receive_window(struct serialqueue *sq
|
||||||
|
, int receive_window);
|
||||||
void serialqueue_set_clock_est(struct serialqueue *sq, double est_freq
|
void serialqueue_set_clock_est(struct serialqueue *sq, double est_freq
|
||||||
, double last_clock_time, uint64_t last_clock);
|
, double last_clock_time, uint64_t last_clock);
|
||||||
void serialqueue_get_stats(struct serialqueue *sq, char *buf, int len);
|
void serialqueue_get_stats(struct serialqueue *sq, char *buf, int len);
|
||||||
|
|
|
@ -357,6 +357,7 @@ struct serialqueue {
|
||||||
pthread_cond_t cond;
|
pthread_cond_t cond;
|
||||||
int receive_waiting;
|
int receive_waiting;
|
||||||
// Baud / clock tracking
|
// Baud / clock tracking
|
||||||
|
int receive_window;
|
||||||
double baud_adjust, idle_time;
|
double baud_adjust, idle_time;
|
||||||
double est_freq, last_clock_time;
|
double est_freq, last_clock_time;
|
||||||
uint64_t last_clock;
|
uint64_t last_clock;
|
||||||
|
@ -368,7 +369,7 @@ struct serialqueue {
|
||||||
double srtt, rttvar, rto;
|
double srtt, rttvar, rto;
|
||||||
// Pending transmission message queues
|
// Pending transmission message queues
|
||||||
struct list_head pending_queues;
|
struct list_head pending_queues;
|
||||||
int ready_bytes, stalled_bytes, need_ack_bytes;
|
int ready_bytes, stalled_bytes, need_ack_bytes, last_ack_bytes;
|
||||||
uint64_t need_kick_clock;
|
uint64_t need_kick_clock;
|
||||||
// Received messages
|
// Received messages
|
||||||
struct list_head receive_queue;
|
struct list_head receive_queue;
|
||||||
|
@ -458,6 +459,7 @@ update_receive_seq(struct serialqueue *sq, double eventtime, uint64_t rseq)
|
||||||
if (rseq == sent_seq) {
|
if (rseq == sent_seq) {
|
||||||
// Found sent message corresponding with the received sequence
|
// Found sent message corresponding with the received sequence
|
||||||
sq->last_receive_sent_time = sent->receive_time;
|
sq->last_receive_sent_time = sent->receive_time;
|
||||||
|
sq->last_ack_bytes = sent->len;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -694,11 +696,18 @@ build_and_send_command(struct serialqueue *sq, double eventtime)
|
||||||
static double
|
static double
|
||||||
check_send_command(struct serialqueue *sq, double eventtime)
|
check_send_command(struct serialqueue *sq, double eventtime)
|
||||||
{
|
{
|
||||||
if ((sq->send_seq - sq->receive_seq >= MESSAGE_SEQ_MASK
|
if (sq->send_seq - sq->receive_seq >= MESSAGE_SEQ_MASK
|
||||||
|| (sq->need_ack_bytes - 2*MESSAGE_MAX) * sq->baud_adjust > sq->srtt)
|
|
||||||
&& sq->receive_seq != (uint64_t)-1)
|
&& sq->receive_seq != (uint64_t)-1)
|
||||||
// Need an ack before more messages can be sent
|
// Need an ack before more messages can be sent
|
||||||
return PR_NEVER;
|
return PR_NEVER;
|
||||||
|
if (sq->send_seq > sq->receive_seq && sq->receive_window) {
|
||||||
|
int need_ack_bytes = sq->need_ack_bytes + MESSAGE_MAX;
|
||||||
|
if (sq->last_ack_seq < sq->receive_seq)
|
||||||
|
need_ack_bytes += sq->last_ack_bytes;
|
||||||
|
if (need_ack_bytes > sq->receive_window)
|
||||||
|
// Wait for ack from past messages before sending next message
|
||||||
|
return PR_NEVER;
|
||||||
|
}
|
||||||
|
|
||||||
// Check for stalled messages now ready
|
// Check for stalled messages now ready
|
||||||
double idletime = eventtime > sq->idle_time ? eventtime : sq->idle_time;
|
double idletime = eventtime > sq->idle_time ? eventtime : sq->idle_time;
|
||||||
|
@ -1021,6 +1030,14 @@ serialqueue_set_baud_adjust(struct serialqueue *sq, double baud_adjust)
|
||||||
pthread_mutex_unlock(&sq->lock);
|
pthread_mutex_unlock(&sq->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
serialqueue_set_receive_window(struct serialqueue *sq, int receive_window)
|
||||||
|
{
|
||||||
|
pthread_mutex_lock(&sq->lock);
|
||||||
|
sq->receive_window = receive_window;
|
||||||
|
pthread_mutex_unlock(&sq->lock);
|
||||||
|
}
|
||||||
|
|
||||||
// Set the estimated clock rate of the mcu on the other end of the
|
// Set the estimated clock rate of the mcu on the other end of the
|
||||||
// serial port
|
// serial port
|
||||||
void
|
void
|
||||||
|
|
|
@ -90,6 +90,10 @@ class SerialReader:
|
||||||
baud_adjust = self.BITS_PER_BYTE / mcu_baud
|
baud_adjust = self.BITS_PER_BYTE / mcu_baud
|
||||||
self.ffi_lib.serialqueue_set_baud_adjust(
|
self.ffi_lib.serialqueue_set_baud_adjust(
|
||||||
self.serialqueue, baud_adjust)
|
self.serialqueue, baud_adjust)
|
||||||
|
receive_window = msgparser.get_constant_int('RECEIVE_WINDOW', None)
|
||||||
|
if receive_window is not None:
|
||||||
|
self.ffi_lib.serialqueue_set_receive_window(
|
||||||
|
self.serialqueue, receive_window)
|
||||||
def connect_file(self, debugoutput, dictionary, pace=False):
|
def connect_file(self, debugoutput, dictionary, pace=False):
|
||||||
self.ser = debugoutput
|
self.ser = debugoutput
|
||||||
self.msgparser.process_identify(dictionary, decompress=False)
|
self.msgparser.process_identify(dictionary, decompress=False)
|
||||||
|
|
|
@ -14,10 +14,13 @@
|
||||||
#include "sched.h" // sched_wake_tasks
|
#include "sched.h" // sched_wake_tasks
|
||||||
#include "serial_irq.h" // serial_enable_tx_irq
|
#include "serial_irq.h" // serial_enable_tx_irq
|
||||||
|
|
||||||
static uint8_t receive_buf[192], receive_pos;
|
#define RX_BUFFER_SIZE 192
|
||||||
|
|
||||||
|
static uint8_t receive_buf[RX_BUFFER_SIZE], receive_pos;
|
||||||
static uint8_t transmit_buf[96], transmit_pos, transmit_max;
|
static uint8_t transmit_buf[96], transmit_pos, transmit_max;
|
||||||
|
|
||||||
DECL_CONSTANT(SERIAL_BAUD, CONFIG_SERIAL_BAUD);
|
DECL_CONSTANT(SERIAL_BAUD, CONFIG_SERIAL_BAUD);
|
||||||
|
DECL_CONSTANT(RECEIVE_WINDOW, RX_BUFFER_SIZE);
|
||||||
|
|
||||||
// Rx interrupt - store read data
|
// Rx interrupt - store read data
|
||||||
void
|
void
|
||||||
|
|
Loading…
Reference in New Issue