paneldue: add annotations

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Arksine 2021-05-15 11:08:52 -04:00
parent 1ada457364
commit 1026d59cad
1 changed files with 154 additions and 109 deletions

View File

@ -3,18 +3,39 @@
# Copyright (C) 2020 Eric Callahan <arksine.code@gmail.com> # Copyright (C) 2020 Eric Callahan <arksine.code@gmail.com>
# #
# This file may be distributed under the terms of the GNU GPLv3 license. # This file may be distributed under the terms of the GNU GPLv3 license.
from __future__ import annotations
import serial import serial
import os import os
import time import time
import json import json
import errno import errno
import logging import logging
import asyncio
from collections import deque from collections import deque
from utils import ServerError from utils import ServerError
from tornado import gen from tornado import gen
from tornado.ioloop import IOLoop from tornado.ioloop import IOLoop
# Annotation imports
from typing import (
TYPE_CHECKING,
Deque,
Any,
Tuple,
Optional,
Dict,
List,
Callable,
Coroutine,
)
if TYPE_CHECKING:
from confighelper import ConfigHelper
from . import klippy_apis
from . import file_manager
APIComp = klippy_apis.KlippyAPI
FMComp = file_manager.FileManager
FlexCallback = Callable[..., Optional[Coroutine]]
MIN_EST_TIME = 10. MIN_EST_TIME = 10.
INITIALIZE_TIMEOUT = 10. INITIALIZE_TIMEOUT = 10.
@ -25,26 +46,31 @@ class PanelDueError(ServerError):
RESTART_GCODES = ["RESTART", "FIRMWARE_RESTART"] RESTART_GCODES = ["RESTART", "FIRMWARE_RESTART"]
class SerialConnection: class SerialConnection:
def __init__(self, config, paneldue): def __init__(self,
config: ConfigHelper,
paneldue: PanelDue
) -> None:
self.ioloop = IOLoop.current() self.ioloop = IOLoop.current()
self.paneldue = paneldue self.paneldue = paneldue
self.port = config.get('serial') self.port: str = config.get('serial')
self.baud = config.getint('baud', 57600) self.baud = config.getint('baud', 57600)
self.partial_input = b"" self.partial_input: bytes = b""
self.ser = self.fd = None self.ser: Optional[serial.Serial] = None
self.connected = False self.fd: Optional[int] = None
self.send_busy = False self.connected: bool = False
self.send_buffer = b"" self.send_busy: bool = False
self.attempting_connect = True self.send_buffer: bytes = b""
self.attempting_connect: bool = True
self.ioloop.spawn_callback(self._connect) self.ioloop.spawn_callback(self._connect)
def disconnect(self, reconnect=False): def disconnect(self, reconnect: bool = False) -> None:
if self.connected: if self.connected:
if self.fd is not None: if self.fd is not None:
self.ioloop.remove_handler(self.fd) self.ioloop.remove_handler(self.fd)
self.fd = None self.fd = None
self.connected = False self.connected = False
self.ser.close() if self.ser is not None:
self.ser.close()
self.ser = None self.ser = None
self.partial_input = b"" self.partial_input = b""
self.send_buffer = b"" self.send_buffer = b""
@ -52,9 +78,9 @@ class SerialConnection:
logging.info("PanelDue Disconnected") logging.info("PanelDue Disconnected")
if reconnect and not self.attempting_connect: if reconnect and not self.attempting_connect:
self.attempting_connect = True self.attempting_connect = True
self.ioloop.call_later(1., self._connect) self.ioloop.call_later(1., self._connect) # type: ignore
async def _connect(self): async def _connect(self) -> None:
start_time = connect_time = time.time() start_time = connect_time = time.time()
while not self.connected: while not self.connected:
if connect_time > start_time + 30.: if connect_time > start_time + 30.:
@ -73,14 +99,15 @@ class SerialConnection:
connect_time += time.time() connect_time += time.time()
continue continue
self.fd = self.ser.fileno() self.fd = self.ser.fileno()
os.set_blocking(self.fd, False) fd = self.fd = self.ser.fileno()
self.ioloop.add_handler( os.set_blocking(fd, False)
self.fd, self._handle_incoming, IOLoop.READ | IOLoop.ERROR) self.ioloop.add_handler(fd, self._handle_incoming,
IOLoop.READ | IOLoop.ERROR)
self.connected = True self.connected = True
logging.info("PanelDue Connected") logging.info("PanelDue Connected")
self.attempting_connect = False self.attempting_connect = False
def _handle_incoming(self, fd, events): def _handle_incoming(self, fd: int, events: int) -> None:
if events & IOLoop.ERROR: if events & IOLoop.ERROR:
logging.info("PanelDue Connection Error") logging.info("PanelDue Connection Error")
self.disconnect(reconnect=True) self.disconnect(reconnect=True)
@ -104,23 +131,24 @@ class SerialConnection:
self.partial_input = lines.pop() self.partial_input = lines.pop()
for line in lines: for line in lines:
try: try:
line = line.strip().decode('utf-8', 'ignore') decoded_line = line.strip().decode('utf-8', 'ignore')
self.paneldue.process_line(line) self.paneldue.process_line(decoded_line)
except ServerError: except ServerError:
logging.exception( logging.exception(
f"GCode Processing Error: {line}") f"GCode Processing Error: {decoded_line}")
self.paneldue.handle_gcode_response( self.paneldue.handle_gcode_response(
f"!! GCode Processing Error: {line}") f"!! GCode Processing Error: {decoded_line}")
except Exception: except Exception:
logging.exception("Error during gcode processing") logging.exception("Error during gcode processing")
def send(self, data): def send(self, data: bytes) -> None:
self.send_buffer += data self.send_buffer += data
if not self.send_busy: if not self.send_busy:
self.send_busy = True self.send_busy = True
self.ioloop.spawn_callback(self._do_send) self.ioloop.spawn_callback(self._do_send)
async def _do_send(self): async def _do_send(self) -> None:
assert self.fd is not None
while self.send_buffer: while self.send_buffer:
if not self.connected: if not self.connected:
break break
@ -142,41 +170,44 @@ class SerialConnection:
self.send_busy = False self.send_busy = False
class PanelDue: class PanelDue:
def __init__(self, config): def __init__(self, config: ConfigHelper) -> None:
self.server = config.get_server() self.server = config.get_server()
self.ioloop = IOLoop.current() self.ioloop = IOLoop.current()
self.file_manager = self.server.lookup_component('file_manager') self.file_manager: FMComp = \
self.klippy_apis = self.server.lookup_component('klippy_apis') self.server.lookup_component('file_manager')
self.kinematics = "none" self.klippy_apis: APIComp = \
self.server.lookup_component('klippy_apis')
self.kinematics: str = "none"
self.machine_name = config.get('machine_name', "Klipper") self.machine_name = config.get('machine_name', "Klipper")
self.firmware_name = "Repetier | Klipper" self.firmware_name: str = "Repetier | Klipper"
self.last_message = None self.last_message: Optional[str] = None
self.last_gcode_response = None self.last_gcode_response: Optional[str] = None
self.current_file = "" self.current_file: str = ""
self.file_metadata = {} self.file_metadata: Dict[str, Any] = {}
self.enable_checksum = config.getboolean('enable_checksum', True) self.enable_checksum = config.getboolean('enable_checksum', True)
self.debug_queue = deque(maxlen=100) self.debug_queue: Deque[str] = deque(maxlen=100)
# Initialize tracked state. # Initialize tracked state.
self.printer_state = { self.printer_state: Dict[str, Dict[str, Any]] = {
'gcode_move': {}, 'toolhead': {}, 'virtual_sdcard': {}, 'gcode_move': {}, 'toolhead': {}, 'virtual_sdcard': {},
'fan': {}, 'display_status': {}, 'print_stats': {}, 'fan': {}, 'display_status': {}, 'print_stats': {},
'idle_timeout': {}, 'gcode_macro PANELDUE_BEEP': {}} 'idle_timeout': {}, 'gcode_macro PANELDUE_BEEP': {}}
self.extruder_count = 0 self.extruder_count: int = 0
self.heaters = [] self.heaters: List[str] = []
self.is_ready = False self.is_ready: bool = False
self.is_shutdown = False self.is_shutdown: bool = False
self.initialized = False self.initialized: bool = False
self.cq_busy = self.gq_busy = False self.cq_busy: bool = False
self.command_queue = [] self.gq_busy: bool = False
self.gc_queue = [] self.command_queue: List[Tuple[FlexCallback, Any, Any]] = []
self.last_printer_state = 'O' self.gc_queue: List[str] = []
self.last_update_time = 0. self.last_printer_state: str = 'O'
self.last_update_time: float = 0.
# Set up macros # Set up macros
self.confirmed_gcode = "" self.confirmed_gcode: str = ""
self.mbox_sequence = 0 self.mbox_sequence: int = 0
self.available_macros = {} self.available_macros: Dict[str, str] = {}
self.confirmed_macros = { self.confirmed_macros = {
"RESTART": "RESTART", "RESTART": "RESTART",
"FIRMWARE_RESTART": "FIRMWARE_RESTART"} "FIRMWARE_RESTART": "FIRMWARE_RESTART"}
@ -184,14 +215,14 @@ class PanelDue:
if macros is not None: if macros is not None:
# The macro's configuration name is the key, whereas the full # The macro's configuration name is the key, whereas the full
# command is the value # command is the value
macros = [m for m in macros.split('\n') if m.strip()] macro_list = [m for m in macros.split('\n') if m.strip()]
self.available_macros = {m.split()[0]: m for m in macros} self.available_macros = {m.split()[0]: m for m in macro_list}
conf_macros = config.get('confirmed_macros', None) conf_macros = config.get('confirmed_macros', None)
if conf_macros is not None: if conf_macros is not None:
# The macro's configuration name is the key, whereas the full # The macro's configuration name is the key, whereas the full
# command is the value # command is the value
conf_macros = [m for m in conf_macros.split('\n') if m.strip()] macro_list = [m for m in conf_macros.split('\n') if m.strip()]
self.confirmed_macros = {m.split()[0]: m for m in conf_macros} self.confirmed_macros = {m.split()[0]: m for m in macro_list}
self.available_macros.update(self.confirmed_macros) self.available_macros.update(self.confirmed_macros)
ntkeys = config.get('non_trivial_keys', "Klipper state") ntkeys = config.get('non_trivial_keys', "Klipper state")
@ -216,7 +247,7 @@ class PanelDue:
# These commands are directly executued on the server and do not to # These commands are directly executued on the server and do not to
# make a request to Klippy # make a request to Klippy
self.direct_gcodes = { self.direct_gcodes: Dict[str, FlexCallback] = {
'M20': self._run_paneldue_M20, 'M20': self._run_paneldue_M20,
'M30': self._run_paneldue_M30, 'M30': self._run_paneldue_M30,
'M36': self._run_paneldue_M36, 'M36': self._run_paneldue_M36,
@ -225,7 +256,7 @@ class PanelDue:
# These gcodes require special parsing or handling prior to being # These gcodes require special parsing or handling prior to being
# sent via Klippy's "gcode/script" api command. # sent via Klippy's "gcode/script" api command.
self.special_gcodes = { self.special_gcodes: Dict[str, Callable[[List[str]], str]] = {
'M0': lambda args: "CANCEL_PRINT", 'M0': lambda args: "CANCEL_PRINT",
'M23': self._prepare_M23, 'M23': self._prepare_M23,
'M24': lambda args: "RESUME", 'M24': lambda args: "RESUME",
@ -239,7 +270,7 @@ class PanelDue:
'M999': lambda args: "FIRMWARE_RESTART" 'M999': lambda args: "FIRMWARE_RESTART"
} }
async def _process_klippy_ready(self): async def _process_klippy_ready(self) -> None:
# Request "info" and "configfile" status # Request "info" and "configfile" status
retries = 10 retries = 10
printer_info = cfg_status = {} printer_info = cfg_status = {}
@ -259,8 +290,9 @@ class PanelDue:
self.firmware_name = "Repetier | Klipper " + \ self.firmware_name = "Repetier | Klipper " + \
printer_info['software_version'] printer_info['software_version']
config = cfg_status.get('configfile', {}).get('config', {}) config: Dict[str, Any] = cfg_status.get(
printer_cfg = config.get('printer', {}) 'configfile', {}).get('config', {})
printer_cfg: Dict[str, Any] = config.get('printer', {})
self.kinematics = printer_cfg.get('kinematics', "none") self.kinematics = printer_cfg.get('kinematics', "none")
logging.info( logging.info(
@ -288,6 +320,7 @@ class PanelDue:
self.heaters.append(cfg) self.heaters.append(cfg)
sub_args[cfg] = None sub_args[cfg] = None
try: try:
status: Dict[str, Any]
status = await self.klippy_apis.subscribe_objects(sub_args) status = await self.klippy_apis.subscribe_objects(sub_args)
except self.server.error: except self.server.error:
logging.exception("Unable to complete subscription request") logging.exception("Unable to complete subscription request")
@ -296,28 +329,28 @@ class PanelDue:
self.is_shutdown = False self.is_shutdown = False
self.is_ready = True self.is_ready = True
def _process_klippy_shutdown(self): def _process_klippy_shutdown(self) -> None:
self.is_shutdown = True self.is_shutdown = True
def _process_klippy_disconnect(self): def _process_klippy_disconnect(self) -> None:
# Tell the PD that the printer is "off" # Tell the PD that the printer is "off"
self.write_response({'status': 'O'}) self.write_response({'status': 'O'})
self.last_printer_state = 'O' self.last_printer_state = 'O'
self.is_shutdown = self.is_shutdown = False self.is_shutdown = self.is_shutdown = False
def handle_status_update(self, status): def handle_status_update(self, status: Dict[str, Any]) -> None:
for obj, items in status.items(): for obj, items in status.items():
if obj in self.printer_state: if obj in self.printer_state:
self.printer_state[obj].update(items) self.printer_state[obj].update(items)
else: else:
self.printer_state[obj] = items self.printer_state[obj] = items
def paneldue_beep(self, frequency, duration): def paneldue_beep(self, frequency: int, duration: float) -> None:
duration = int(duration * 1000.) duration = int(duration * 1000.)
self.write_response( self.write_response(
{'beep_freq': frequency, 'beep_length': duration}) {'beep_freq': frequency, 'beep_length': duration})
def process_line(self, line): def process_line(self, line: str) -> None:
self.debug_queue.append(line) self.debug_queue.append(line)
# If we find M112 in the line then skip verification # If we find M112 in the line then skip verification
if "M112" in line.upper(): if "M112" in line.upper():
@ -328,7 +361,7 @@ class PanelDue:
# Get line number # Get line number
line_index = line.find(' ') line_index = line.find(' ')
try: try:
line_no = int(line[1:line_index]) line_no: Optional[int] = int(line[1:line_index])
except Exception: except Exception:
line_index = -1 line_index = -1
line_no = None line_no = None
@ -370,7 +403,7 @@ class PanelDue:
# Check for commands that query state and require immediate response # Check for commands that query state and require immediate response
if cmd in self.direct_gcodes: if cmd in self.direct_gcodes:
params = {} params: Dict[str, Any] = {}
for p in parts[1:]: for p in parts[1:]:
if p[0] not in "PSR": if p[0] not in "PSR":
params["arg_p"] = p.strip(" \"\t\n") params["arg_p"] = p.strip(" \"\t\n")
@ -391,20 +424,20 @@ class PanelDue:
# Prepare GCodes that require special handling # Prepare GCodes that require special handling
if cmd in self.special_gcodes: if cmd in self.special_gcodes:
func = self.special_gcodes[cmd] sgc_func = self.special_gcodes[cmd]
script = func(parts[1:]) script = sgc_func(parts[1:])
if not script: if not script:
return return
self.queue_gcode(script) self.queue_gcode(script)
def queue_gcode(self, script): def queue_gcode(self, script: str) -> None:
self.gc_queue.append(script) self.gc_queue.append(script)
if not self.gq_busy: if not self.gq_busy:
self.gq_busy = True self.gq_busy = True
self.ioloop.spawn_callback(self._process_gcode_queue) self.ioloop.spawn_callback(self._process_gcode_queue)
async def _process_gcode_queue(self): async def _process_gcode_queue(self) -> None:
while self.gc_queue: while self.gc_queue:
script = self.gc_queue.pop(0) script = self.gc_queue.pop(0)
try: try:
@ -418,24 +451,24 @@ class PanelDue:
logging.exception(msg) logging.exception(msg)
self.gq_busy = False self.gq_busy = False
def queue_command(self, cmd, *args, **kwargs): def queue_command(self, cmd: FlexCallback, *args, **kwargs) -> None:
self.command_queue.append((cmd, args, kwargs)) self.command_queue.append((cmd, args, kwargs))
if not self.cq_busy: if not self.cq_busy:
self.cq_busy = True self.cq_busy = True
self.ioloop.spawn_callback(self._process_command_queue) self.ioloop.spawn_callback(self._process_command_queue)
async def _process_command_queue(self): async def _process_command_queue(self) -> None:
while self.command_queue: while self.command_queue:
cmd, args, kwargs = self.command_queue.pop(0) cmd, args, kwargs = self.command_queue.pop(0)
try: try:
ret = cmd(*args, **kwargs) ret = cmd(*args, **kwargs)
if asyncio.iscoroutine(ret): if ret is not None:
await ret await ret
except Exception: except Exception:
logging.exception("Error processing command") logging.exception("Error processing command")
self.cq_busy = False self.cq_busy = False
def _clean_filename(self, filename): def _clean_filename(self, filename: str) -> str:
# Remove quotes and whitespace # Remove quotes and whitespace
filename.strip(" \"\t\n") filename.strip(" \"\t\n")
# Remove drive number # Remove drive number
@ -453,17 +486,17 @@ class PanelDue:
filename = "/" + filename filename = "/" + filename
return filename return filename
def _prepare_M23(self, args): def _prepare_M23(self, args: List[str]) -> str:
filename = self._clean_filename(args[0]) filename = self._clean_filename(args[0])
return f"M23 {filename}" return f"M23 {filename}"
def _prepare_M32(self, args): def _prepare_M32(self, args: List[str]) -> str:
filename = self._clean_filename(args[0]) filename = self._clean_filename(args[0])
# Escape existing double quotes in the file name # Escape existing double quotes in the file name
filename = filename.replace("\"", "\\\"") filename = filename.replace("\"", "\\\"")
return f"SDCARD_PRINT_FILE FILENAME=\"{filename}\"" return f"SDCARD_PRINT_FILE FILENAME=\"{filename}\""
def _prepare_M98(self, args): def _prepare_M98(self, args: List[str]) -> str:
macro = args[0][1:].strip(" \"\t\n") macro = args[0][1:].strip(" \"\t\n")
name_start = macro.rfind('/') + 1 name_start = macro.rfind('/') + 1
macro = macro[name_start:] macro = macro[name_start:]
@ -475,12 +508,12 @@ class PanelDue:
cmd = "" cmd = ""
return cmd return cmd
def _prepare_M290(self, args): def _prepare_M290(self, args: List[str]) -> str:
# args should in in the format Z0.02 # args should in in the format Z0.02
offset = args[0][1:].strip() offset = args[0][1:].strip()
return f"SET_GCODE_OFFSET Z_ADJUST={offset} MOVE=1" return f"SET_GCODE_OFFSET Z_ADJUST={offset} MOVE=1"
def _prepare_M292(self, args): def _prepare_M292(self, args: List[str]) -> str:
p_val = int(args[0][1]) p_val = int(args[0][1])
if p_val == 0: if p_val == 0:
cmd = self.confirmed_gcode cmd = self.confirmed_gcode
@ -488,13 +521,13 @@ class PanelDue:
return cmd return cmd
return "" return ""
def _create_confirmation(self, name, gcode): def _create_confirmation(self, name: str, gcode: str) -> None:
self.mbox_sequence += 1 self.mbox_sequence += 1
self.confirmed_gcode = gcode self.confirmed_gcode = gcode
title = "Confirmation Dialog" title = "Confirmation Dialog"
msg = f"Please confirm your intent to run {name}." \ msg = f"Please confirm your intent to run {name}." \
" Press OK to continue, or CANCEL to abort." " Press OK to continue, or CANCEL to abort."
mbox = {} mbox: Dict[str, Any] = {}
mbox['msgBox.mode'] = 3 mbox['msgBox.mode'] = 3
mbox['msgBox.msg'] = msg mbox['msgBox.msg'] = msg
mbox['msgBox.seq'] = self.mbox_sequence mbox['msgBox.seq'] = self.mbox_sequence
@ -504,7 +537,7 @@ class PanelDue:
logging.debug(f"Creating PanelDue Confirmation: {mbox}") logging.debug(f"Creating PanelDue Confirmation: {mbox}")
self.write_response(mbox) self.write_response(mbox)
def handle_gcode_response(self, response): def handle_gcode_response(self, response: str) -> None:
# Only queue up "non-trivial" gcode responses. At the # Only queue up "non-trivial" gcode responses. At the
# moment we'll handle state changes and errors # moment we'll handle state changes and errors
if "Klipper state" in response \ if "Klipper state" in response \
@ -516,11 +549,11 @@ class PanelDue:
self.last_gcode_response = response self.last_gcode_response = response
return return
def write_response(self, response): def write_response(self, response: Dict[str, Any]) -> None:
byte_resp = json.dumps(response) + "\r\n" byte_resp = json.dumps(response) + "\r\n"
self.ser_conn.send(byte_resp.encode()) self.ser_conn.send(byte_resp.encode())
def _get_printer_status(self): def _get_printer_status(self) -> str:
# PanelDue States applicable to Klipper: # PanelDue States applicable to Klipper:
# I = idle, P = printing from SD, S = stopped (shutdown), # I = idle, P = printing from SD, S = stopped (shutdown),
# C = starting up (not ready), A = paused, D = pausing, # C = starting up (not ready), A = paused, D = pausing,
@ -529,6 +562,7 @@ class PanelDue:
return 'S' return 'S'
printer_state = self.printer_state printer_state = self.printer_state
sd_state: str
sd_state = printer_state['print_stats'].get('state', "standby") sd_state = printer_state['print_stats'].get('state', "standby")
if sd_state == "printing": if sd_state == "printing":
if self.last_printer_state == 'A': if self.last_printer_state == 'A':
@ -548,8 +582,11 @@ class PanelDue:
return 'I' return 'I'
def _run_paneldue_M408(self, arg_r=None, arg_s=1): def _run_paneldue_M408(self,
response = {} arg_r: Optional[int] = None,
arg_s: int = 1
) -> None:
response: Dict[str, Any] = {}
sequence = arg_r sequence = arg_r
response_type = arg_s response_type = arg_s
@ -587,6 +624,9 @@ class PanelDue:
'homing_origin', [0., 0., 0., 0.])[2], 3) 'homing_origin', [0., 0., 0., 0.])[2], 3)
# Current position # Current position
pos: List[float]
homed_pos: str
sfactor: float
pos = p_state['toolhead'].get('position', [0., 0., 0., 0.]) pos = p_state['toolhead'].get('position', [0., 0., 0., 0.])
response['pos'] = [round(p, 2) for p in pos[:3]] response['pos'] = [round(p, 2) for p in pos[:3]]
homed_pos = p_state['toolhead'].get('homed_axes', "") homed_pos = p_state['toolhead'].get('homed_axes', "")
@ -597,38 +637,41 @@ class PanelDue:
# Print Progress Tracking # Print Progress Tracking
sd_status = p_state['virtual_sdcard'] sd_status = p_state['virtual_sdcard']
print_stats = p_state['print_stats'] print_stats = p_state['print_stats']
fname = print_stats.get('filename', "") fname: str = print_stats.get('filename', "")
sd_print_state = print_stats.get('state') sd_print_state: Optional[str] = print_stats.get('state')
if sd_print_state in ['printing', 'paused']: if sd_print_state in ['printing', 'paused']:
# We know a file has been loaded, initialize metadata # We know a file has been loaded, initialize metadata
if self.current_file != fname: if self.current_file != fname:
self.current_file = fname self.current_file = fname
self.file_metadata = self.file_manager.get_file_metadata(fname) self.file_metadata = self.file_manager.get_file_metadata(fname)
progress = sd_status.get('progress', 0) progress: float = sd_status.get('progress', 0)
# progress and print tracking # progress and print tracking
if progress: if progress:
response['fraction_printed'] = round(progress, 3) response['fraction_printed'] = round(progress, 3)
est_time = self.file_metadata.get('estimated_time', 0) est_time: float = self.file_metadata.get('estimated_time', 0)
if est_time > MIN_EST_TIME: if est_time > MIN_EST_TIME:
# file read estimate # file read estimate
times_left = [int(est_time - est_time * progress)] times_left = [int(est_time - est_time * progress)]
# filament estimate # filament estimate
est_total_fil: Optional[float]
est_total_fil = self.file_metadata.get('filament_total') est_total_fil = self.file_metadata.get('filament_total')
if est_total_fil: if est_total_fil:
cur_filament = print_stats.get('filament_used', 0.) cur_filament: float = print_stats.get(
'filament_used', 0.)
fpct = min(1., cur_filament / est_total_fil) fpct = min(1., cur_filament / est_total_fil)
times_left.append(int(est_time - est_time * fpct)) times_left.append(int(est_time - est_time * fpct))
# object height estimate # object height estimate
obj_height: Optional[float]
obj_height = self.file_metadata.get('object_height') obj_height = self.file_metadata.get('object_height')
if obj_height: if obj_height:
cur_height = p_state['gcode_move'].get( cur_height: float = p_state['gcode_move'].get(
'gcode_position', [0., 0., 0., 0.])[2] 'gcode_position', [0., 0., 0., 0.])[2]
hpct = min(1., cur_height / obj_height) hpct = min(1., cur_height / obj_height)
times_left.append(int(est_time - est_time * hpct)) times_left.append(int(est_time - est_time * hpct))
else: else:
# The estimated time is not in the metadata, however we # The estimated time is not in the metadata, however we
# can still provide an estimate based on file progress # can still provide an estimate based on file progress
duration = print_stats.get('print_duration', 0.) duration: float = print_stats.get('print_duration', 0.)
times_left = [int(duration / progress - duration)] times_left = [int(duration / progress - duration)]
response['timesLeft'] = times_left response['timesLeft'] = times_left
else: else:
@ -636,11 +679,12 @@ class PanelDue:
self.current_file = "" self.current_file = ""
self.file_metadata = {} self.file_metadata = {}
fan_speed = p_state['fan'].get('speed') fan_speed: Optional[float] = p_state['fan'].get('speed')
if fan_speed is not None: if fan_speed is not None:
response['fanPercent'] = [round(fan_speed * 100, 1)] response['fanPercent'] = [round(fan_speed * 100, 1)]
if self.extruder_count > 0: if self.extruder_count > 0:
extruder_name: Optional[str]
extruder_name = p_state['toolhead'].get('extruder') extruder_name = p_state['toolhead'].get('extruder')
if extruder_name is not None: if extruder_name is not None:
tool = 0 tool = 0
@ -649,12 +693,12 @@ class PanelDue:
response['tool'] = tool response['tool'] = tool
# Report Heater Status # Report Heater Status
efactor = round(p_state['gcode_move'].get( efactor: float = round(p_state['gcode_move'].get(
'extrude_factor', 1.) * 100., 2) 'extrude_factor', 1.) * 100., 2)
for name in self.heaters: for name in self.heaters:
temp = round(p_state[name].get('temperature', 0.0), 1) temp: float = round(p_state[name].get('temperature', 0.0), 1)
target = round(p_state[name].get('target', 0.0), 1) target: float = round(p_state[name].get('target', 0.0), 1)
response.setdefault('heaters', []).append(temp) response.setdefault('heaters', []).append(temp)
response.setdefault('active', []).append(target) response.setdefault('active', []).append(target)
response.setdefault('standby', []).append(target) response.setdefault('standby', []).append(target)
@ -664,7 +708,7 @@ class PanelDue:
response.setdefault('extr', []).append(round(pos[3], 2)) response.setdefault('extr', []).append(round(pos[3], 2))
# Display message (via M117) # Display message (via M117)
msg = p_state['display_status'].get('message') msg: str = p_state['display_status'].get('message', "")
if msg and msg != self.last_message: if msg and msg != self.last_message:
response['message'] = msg response['message'] = msg
# reset the message so it only shows once. The paneldue # reset the message so it only shows once. The paneldue
@ -673,7 +717,7 @@ class PanelDue:
self.last_message = msg self.last_message = msg
self.write_response(response) self.write_response(response)
def _run_paneldue_M20(self, arg_p, arg_s=0): def _run_paneldue_M20(self, arg_p: str, arg_s: int = 0) -> None:
response_type = arg_s response_type = arg_s
if response_type != 2: if response_type != 2:
logging.info( logging.info(
@ -689,7 +733,7 @@ class PanelDue:
# ie. "0:/" # ie. "0:/"
if path.startswith("0:/"): if path.startswith("0:/"):
path = path[2:] path = path[2:]
response = {'dir': path} response: Dict[str, Any] = {'dir': path}
response['files'] = [] response['files'] = []
if path == "/macros": if path == "/macros":
@ -713,7 +757,7 @@ class PanelDue:
response['files'] = flist response['files'] = flist
self.write_response(response) self.write_response(response)
async def _run_paneldue_M30(self, arg_p=None): async def _run_paneldue_M30(self, arg_p: str = "") -> None:
# Delete a file. Clean up the file name and make sure # Delete a file. Clean up the file name and make sure
# it is relative to the "gcodes" root. # it is relative to the "gcodes" root.
path = arg_p path = arg_p
@ -727,9 +771,9 @@ class PanelDue:
path = "gcodes/" + path path = "gcodes/" + path
await self.file_manager.delete_file(path) await self.file_manager.delete_file(path)
def _run_paneldue_M36(self, arg_p=None): def _run_paneldue_M36(self, arg_p: Optional[str] = None) -> None:
response = {} response: Dict[str, Any] = {}
filename = arg_p filename: Optional[str] = arg_p
sd_status = self.printer_state.get('virtual_sdcard', {}) sd_status = self.printer_state.get('virtual_sdcard', {})
print_stats = self.printer_state.get('print_stats', {}) print_stats = self.printer_state.get('print_stats', {})
if filename is None: if filename is None:
@ -756,37 +800,38 @@ class PanelDue:
if not filename.startswith("gcodes/"): if not filename.startswith("gcodes/"):
filename = "gcodes/" + filename filename = "gcodes/" + filename
metadata = self.file_manager.get_file_metadata(filename) metadata: Dict[str, Any] = \
self.file_manager.get_file_metadata(filename)
if metadata: if metadata:
response['err'] = 0 response['err'] = 0
response['size'] = metadata['size'] response['size'] = metadata['size']
# workaround for PanelDue replacing the first "T" found # workaround for PanelDue replacing the first "T" found
response['lastModified'] = "T" + time.ctime(metadata['modified']) response['lastModified'] = "T" + time.ctime(metadata['modified'])
slicer = metadata.get('slicer') slicer: Optional[str] = metadata.get('slicer')
if slicer is not None: if slicer is not None:
response['generatedBy'] = slicer response['generatedBy'] = slicer
height = metadata.get('object_height') height: Optional[float] = metadata.get('object_height')
if height is not None: if height is not None:
response['height'] = round(height, 2) response['height'] = round(height, 2)
layer_height = metadata.get('layer_height') layer_height: Optional[float] = metadata.get('layer_height')
if layer_height is not None: if layer_height is not None:
response['layerHeight'] = round(layer_height, 2) response['layerHeight'] = round(layer_height, 2)
filament = metadata.get('filament_total') filament: Optional[float] = metadata.get('filament_total')
if filament is not None: if filament is not None:
response['filament'] = [round(filament, 1)] response['filament'] = [round(filament, 1)]
est_time = metadata.get('estimated_time') est_time: Optional[float] = metadata.get('estimated_time')
if est_time is not None: if est_time is not None:
response['printTime'] = int(est_time + .5) response['printTime'] = int(est_time + .5)
else: else:
response['err'] = 1 response['err'] = 1
self.write_response(response) self.write_response(response)
def close(self): def close(self) -> None:
self.ser_conn.disconnect() self.ser_conn.disconnect()
msg = "\nPanelDue GCode Dump:" msg = "\nPanelDue GCode Dump:"
for i, gc in enumerate(self.debug_queue): for i, gc in enumerate(self.debug_queue):
msg += f"\nSequence {i}: {gc}" msg += f"\nSequence {i}: {gc}"
logging.debug(msg) logging.debug(msg)
def load_component(config): def load_component(config: ConfigHelper) -> PanelDue:
return PanelDue(config) return PanelDue(config)