From f6864da58bfd5c1c1da4274a01b49597a203cf98 Mon Sep 17 00:00:00 2001 From: Kevin O'Connor Date: Mon, 10 Jun 2019 19:51:43 -0400 Subject: [PATCH] tmc2208: Add support for using an analog mux to access TMC uarts Signed-off-by: Kevin O'Connor --- klippy/extras/tmc2208.py | 110 ++++++++++++++++++++++++++++++++------- klippy/pins.py | 29 ++++++----- 2 files changed, 109 insertions(+), 30 deletions(-) diff --git a/klippy/extras/tmc2208.py b/klippy/extras/tmc2208.py index c0d0d718..8f68b576 100644 --- a/klippy/extras/tmc2208.py +++ b/klippy/extras/tmc2208.py @@ -178,28 +178,66 @@ FieldFormatters.update({ }) +###################################################################### +# TMC uart analog mux support +###################################################################### + +class MCU_analog_mux: + def __init__(self, mcu, cmd_queue, select_pins_desc): + self.mcu = mcu + self.cmd_queue = cmd_queue + ppins = mcu.get_printer().lookup_object("pins") + select_pin_params = [ppins.lookup_pin(spd, can_invert=True) + for spd in select_pins_desc.split(',')] + self.oids = [self.mcu.create_oid() for pp in select_pin_params] + self.pins = [pp['pin'] for pp in select_pin_params] + self.pin_values = tuple([0 for pp in select_pin_params]) + for oid, pin, value in zip(self.oids, self.pins, self.pin_values): + self.mcu.add_config_cmd("config_digital_out oid=%d pin=%s" + " value=%d default_value=0 max_duration=0" + % (oid, pin, value)) + self.update_pin_cmd = None + self.mcu.register_config_callback(self.build_config) + def build_config(self): + self.update_pin_cmd = self.mcu.lookup_command( + "update_digital_out oid=%c value=%c", cq=self.cmd_queue) + def get_instance_id(self, select_pins_desc): + ppins = self.mcu.get_printer().lookup_object("pins") + select_pin_params = [ppins.parse_pin(spd, can_invert=True) + for spd in select_pins_desc.split(',')] + for pin_params in select_pin_params: + if pin_params['chip'] != self.mcu: + raise self.mcu.get_printer().config_error( + "TMC mux pins must be on the same mcu") + pins = [pp['pin'] for pp in select_pin_params] + if pins != self.pins: + raise self.mcu.get_printer().config_error( + "All TMC mux instances must use identical pins") + return tuple([not pp['invert'] for pp in select_pin_params]) + def activate(self, instance_id): + for oid, old, new in zip(self.oids, self.pin_values, instance_id): + if old != new: + self.update_pin_cmd.send([oid, new]) + self.pin_values = instance_id + + ###################################################################### # TMC2208 uart communication ###################################################################### # Code for sending messages on a TMC uart class MCU_TMC_uart_bitbang: - def __init__(self, config): - ppins = config.get_printer().lookup_object("pins") - rx_pin_params = ppins.lookup_pin( - config.get('uart_pin'), can_pullup=True) - tx_pin_desc = config.get('tx_pin', None) - if tx_pin_desc is None: - tx_pin_params = rx_pin_params - else: - tx_pin_params = ppins.lookup_pin(tx_pin_desc) - if rx_pin_params['chip'] is not tx_pin_params['chip']: - raise ppins.error("TMC uart rx and tx pins must be on the same mcu") + def __init__(self, rx_pin_params, tx_pin_params, select_pins_desc): self.mcu = rx_pin_params['chip'] self.pullup = rx_pin_params['pullup'] self.rx_pin = rx_pin_params['pin'] self.tx_pin = tx_pin_params['pin'] self.oid = self.mcu.create_oid() + self.cmd_queue = self.mcu.alloc_command_queue() + self.analog_mux = None + if select_pins_desc is not None: + self.analog_mux = MCU_analog_mux(self.mcu, self.cmd_queue, + select_pins_desc) self.tmcuart_send_cmd = None self.mcu.register_config_callback(self.build_config) def build_config(self): @@ -207,9 +245,18 @@ class MCU_TMC_uart_bitbang: self.mcu.add_config_cmd( "config_tmcuart oid=%d rx_pin=%s pull_up=%d tx_pin=%s bit_time=%d" % (self.oid, self.rx_pin, self.pullup, self.tx_pin, bit_ticks)) - cmd_queue = self.mcu.alloc_command_queue() self.tmcuart_send_cmd = self.mcu.lookup_command( - "tmcuart_send oid=%c write=%*s read=%c", cq=cmd_queue) + "tmcuart_send oid=%c write=%*s read=%c", cq=self.cmd_queue) + def register_instance(self, rx_pin_params, tx_pin_params, select_pins_desc): + if (rx_pin_params['pin'] != self.rx_pin + or tx_pin_params['pin'] != self.tx_pin + or (select_pins_desc is None) != (self.analog_mux is None)): + raise self.mcu.get_printer().config_error( + "Shared TMC uart must use identical pins") + instance_id = None + if self.analog_mux is not None: + instance_id = self.analog_mux.get_instance_id(select_pins_desc) + return instance_id def _calc_crc8(self, data): # Generate a CRC8-ATM value for a bytearray crc = 0 @@ -262,16 +309,42 @@ class MCU_TMC_uart_bitbang: if data != encoded_data: return None return val - def reg_read(self, addr, reg): + def reg_read(self, instance_id, addr, reg): + if self.analog_mux is not None: + self.analog_mux.activate(instance_id) msg = self._encode_read(0xf5, addr, reg) params = self.tmcuart_send_cmd.send_with_response( [self.oid, msg, 10], 'tmcuart_response', self.oid) return self._decode_read(reg, params['read']) - def reg_write(self, addr, reg, val): + def reg_write(self, instance_id, addr, reg, val): + if self.analog_mux is not None: + self.analog_mux.activate(instance_id) msg = self._encode_write(0xf5, 0x00, reg | 0x80, val) self.tmcuart_send_cmd.send_with_response( [self.oid, msg, 0], 'tmcuart_response', self.oid) +# Lookup a (possibly shared) tmc uart +def lookup_tmc_uart_bitbang(config): + ppins = config.get_printer().lookup_object("pins") + rx_pin_params = ppins.lookup_pin( + config.get('uart_pin'), can_pullup=True, share_type="tmc_uart_rx") + tx_pin_desc = config.get('tx_pin', None) + if tx_pin_desc is None: + tx_pin_params = rx_pin_params + else: + tx_pin_params = ppins.lookup_pin(tx_pin_desc, share_type="tmc_uart_tx") + if rx_pin_params['chip'] is not tx_pin_params['chip']: + raise ppins.error("TMC uart rx and tx pins must be on the same mcu") + select_pins_desc = config.get('select_pins', None) + mcu_uart = rx_pin_params.get('class') + if mcu_uart is None: + mcu_uart = MCU_TMC_uart_bitbang(rx_pin_params, tx_pin_params, + select_pins_desc) + rx_pin_params['class'] = mcu_uart + instance_id = mcu_uart.register_instance(rx_pin_params, tx_pin_params, + select_pins_desc) + return instance_id, mcu_uart + # Helper code for communicating via TMC uart class MCU_TMC_uart: def __init__(self, config, name_to_reg, fields): @@ -280,7 +353,8 @@ class MCU_TMC_uart: self.name_to_reg = name_to_reg self.fields = fields self.ifcnt = None - self.mcu_uart = MCU_TMC_uart_bitbang(config) + self.addr = 0 + self.instance_id, self.mcu_uart = lookup_tmc_uart_bitbang(config) def get_fields(self): return self.fields def get_register(self, reg_name): @@ -288,7 +362,7 @@ class MCU_TMC_uart: if self.printer.get_start_args().get('debugoutput') is not None: return 0 for retry in range(5): - val = self.mcu_uart.reg_read(0x00, reg) + val = self.mcu_uart.reg_read(self.instance_id, self.addr, reg) if val is not None: return val raise self.printer.command_error( @@ -301,7 +375,7 @@ class MCU_TMC_uart: ifcnt = self.ifcnt if ifcnt is None: self.ifcnt = ifcnt = self.get_register("IFCNT") - self.mcu_uart.reg_write(0x00, reg, val) + self.mcu_uart.reg_write(self.instance_id, self.addr, reg, val) self.ifcnt = self.get_register("IFCNT") if self.ifcnt == (ifcnt + 1) & 0xff: return diff --git a/klippy/pins.py b/klippy/pins.py index b1c8810e..4dfa5765 100644 --- a/klippy/pins.py +++ b/klippy/pins.py @@ -167,8 +167,7 @@ class PrinterPins: self.chips = {} self.active_pins = {} self.reserved_pins = {} - def lookup_pin(self, pin_desc, can_invert=False, can_pullup=False, - share_type=None): + def parse_pin(self, pin_desc, can_invert=False, can_pullup=False): desc = pin_desc.strip() pullup = invert = 0 if can_pullup and (desc.startswith('^') or desc.startswith('~')): @@ -194,17 +193,23 @@ class PrinterPins: raise error("Invalid pin description '%s'\n" "Format is: %s[chip_name:] pin_name" % ( pin_desc, format)) - share_name = "%s:%s" % (chip_name, pin) - if share_name in self.active_pins: - pin_params = self.active_pins[share_name] - if share_type is None or share_type != pin_params['share_type']: - raise error("pin %s used multiple times in config" % (pin,)) - if invert != pin_params['invert'] or pullup != pin_params['pullup']: - raise error("Shared pin %s must have same polarity" % (pin,)) - return pin_params pin_params = {'chip': self.chips[chip_name], 'chip_name': chip_name, - 'pin': pin, 'share_type': share_type, - 'invert': invert, 'pullup': pullup} + 'pin': pin, 'invert': invert, 'pullup': pullup} + return pin_params + def lookup_pin(self, pin_desc, can_invert=False, can_pullup=False, + share_type=None): + pin_params = self.parse_pin(pin_desc, can_invert, can_pullup) + pin = pin_params['pin'] + share_name = "%s:%s" % (pin_params['chip_name'], pin) + if share_name in self.active_pins: + share_params = self.active_pins[share_name] + if share_type is None or share_type != share_params['share_type']: + raise error("pin %s used multiple times in config" % (pin,)) + if (pin_params['invert'] != share_params['invert'] + or pin_params['pullup'] != share_params['pullup']): + raise error("Shared pin %s must have same polarity" % (pin,)) + return share_params + pin_params['share_type'] = share_type self.active_pins[share_name] = pin_params return pin_params def setup_pin(self, pin_type, pin_desc):