serialqueue: Support notification of when a command is processed
Add ability for the host code to get a notification when the ack for a command sent to the micro-controller is received. This is in preparation for improved detection of message loss between mcu and host. Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
This commit is contained in:
parent
7b90830ae5
commit
c6c360c4e1
|
@ -110,6 +110,7 @@ defs_serialqueue = """
|
||||||
uint8_t msg[MESSAGE_MAX];
|
uint8_t msg[MESSAGE_MAX];
|
||||||
int len;
|
int len;
|
||||||
double sent_time, receive_time;
|
double sent_time, receive_time;
|
||||||
|
uint64_t notify_id;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct serialqueue *serialqueue_alloc(int serial_fd, int write_only);
|
struct serialqueue *serialqueue_alloc(int serial_fd, int write_only);
|
||||||
|
@ -118,7 +119,8 @@ defs_serialqueue = """
|
||||||
struct command_queue *serialqueue_alloc_commandqueue(void);
|
struct command_queue *serialqueue_alloc_commandqueue(void);
|
||||||
void serialqueue_free_commandqueue(struct command_queue *cq);
|
void serialqueue_free_commandqueue(struct command_queue *cq);
|
||||||
void serialqueue_send(struct serialqueue *sq, struct command_queue *cq
|
void serialqueue_send(struct serialqueue *sq, struct command_queue *cq
|
||||||
, uint8_t *msg, int len, uint64_t min_clock, uint64_t req_clock);
|
, uint8_t *msg, int len, uint64_t min_clock, uint64_t req_clock
|
||||||
|
, uint64_t notify_id);
|
||||||
void serialqueue_pull(struct serialqueue *sq
|
void serialqueue_pull(struct serialqueue *sq
|
||||||
, struct pull_queue_message *pqm);
|
, struct pull_queue_message *pqm);
|
||||||
void serialqueue_set_baud_adjust(struct serialqueue *sq
|
void serialqueue_set_baud_adjust(struct serialqueue *sq
|
||||||
|
|
|
@ -1,9 +1,9 @@
|
||||||
// Serial port command queuing
|
// Serial port command queuing
|
||||||
//
|
//
|
||||||
// Copyright (C) 2016-2018 Kevin O'Connor <kevin@koconnor.net>
|
// Copyright (C) 2016-2020 Kevin O'Connor <kevin@koconnor.net>
|
||||||
//
|
//
|
||||||
// This file may be distributed under the terms of the GNU GPLv3 license.
|
// This file may be distributed under the terms of the GNU GPLv3 license.
|
||||||
//
|
|
||||||
// This goal of this code is to handle low-level serial port
|
// This goal of this code is to handle low-level serial port
|
||||||
// communications with a microcontroller (mcu). This code is written
|
// communications with a microcontroller (mcu). This code is written
|
||||||
// in C (instead of python) to reduce communication latencies and to
|
// in C (instead of python) to reduce communication latencies and to
|
||||||
|
@ -372,6 +372,7 @@ struct serialqueue {
|
||||||
struct list_head pending_queues;
|
struct list_head pending_queues;
|
||||||
int ready_bytes, stalled_bytes, need_ack_bytes, last_ack_bytes;
|
int ready_bytes, stalled_bytes, need_ack_bytes, last_ack_bytes;
|
||||||
uint64_t need_kick_clock;
|
uint64_t need_kick_clock;
|
||||||
|
struct list_head notify_queue;
|
||||||
// Received messages
|
// Received messages
|
||||||
struct list_head receive_queue;
|
struct list_head receive_queue;
|
||||||
// Debugging
|
// Debugging
|
||||||
|
@ -512,6 +513,25 @@ handle_message(struct serialqueue *sq, double eventtime, int len)
|
||||||
if (rseq != sq->receive_seq)
|
if (rseq != sq->receive_seq)
|
||||||
// New sequence number
|
// New sequence number
|
||||||
update_receive_seq(sq, eventtime, rseq);
|
update_receive_seq(sq, eventtime, rseq);
|
||||||
|
|
||||||
|
// Check for pending messages on notify_queue
|
||||||
|
int must_wake = 0;
|
||||||
|
while (!list_empty(&sq->notify_queue)) {
|
||||||
|
struct queue_message *qm = list_first_entry(
|
||||||
|
&sq->notify_queue, struct queue_message, node);
|
||||||
|
uint64_t wake_seq = rseq - 1 - (len > MESSAGE_MIN ? 1 : 0);
|
||||||
|
uint64_t notify_msg_sent_seq = qm->req_clock;
|
||||||
|
if (notify_msg_sent_seq > wake_seq)
|
||||||
|
break;
|
||||||
|
list_del(&qm->node);
|
||||||
|
qm->len = 0;
|
||||||
|
qm->sent_time = sq->last_receive_sent_time;
|
||||||
|
qm->receive_time = eventtime;
|
||||||
|
list_add_tail(&qm->node, &sq->receive_queue);
|
||||||
|
must_wake = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process message
|
||||||
if (len == MESSAGE_MIN) {
|
if (len == MESSAGE_MIN) {
|
||||||
// Ack/nak message
|
// Ack/nak message
|
||||||
if (sq->last_ack_seq < rseq)
|
if (sq->last_ack_seq < rseq)
|
||||||
|
@ -519,18 +539,19 @@ handle_message(struct serialqueue *sq, double eventtime, int len)
|
||||||
else if (rseq > sq->ignore_nak_seq && !list_empty(&sq->sent_queue))
|
else if (rseq > sq->ignore_nak_seq && !list_empty(&sq->sent_queue))
|
||||||
// Duplicate Ack is a Nak - do fast retransmit
|
// Duplicate Ack is a Nak - do fast retransmit
|
||||||
pollreactor_update_timer(&sq->pr, SQPT_RETRANSMIT, PR_NOW);
|
pollreactor_update_timer(&sq->pr, SQPT_RETRANSMIT, PR_NOW);
|
||||||
}
|
} else {
|
||||||
|
// Data message - add to receive queue
|
||||||
if (len > MESSAGE_MIN) {
|
|
||||||
// Add message to receive queue
|
|
||||||
struct queue_message *qm = message_fill(sq->input_buf, len);
|
struct queue_message *qm = message_fill(sq->input_buf, len);
|
||||||
qm->sent_time = (rseq > sq->retransmit_seq
|
qm->sent_time = (rseq > sq->retransmit_seq
|
||||||
? sq->last_receive_sent_time : 0.);
|
? sq->last_receive_sent_time : 0.);
|
||||||
qm->receive_time = get_monotonic(); // must be time post read()
|
qm->receive_time = get_monotonic(); // must be time post read()
|
||||||
qm->receive_time -= sq->baud_adjust * len;
|
qm->receive_time -= sq->baud_adjust * len;
|
||||||
list_add_tail(&qm->node, &sq->receive_queue);
|
list_add_tail(&qm->node, &sq->receive_queue);
|
||||||
check_wake_receive(sq);
|
must_wake = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (must_wake)
|
||||||
|
check_wake_receive(sq);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Callback for input activity on the serial fd
|
// Callback for input activity on the serial fd
|
||||||
|
@ -661,7 +682,13 @@ build_and_send_command(struct serialqueue *sq, double eventtime)
|
||||||
memcpy(&out->msg[out->len], qm->msg, qm->len);
|
memcpy(&out->msg[out->len], qm->msg, qm->len);
|
||||||
out->len += qm->len;
|
out->len += qm->len;
|
||||||
sq->ready_bytes -= qm->len;
|
sq->ready_bytes -= qm->len;
|
||||||
message_free(qm);
|
if (qm->notify_id) {
|
||||||
|
// Message requires notification - add to notify list
|
||||||
|
qm->req_clock = sq->send_seq;
|
||||||
|
list_add_tail(&qm->node, &sq->notify_queue);
|
||||||
|
} else {
|
||||||
|
message_free(qm);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fill header / trailer
|
// Fill header / trailer
|
||||||
|
@ -835,6 +862,7 @@ serialqueue_alloc(int serial_fd, int write_only)
|
||||||
list_init(&sq->pending_queues);
|
list_init(&sq->pending_queues);
|
||||||
list_init(&sq->sent_queue);
|
list_init(&sq->sent_queue);
|
||||||
list_init(&sq->receive_queue);
|
list_init(&sq->receive_queue);
|
||||||
|
list_init(&sq->notify_queue);
|
||||||
|
|
||||||
// Debugging
|
// Debugging
|
||||||
list_init(&sq->old_sent);
|
list_init(&sq->old_sent);
|
||||||
|
@ -882,6 +910,7 @@ serialqueue_free(struct serialqueue *sq)
|
||||||
pthread_mutex_lock(&sq->lock);
|
pthread_mutex_lock(&sq->lock);
|
||||||
message_queue_free(&sq->sent_queue);
|
message_queue_free(&sq->sent_queue);
|
||||||
message_queue_free(&sq->receive_queue);
|
message_queue_free(&sq->receive_queue);
|
||||||
|
message_queue_free(&sq->notify_queue);
|
||||||
message_queue_free(&sq->old_sent);
|
message_queue_free(&sq->old_sent);
|
||||||
message_queue_free(&sq->old_receive);
|
message_queue_free(&sq->old_receive);
|
||||||
while (!list_empty(&sq->pending_queues)) {
|
while (!list_empty(&sq->pending_queues)) {
|
||||||
|
@ -960,11 +989,13 @@ serialqueue_send_batch(struct serialqueue *sq, struct command_queue *cq
|
||||||
// given time and priority.
|
// given time and priority.
|
||||||
void __visible
|
void __visible
|
||||||
serialqueue_send(struct serialqueue *sq, struct command_queue *cq, uint8_t *msg
|
serialqueue_send(struct serialqueue *sq, struct command_queue *cq, uint8_t *msg
|
||||||
, int len, uint64_t min_clock, uint64_t req_clock)
|
, int len, uint64_t min_clock, uint64_t req_clock
|
||||||
|
, uint64_t notify_id)
|
||||||
{
|
{
|
||||||
struct queue_message *qm = message_fill(msg, len);
|
struct queue_message *qm = message_fill(msg, len);
|
||||||
qm->min_clock = min_clock;
|
qm->min_clock = min_clock;
|
||||||
qm->req_clock = req_clock;
|
qm->req_clock = req_clock;
|
||||||
|
qm->notify_id = notify_id;
|
||||||
|
|
||||||
struct list_head msgs;
|
struct list_head msgs;
|
||||||
list_init(&msgs);
|
list_init(&msgs);
|
||||||
|
@ -998,7 +1029,11 @@ serialqueue_pull(struct serialqueue *sq, struct pull_queue_message *pqm)
|
||||||
pqm->len = qm->len;
|
pqm->len = qm->len;
|
||||||
pqm->sent_time = qm->sent_time;
|
pqm->sent_time = qm->sent_time;
|
||||||
pqm->receive_time = qm->receive_time;
|
pqm->receive_time = qm->receive_time;
|
||||||
debug_queue_add(&sq->old_receive, qm);
|
pqm->notify_id = qm->notify_id;
|
||||||
|
if (qm->len)
|
||||||
|
debug_queue_add(&sq->old_receive, qm);
|
||||||
|
else
|
||||||
|
message_free(qm);
|
||||||
|
|
||||||
pthread_mutex_unlock(&sq->lock);
|
pthread_mutex_unlock(&sq->lock);
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -32,6 +32,7 @@ struct queue_message {
|
||||||
double sent_time, receive_time;
|
double sent_time, receive_time;
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
|
uint64_t notify_id;
|
||||||
struct list_node node;
|
struct list_node node;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -42,6 +43,7 @@ struct pull_queue_message {
|
||||||
uint8_t msg[MESSAGE_MAX];
|
uint8_t msg[MESSAGE_MAX];
|
||||||
int len;
|
int len;
|
||||||
double sent_time, receive_time;
|
double sent_time, receive_time;
|
||||||
|
uint64_t notify_id;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct serialqueue;
|
struct serialqueue;
|
||||||
|
@ -53,8 +55,8 @@ void serialqueue_free_commandqueue(struct command_queue *cq);
|
||||||
void serialqueue_send_batch(struct serialqueue *sq, struct command_queue *cq
|
void serialqueue_send_batch(struct serialqueue *sq, struct command_queue *cq
|
||||||
, struct list_head *msgs);
|
, struct list_head *msgs);
|
||||||
void serialqueue_send(struct serialqueue *sq, struct command_queue *cq
|
void serialqueue_send(struct serialqueue *sq, struct command_queue *cq
|
||||||
, uint8_t *msg, int len
|
, uint8_t *msg, int len, uint64_t min_clock
|
||||||
, uint64_t min_clock, uint64_t req_clock);
|
, uint64_t req_clock, uint64_t notify_id);
|
||||||
void serialqueue_pull(struct serialqueue *sq, struct pull_queue_message *pqm);
|
void serialqueue_pull(struct serialqueue *sq, struct pull_queue_message *pqm);
|
||||||
void serialqueue_set_baud_adjust(struct serialqueue *sq, double baud_adjust);
|
void serialqueue_set_baud_adjust(struct serialqueue *sq, double baud_adjust);
|
||||||
void serialqueue_set_clock_est(struct serialqueue *sq, double est_freq
|
void serialqueue_set_clock_est(struct serialqueue *sq, double est_freq
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
# Serial port management for firmware communication
|
# Serial port management for firmware communication
|
||||||
#
|
#
|
||||||
# Copyright (C) 2016-2019 Kevin O'Connor <kevin@koconnor.net>
|
# Copyright (C) 2016-2020 Kevin O'Connor <kevin@koconnor.net>
|
||||||
#
|
#
|
||||||
# This file may be distributed under the terms of the GNU GPLv3 license.
|
# This file may be distributed under the terms of the GNU GPLv3 license.
|
||||||
import logging, threading
|
import logging, threading
|
||||||
|
@ -32,13 +32,22 @@ class SerialReader:
|
||||||
self.handlers = {}
|
self.handlers = {}
|
||||||
self.register_response(self._handle_unknown_init, '#unknown')
|
self.register_response(self._handle_unknown_init, '#unknown')
|
||||||
self.register_response(self.handle_output, '#output')
|
self.register_response(self.handle_output, '#output')
|
||||||
|
# Sent message notification tracking
|
||||||
|
self.last_notify_id = 0
|
||||||
|
self.pending_notifications = {}
|
||||||
def _bg_thread(self):
|
def _bg_thread(self):
|
||||||
response = self.ffi_main.new('struct pull_queue_message *')
|
response = self.ffi_main.new('struct pull_queue_message *')
|
||||||
while 1:
|
while 1:
|
||||||
self.ffi_lib.serialqueue_pull(self.serialqueue, response)
|
self.ffi_lib.serialqueue_pull(self.serialqueue, response)
|
||||||
count = response.len
|
count = response.len
|
||||||
if count <= 0:
|
if count < 0:
|
||||||
break
|
break
|
||||||
|
if response.notify_id:
|
||||||
|
params = {'#sent_time': response.sent_time,
|
||||||
|
'#receive_time': response.receive_time}
|
||||||
|
completion = self.pending_notifications.pop(response.notify_id)
|
||||||
|
self.reactor.async_complete(completion, params)
|
||||||
|
continue
|
||||||
params = self.msgparser.parse(response.msg[0:count])
|
params = self.msgparser.parse(response.msg[0:count])
|
||||||
params['#sent_time'] = response.sent_time
|
params['#sent_time'] = response.sent_time
|
||||||
params['#receive_time'] = response.receive_time
|
params['#receive_time'] = response.receive_time
|
||||||
|
@ -126,6 +135,9 @@ class SerialReader:
|
||||||
if self.ser is not None:
|
if self.ser is not None:
|
||||||
self.ser.close()
|
self.ser.close()
|
||||||
self.ser = None
|
self.ser = None
|
||||||
|
for pn in self.pending_notifications.values():
|
||||||
|
pn.complete(None)
|
||||||
|
self.pending_notifications.clear()
|
||||||
def stats(self, eventtime):
|
def stats(self, eventtime):
|
||||||
if self.serialqueue is None:
|
if self.serialqueue is None:
|
||||||
return ""
|
return ""
|
||||||
|
@ -145,8 +157,19 @@ class SerialReader:
|
||||||
self.handlers[name, oid] = callback
|
self.handlers[name, oid] = callback
|
||||||
# Command sending
|
# Command sending
|
||||||
def raw_send(self, cmd, minclock, reqclock, cmd_queue):
|
def raw_send(self, cmd, minclock, reqclock, cmd_queue):
|
||||||
self.ffi_lib.serialqueue_send(
|
self.ffi_lib.serialqueue_send(self.serialqueue, cmd_queue,
|
||||||
self.serialqueue, cmd_queue, cmd, len(cmd), minclock, reqclock)
|
cmd, len(cmd), minclock, reqclock, 0)
|
||||||
|
def raw_send_wait_ack(self, cmd, minclock, reqclock, cmd_queue):
|
||||||
|
self.last_notify_id += 1
|
||||||
|
nid = self.last_notify_id
|
||||||
|
completion = self.reactor.completion()
|
||||||
|
self.pending_notifications[nid] = completion
|
||||||
|
self.ffi_lib.serialqueue_send(self.serialqueue, cmd_queue,
|
||||||
|
cmd, len(cmd), minclock, reqclock, nid)
|
||||||
|
params = completion.wait()
|
||||||
|
if params is None:
|
||||||
|
raise error("Serial connection closed")
|
||||||
|
return params
|
||||||
def send(self, msg, minclock=0, reqclock=0):
|
def send(self, msg, minclock=0, reqclock=0):
|
||||||
cmd = self.msgparser.create_command(msg)
|
cmd = self.msgparser.create_command(msg)
|
||||||
self.raw_send(cmd, minclock, reqclock, self.default_cmd_queue)
|
self.raw_send(cmd, minclock, reqclock, self.default_cmd_queue)
|
||||||
|
|
Loading…
Reference in New Issue