From c6c360c4e14374a56dcb0477e1e7759683841093 Mon Sep 17 00:00:00 2001 From: Kevin O'Connor Date: Fri, 14 Feb 2020 20:47:08 -0500 Subject: [PATCH] serialqueue: Support notification of when a command is processed Add ability for the host code to get a notification when the ack for a command sent to the micro-controller is received. This is in preparation for improved detection of message loss between mcu and host. Signed-off-by: Kevin O'Connor --- klippy/chelper/__init__.py | 4 ++- klippy/chelper/serialqueue.c | 55 +++++++++++++++++++++++++++++------- klippy/chelper/serialqueue.h | 6 ++-- klippy/serialhdl.py | 31 +++++++++++++++++--- 4 files changed, 79 insertions(+), 17 deletions(-) diff --git a/klippy/chelper/__init__.py b/klippy/chelper/__init__.py index c979340b..1a4b2d74 100644 --- a/klippy/chelper/__init__.py +++ b/klippy/chelper/__init__.py @@ -110,6 +110,7 @@ defs_serialqueue = """ uint8_t msg[MESSAGE_MAX]; int len; double sent_time, receive_time; + uint64_t notify_id; }; struct serialqueue *serialqueue_alloc(int serial_fd, int write_only); @@ -118,7 +119,8 @@ defs_serialqueue = """ struct command_queue *serialqueue_alloc_commandqueue(void); void serialqueue_free_commandqueue(struct command_queue *cq); void serialqueue_send(struct serialqueue *sq, struct command_queue *cq - , uint8_t *msg, int len, uint64_t min_clock, uint64_t req_clock); + , uint8_t *msg, int len, uint64_t min_clock, uint64_t req_clock + , uint64_t notify_id); void serialqueue_pull(struct serialqueue *sq , struct pull_queue_message *pqm); void serialqueue_set_baud_adjust(struct serialqueue *sq diff --git a/klippy/chelper/serialqueue.c b/klippy/chelper/serialqueue.c index 61331379..779156ce 100644 --- a/klippy/chelper/serialqueue.c +++ b/klippy/chelper/serialqueue.c @@ -1,9 +1,9 @@ // Serial port command queuing // -// Copyright (C) 2016-2018 Kevin O'Connor +// Copyright (C) 2016-2020 Kevin O'Connor // // This file may be distributed under the terms of the GNU GPLv3 license. -// + // This goal of this code is to handle low-level serial port // communications with a microcontroller (mcu). This code is written // in C (instead of python) to reduce communication latencies and to @@ -372,6 +372,7 @@ struct serialqueue { struct list_head pending_queues; int ready_bytes, stalled_bytes, need_ack_bytes, last_ack_bytes; uint64_t need_kick_clock; + struct list_head notify_queue; // Received messages struct list_head receive_queue; // Debugging @@ -512,6 +513,25 @@ handle_message(struct serialqueue *sq, double eventtime, int len) if (rseq != sq->receive_seq) // New sequence number update_receive_seq(sq, eventtime, rseq); + + // Check for pending messages on notify_queue + int must_wake = 0; + while (!list_empty(&sq->notify_queue)) { + struct queue_message *qm = list_first_entry( + &sq->notify_queue, struct queue_message, node); + uint64_t wake_seq = rseq - 1 - (len > MESSAGE_MIN ? 1 : 0); + uint64_t notify_msg_sent_seq = qm->req_clock; + if (notify_msg_sent_seq > wake_seq) + break; + list_del(&qm->node); + qm->len = 0; + qm->sent_time = sq->last_receive_sent_time; + qm->receive_time = eventtime; + list_add_tail(&qm->node, &sq->receive_queue); + must_wake = 1; + } + + // Process message if (len == MESSAGE_MIN) { // Ack/nak message if (sq->last_ack_seq < rseq) @@ -519,18 +539,19 @@ handle_message(struct serialqueue *sq, double eventtime, int len) else if (rseq > sq->ignore_nak_seq && !list_empty(&sq->sent_queue)) // Duplicate Ack is a Nak - do fast retransmit pollreactor_update_timer(&sq->pr, SQPT_RETRANSMIT, PR_NOW); - } - - if (len > MESSAGE_MIN) { - // Add message to receive queue + } else { + // Data message - add to receive queue struct queue_message *qm = message_fill(sq->input_buf, len); qm->sent_time = (rseq > sq->retransmit_seq ? sq->last_receive_sent_time : 0.); qm->receive_time = get_monotonic(); // must be time post read() qm->receive_time -= sq->baud_adjust * len; list_add_tail(&qm->node, &sq->receive_queue); - check_wake_receive(sq); + must_wake = 1; } + + if (must_wake) + check_wake_receive(sq); } // Callback for input activity on the serial fd @@ -661,7 +682,13 @@ build_and_send_command(struct serialqueue *sq, double eventtime) memcpy(&out->msg[out->len], qm->msg, qm->len); out->len += qm->len; sq->ready_bytes -= qm->len; - message_free(qm); + if (qm->notify_id) { + // Message requires notification - add to notify list + qm->req_clock = sq->send_seq; + list_add_tail(&qm->node, &sq->notify_queue); + } else { + message_free(qm); + } } // Fill header / trailer @@ -835,6 +862,7 @@ serialqueue_alloc(int serial_fd, int write_only) list_init(&sq->pending_queues); list_init(&sq->sent_queue); list_init(&sq->receive_queue); + list_init(&sq->notify_queue); // Debugging list_init(&sq->old_sent); @@ -882,6 +910,7 @@ serialqueue_free(struct serialqueue *sq) pthread_mutex_lock(&sq->lock); message_queue_free(&sq->sent_queue); message_queue_free(&sq->receive_queue); + message_queue_free(&sq->notify_queue); message_queue_free(&sq->old_sent); message_queue_free(&sq->old_receive); while (!list_empty(&sq->pending_queues)) { @@ -960,11 +989,13 @@ serialqueue_send_batch(struct serialqueue *sq, struct command_queue *cq // given time and priority. void __visible serialqueue_send(struct serialqueue *sq, struct command_queue *cq, uint8_t *msg - , int len, uint64_t min_clock, uint64_t req_clock) + , int len, uint64_t min_clock, uint64_t req_clock + , uint64_t notify_id) { struct queue_message *qm = message_fill(msg, len); qm->min_clock = min_clock; qm->req_clock = req_clock; + qm->notify_id = notify_id; struct list_head msgs; list_init(&msgs); @@ -998,7 +1029,11 @@ serialqueue_pull(struct serialqueue *sq, struct pull_queue_message *pqm) pqm->len = qm->len; pqm->sent_time = qm->sent_time; pqm->receive_time = qm->receive_time; - debug_queue_add(&sq->old_receive, qm); + pqm->notify_id = qm->notify_id; + if (qm->len) + debug_queue_add(&sq->old_receive, qm); + else + message_free(qm); pthread_mutex_unlock(&sq->lock); return; diff --git a/klippy/chelper/serialqueue.h b/klippy/chelper/serialqueue.h index 807e9883..071923e5 100644 --- a/klippy/chelper/serialqueue.h +++ b/klippy/chelper/serialqueue.h @@ -32,6 +32,7 @@ struct queue_message { double sent_time, receive_time; }; }; + uint64_t notify_id; struct list_node node; }; @@ -42,6 +43,7 @@ struct pull_queue_message { uint8_t msg[MESSAGE_MAX]; int len; double sent_time, receive_time; + uint64_t notify_id; }; struct serialqueue; @@ -53,8 +55,8 @@ void serialqueue_free_commandqueue(struct command_queue *cq); void serialqueue_send_batch(struct serialqueue *sq, struct command_queue *cq , struct list_head *msgs); void serialqueue_send(struct serialqueue *sq, struct command_queue *cq - , uint8_t *msg, int len - , uint64_t min_clock, uint64_t req_clock); + , uint8_t *msg, int len, uint64_t min_clock + , uint64_t req_clock, uint64_t notify_id); void serialqueue_pull(struct serialqueue *sq, struct pull_queue_message *pqm); void serialqueue_set_baud_adjust(struct serialqueue *sq, double baud_adjust); void serialqueue_set_clock_est(struct serialqueue *sq, double est_freq diff --git a/klippy/serialhdl.py b/klippy/serialhdl.py index 1954bb56..190fd745 100644 --- a/klippy/serialhdl.py +++ b/klippy/serialhdl.py @@ -1,6 +1,6 @@ # Serial port management for firmware communication # -# Copyright (C) 2016-2019 Kevin O'Connor +# Copyright (C) 2016-2020 Kevin O'Connor # # This file may be distributed under the terms of the GNU GPLv3 license. import logging, threading @@ -32,13 +32,22 @@ class SerialReader: self.handlers = {} self.register_response(self._handle_unknown_init, '#unknown') self.register_response(self.handle_output, '#output') + # Sent message notification tracking + self.last_notify_id = 0 + self.pending_notifications = {} def _bg_thread(self): response = self.ffi_main.new('struct pull_queue_message *') while 1: self.ffi_lib.serialqueue_pull(self.serialqueue, response) count = response.len - if count <= 0: + if count < 0: break + if response.notify_id: + params = {'#sent_time': response.sent_time, + '#receive_time': response.receive_time} + completion = self.pending_notifications.pop(response.notify_id) + self.reactor.async_complete(completion, params) + continue params = self.msgparser.parse(response.msg[0:count]) params['#sent_time'] = response.sent_time params['#receive_time'] = response.receive_time @@ -126,6 +135,9 @@ class SerialReader: if self.ser is not None: self.ser.close() self.ser = None + for pn in self.pending_notifications.values(): + pn.complete(None) + self.pending_notifications.clear() def stats(self, eventtime): if self.serialqueue is None: return "" @@ -145,8 +157,19 @@ class SerialReader: self.handlers[name, oid] = callback # Command sending def raw_send(self, cmd, minclock, reqclock, cmd_queue): - self.ffi_lib.serialqueue_send( - self.serialqueue, cmd_queue, cmd, len(cmd), minclock, reqclock) + self.ffi_lib.serialqueue_send(self.serialqueue, cmd_queue, + cmd, len(cmd), minclock, reqclock, 0) + def raw_send_wait_ack(self, cmd, minclock, reqclock, cmd_queue): + self.last_notify_id += 1 + nid = self.last_notify_id + completion = self.reactor.completion() + self.pending_notifications[nid] = completion + self.ffi_lib.serialqueue_send(self.serialqueue, cmd_queue, + cmd, len(cmd), minclock, reqclock, nid) + params = completion.wait() + if params is None: + raise error("Serial connection closed") + return params def send(self, msg, minclock=0, reqclock=0): cmd = self.msgparser.create_command(msg) self.raw_send(cmd, minclock, reqclock, self.default_cmd_queue)