moonraker: replace legacy string interpolation with f-strings

Signed-off-by: Eric Callahan <arksine.code@gmail.com>
Arksine 2020-08-11 12:59:47 -04:00
parent de1a79a7ca
commit 7cd22804dd
10 changed files with 86 additions and 91 deletions
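
To make the pattern of this commit concrete, here is a minimal, runnable sketch of the same conversion, using one of the log messages touched below; the request_methods and uri values are illustrative placeholders, not taken from Moonraker's configuration:

import logging

logging.basicConfig(level=logging.INFO)

# Illustrative values; in Moonraker these come from an APIDefinition.
request_methods = ["GET", "POST"]
uri = "/printer/gcode/script"

# Legacy %-interpolation, the style removed throughout this commit:
logging.info("Registering remote endpoint: (%s) %s" % (
    " ".join(request_methods), uri))

# Equivalent f-string form, the style added in its place. Long messages
# are split across adjacent f-string literals, which Python concatenates:
logging.info(
    f"Registering remote endpoint: "
    f"({' '.join(request_methods)}) {uri}")

Both calls format the message eagerly before handing it to logging; the commit does not switch to logging's lazy argument style (logging.info("... %s", uri)), it only changes how the string itself is built.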


@@ -77,7 +77,7 @@ class MutableRouter(tornado.web.ReversibleRuleRouter):
try:
self.rules.remove(rule)
except Exception:
logging.exception("Unable to remove rule: %s" % (pattern))
logging.exception(f"Unable to remove rule: {pattern}")
class APIDefinition:
def __init__(self, endpoint, http_uri, ws_method,
@@ -149,8 +149,9 @@ class MoonrakerApp:
if api_def.uri in self.registered_base_handlers:
# reserved handler or already registered
return
logging.info("Registering remote endpoint: (%s) %s" % (
" ".join(api_def.request_methods), api_def.uri))
logging.info(
f"Registering remote endpoint: "
f"({' '.join(api_def.request_methods)}) {api_def.uri}")
self.wsm.register_handler(api_def)
params = {}
params['server'] = self.server
@@ -168,8 +169,9 @@ class MoonrakerApp:
return
api_def = self._create_api_definition(
uri, ws_method, request_methods)
logging.info("Registering local endpoint: (%s) %s" % (
" ".join(request_methods), uri))
logging.info(
f"Registering local endpoint: "
f"({' '.join(request_methods)}) {uri}")
if not http_only:
self.wsm.register_handler(api_def, callback)
params = {}
@@ -192,10 +194,9 @@ class MoonrakerApp:
pattern += "/"
pattern += "(.*)"
else:
logging.info("Invalid file path: %s" % (file_path))
logging.info(f"Invalid file path: {file_path}")
return
logging.debug("Registering static file: (%s) %s" % (
pattern, file_path))
logging.debug(f"Registering static file: ({pattern}) {file_path}")
methods = ['GET']
if can_delete:
methods.append('DELETE')
@@ -318,7 +319,7 @@ class FileRequestHandler(AuthorizedFileHandler):
# a file
basename = os.path.basename(self.absolute_path)
self.set_header(
"Content-Disposition", "attachment; filename=%s" % (basename))
"Content-Disposition", f"attachment; filename={basename}")
async def delete(self, path):
if 'DELETE' not in self.methods:


@@ -44,7 +44,7 @@ class Authorization:
tc = ipaddress.ip_network(ip)
except ValueError:
raise ServerError(
"Invalid option in trusted_clients: %s" % (ip))
f"Invalid option in trusted_clients: {ip}")
self.trusted_ranges.append(tc)
else:
self.trusted_ips.append(tc)
@@ -86,8 +86,8 @@ class Authorization:
# API Key file doesn't exist. Generate
# a new api key and create the file.
logging.info(
"No API Key file found, creating new one at:\n%s"
% (self.api_key_file))
f"No API Key file found, creating new one at:"
f"\n{self.api_key_file}")
return self._create_api_key()
def _create_api_key(self):
@@ -113,7 +113,7 @@ class Authorization:
for ip in expired_conns:
self.trusted_connections.pop(ip, None)
logging.info(
"Trusted Connection Expired, IP: %s" % (ip))
f"Trusted Connection Expired, IP: {ip}")
def _token_expire_handler(self, token):
self.access_tokens.pop(token, None)
@@ -135,7 +135,7 @@ class Authorization:
return True
elif self._check_authorized_ip(ip):
logging.info(
"Trusted Connection Detected, IP: %s" % (ip))
f"Trusted Connection Detected, IP: {ip}")
self.trusted_connections[ip] = time.time()
return True
return False
@@ -158,7 +158,7 @@ class Authorization:
ip = ipaddress.ip_address(request.remote_ip)
except ValueError:
logging.exception(
"Unable to Create IP Address %s" % (request.remote_ip))
f"Unable to Create IP Address {request.remote_ip}")
ip = None
if self._check_trusted_connection(ip):
return True


@@ -98,7 +98,7 @@ class Server:
mod_path = os.path.join(
os.path.dirname(__file__), 'plugins', plugin_name + '.py')
if not os.path.exists(mod_path):
msg = "Plugin (%s) does not exist" % (plugin_name)
msg = f"Plugin ({plugin_name}) does not exist"
logging.info(msg)
if default == Sentinel:
raise ServerError(msg)
@@ -108,19 +108,19 @@ class Server:
load_func = getattr(module, "load_plugin")
plugin = load_func(config)
except Exception:
msg = "Unable to load plugin (%s)" % (plugin_name)
msg = f"Unable to load plugin ({plugin_name})"
logging.info(msg)
if default == Sentinel:
raise ServerError(msg)
return default
self.plugins[plugin_name] = plugin
logging.info("Plugin (%s) loaded" % (plugin_name))
logging.info(f"Plugin ({plugin_name}) loaded")
return plugin
def lookup_plugin(self, plugin_name, default=Sentinel):
plugin = self.plugins.get(plugin_name, default)
if plugin == Sentinel:
raise ServerError("Plugin (%s) not found" % (plugin_name))
raise ServerError(f"Plugin ({plugin_name}) not found")
return plugin
def register_event_handler(self, event, callback):
@@ -134,8 +134,7 @@ class Server:
def register_remote_method(self, method_name, cb):
if method_name in self.remote_methods:
# XXX - may want to raise an exception here
logging.info("Remote method (%s) already registered"
% (method_name))
logging.info(f"Remote method ({method_name}) already registered")
return
self.remote_methods[method_name] = cb
@@ -180,11 +179,10 @@ class Server:
if cb is not None:
cb(**params)
else:
logging.info("Unknown command received %s" % data.decode())
logging.info(f"Unknown command received: {data.decode()}")
except Exception:
logging.exception(
"Error processing Klippy Host Response: %s"
% (data.decode()))
f"Error processing Klippy Host Response: {data.decode()}")
def _handle_stream_closed(self):
self.is_klippy_ready = False
@@ -247,7 +245,7 @@ class Server:
f"to printer.cfg for full Moonraker functionality.")
else:
logging.info(
"%s\nUnable to retreive Klipper Object List " % (str(result)))
f"{result}\nUnable to retreive Klipper Object List")
async def _check_ready(self):
request = self.make_request("info", "GET", {})
@@ -261,9 +259,9 @@ class Server:
logging.info("\n" + msg)
else:
logging.info(
"%s\nKlippy info request error. This indicates a that Klippy\n"
"may have experienced an error during startup. Please check\n "
"klippy.log for more information" % (str(result)))
f"{result}\nKlippy info request error. This indicates that\n"
f"Klippy may have experienced an error during startup.\n"
f"Please check klippy.log for more information")
def _handle_klippy_response(self, request_id, response):
req = self.pending_requests.pop(request_id, None)
@@ -383,8 +381,8 @@ def main():
file_hdlr.setFormatter(formatter)
if sys.version_info < (3, 7):
msg = "Moonraker requires Python 3.7 or above. Detected Version: %s" \
% (sys.version)
msg = f"Moonraker requires Python 3.7 or above. " \
f"Detected Version: {sys.version}"
logging.info(msg)
print(msg)
exit(1)


@@ -62,8 +62,8 @@ class FileManager:
# No change in mutable paths
return
self.mutable_path_args = dict(paths)
str_paths = "\n".join(["%s: %s" % (k, v) for k, v in paths.items()])
logging.debug("\nUpdating Mutable Paths:\n%s" % (str_paths))
str_paths = "\n".join([f"{k}: {v}" for k, v in paths.items()])
logging.debug(f"\nUpdating Mutable Paths:\n{str_paths}")
# Register directories
sd = paths.pop('sd_path', None)
@@ -99,7 +99,7 @@ class FileManager:
self._update_file_list(base=base)
except Exception:
logging.exception(
"Unable to initialize file list: <%s>" % (base))
f"Unable to initialize file list: <{base}>")
return True
def get_sd_directory(self):
@@ -117,7 +117,7 @@ class FileManager:
metadata = self.gcode_metadata.get(requested_file)
if metadata is None:
raise self.server.error(
"Metadata not available for <%s>" % (requested_file), 404)
f"Metadata not available for <{requested_file}>", 404)
metadata['filename'] = requested_file
return metadata
@@ -142,7 +142,7 @@ class FileManager:
"Cannot delete root directory")
if not os.path.isdir(dir_path):
raise self.server.error(
"Directory does not exist (%s)" % (directory))
f"Directory does not exist ({directory})")
force = args.get('force', False)
if isinstance(force, str):
force = force.lower() == "true"
@@ -186,10 +186,10 @@ class FileManager:
def _convert_path(self, url_path):
parts = url_path.strip("/").split("/")
if not parts:
raise self.server.error("Invalid path: " % (url_path))
raise self.server.error(f"Invalid path: {url_path}")
base = parts[0]
if base not in self.file_paths:
raise self.server.error("Invalid base path (%s)" % (base))
raise self.server.error(f"Invalid base path ({base})")
root_path = local_path = self.file_paths[base]
url_path = ""
if len(parts) > 1:
@@ -209,9 +209,9 @@ class FileManager:
dest_base, dst_url_path, dest_path = self._convert_path(destination)
if dest_base not in FULL_ACCESS_ROOTS:
raise self.server.error(
"Destination path is read-only: %s" % (dest_base))
f"Destination path is read-only: {dest_base}")
if not os.path.exists(source_path):
raise self.server.error("File %s does not exist" % (source_path))
raise self.server.error(f"File {source_path} does not exist")
# make sure the destination is not in use
if os.path.exists(dest_path):
await self._handle_operation_check(dest_path)
@@ -219,8 +219,7 @@ class FileManager:
if path == "/server/files/move":
if source_base not in FULL_ACCESS_ROOTS:
raise self.server.error(
"Source path is read-only, cannot move: %s"
% (source_base))
f"Source path is read-only, cannot move: {source_base}")
# if moving the file, make sure the source is not in use
await self._handle_operation_check(source_path)
try:
@@ -248,7 +247,7 @@ class FileManager:
def _list_directory(self, path):
if not os.path.isdir(path):
raise self.server.error(
"Directory does not exist (%s)" % (path))
f"Directory does not exist ({path})")
flist = {'dirs': [], 'files': []}
for fname in os.listdir(path):
full_path = os.path.join(path, fname)
@@ -311,14 +310,14 @@ class FileManager:
# Use os.walk find files in sd path and subdirs
path = self.file_paths.get(base, None)
if path is None:
msg = "No known path for root: %s" % (base)
msg = f"No known path for root: {base}"
logging.info(msg)
raise self.server.error(msg)
elif not os.path.isdir(path):
msg = "Cannot generate file list for root: %s" % (base)
msg = f"Cannot generate file list for root: {base}"
logging.info(msg)
raise self.server.error(msg)
logging.info("Updating File List <%s>..." % (base))
logging.info(f"Updating File List <{base}>...")
new_list = {}
for root, dirs, files in os.walk(path, followlinks=True):
for name in files:
@@ -342,7 +341,7 @@ class FileManager:
elif root in FULL_ACCESS_ROOTS:
result = self._do_standard_upload(request, root)
else:
raise self.server.error("Invalid root request: %s" % (root))
raise self.server.error(f"Invalid root request: {root}")
return result
async def _do_gcode_upload(self, request):
@@ -383,7 +382,7 @@ class FileManager:
def _do_standard_upload(self, request, root):
path = self.file_paths.get(root, None)
if path is None:
raise self.server.error("Unknown root path: %s" % (root))
raise self.server.error(f"Unknown root path: {root}")
upload = self._get_upload_info(request, path)
self._write_file(upload)
self.notify_filelist_changed('upload_file', upload['filename'], root)
@@ -419,7 +418,7 @@ class FileManager:
# Validate the path. Don't allow uploads to a parent of the root
if not full_path.startswith(base_path):
raise self.server.error(
"Cannot write to path: %s" % (full_path))
f"Cannot write to path: {full_path}")
return {
'filename': filename,
'body': upload['body'],
@@ -472,7 +471,7 @@ class FileManager:
root = parts[0]
if root not in self.file_paths:
raise self.server.error(
"Invalid Directory Request: %s" % (directory))
f"Invalid Directory Request: {directory}")
path = self.file_paths[root]
if len(parts) == 1:
dir_path = path
@@ -480,7 +479,7 @@ class FileManager:
dir_path = os.path.join(path, parts[1])
if not os.path.isdir(dir_path):
raise self.server.error(
"Directory does not exist (%s)" % (dir_path))
f"Directory does not exist ({dir_path})")
flist = self._list_directory(dir_path)
if simple_format:
simple_list = []
@@ -498,11 +497,11 @@ class FileManager:
parts = path.split("/", 1)
root = parts[0]
if root not in self.file_paths or len(parts) != 2:
raise self.server.error("Invalid file path: %s" % (path))
raise self.server.error(f"Invalid file path: {path}")
root_path = self.file_paths[root]
full_path = os.path.join(root_path, parts[1])
if not os.path.isfile(full_path):
raise self.server.error("Invalid file path: %s" % (path))
raise self.server.error(f"Invalid file path: {path}")
os.remove(full_path)
def notify_filelist_changed(self, action, fname, base, source_item={}):


@@ -27,7 +27,7 @@ class Machine:
try:
await scmd.run(timeout=2., verbose=False)
except Exception:
logging.exception("Error running cmd '%s'" % (cmd))
logging.exception(f"Error running cmd '{cmd}'")
return "ok"
def load_plugin(config):


@@ -47,7 +47,7 @@ class SerialConnection:
if connect_time > start_time + 30.:
logging.info("Unable to connect, aborting")
break
logging.info("Attempting to connect to: %s" % (self.port))
logging.info(f"Attempting to connect to: {self.port}")
try:
# XXX - sometimes the port cannot be exclusively locked, this
# would likely be due to a restart where the serial port was
@@ -55,7 +55,7 @@ class SerialConnection:
self.ser = serial.Serial(
self.port, self.baud, timeout=0, exclusive=True)
except (OSError, IOError, serial.SerialException):
logging.exception("Unable to open port: %s" % (self.port))
logging.exception(f"Unable to open port: {self.port}")
await gen.sleep(2.)
connect_time += time.time()
continue
@@ -235,11 +235,10 @@ class PanelDue:
self.kinematics = printer_cfg.get('kinematics', "none")
logging.info(
"PanelDue Config Received:\n"
"Firmware Name: %s\n"
"Kinematics: %s\n"
"Printer Config: %s\n"
% (self.firmware_name, self.kinematics, str(config)))
f"PanelDue Config Received:\n"
f"Firmware Name: {self.firmware_name}\n"
f"Kinematics: {self.kinematics}\n"
f"Printer Config: {config}\n")
# Initalize printer state and make subscription request
self.printer_state = {
@@ -305,7 +304,7 @@ class PanelDue:
# Invalid checksum, do not process
msg = "!! Invalid Checksum"
if line_no is not None:
msg = " Line Number: %d" % line_no
msg += f" Line Number: {line_no}"
logging.exception("PanelDue: " + msg)
raise PanelDueError(msg)
@@ -317,7 +316,7 @@ class PanelDue:
if calculated_cs & 0xFF != checksum:
msg = "!! Invalid Checksum"
if line_no is not None:
msg = " Line Number: %d" % line_no
msg += f" Line Number: {line_no}"
logging.info("PanelDue: " + msg)
raise PanelDueError(msg)
@@ -337,7 +336,7 @@ class PanelDue:
try:
val = int(p[1:].strip()) if arg in "sr" else p[1:].strip()
except Exception:
msg = "paneldue: Error parsing direct gcode %s" % (script)
msg = f"paneldue: Error parsing direct gcode {script}"
self.handle_gcode_response("!! " + msg)
logging.exception(msg)
return
@@ -356,7 +355,7 @@ class PanelDue:
await self._klippy_request(
"gcode/script", method='POST', args=args)
except PanelDueError:
msg = "Error executing script %s" % script
msg = f"Error executing script {script}"
self.handle_gcode_response("!! " + msg)
logging.exception(msg)
@@ -390,13 +389,13 @@ class PanelDue:
macro = macro[name_start:]
cmd = self.available_macros.get(macro)
if cmd is None:
raise PanelDueError("Macro %s invalid" % (macro))
raise PanelDueError(f"Macro {macro} invalid")
return cmd
def _prepare_M290(self, args):
# args should in in the format Z0.02
offset = args[0][1:].strip()
return "SET_GCODE_OFFSET Z_ADJUST=%s MOVE=1" % (offset)
return f"SET_GCODE_OFFSET Z_ADJUST={offset} MOVE=1"
def handle_gcode_response(self, response):
# Only queue up "non-trivial" gcode responses. At the
@@ -567,8 +566,7 @@ class PanelDue:
response_type = arg_s
if response_type != 2:
logging.info(
"PanelDue: Cannot process response type %d in M20"
% (response_type))
f"Cannot process response type {response_type} in M20")
return
path = arg_p


@@ -85,14 +85,13 @@ class PrinterPower:
for name, device in devices.items():
try:
logging.debug(
"Attempting to configure pin GPIO%d"
% (device["pin"]))
f"Attempting to configure pin GPIO{device['pin']}")
await GPIO.setup_pin(device["pin"], device["active_low"])
device["status"] = GPIO.is_pin_on(device["pin"])
except Exception:
logging.exception(
"Power plugin: ERR Problem configuring the output pin for"
" device %s. Removing device" % (name))
f"Power plugin: ERR Problem configuring the output pin for"
f" device {name}. Removing device")
continue
self.devices[name] = device
@@ -100,16 +99,16 @@ class GPIO:
gpio_root = "/sys/class/gpio"
@staticmethod
def _set_gpio_option(gpio, option, value):
def _set_gpio_option(pin, option, value):
GPIO._write(
os.path.join(GPIO.gpio_root, "gpio%d" % (gpio), option),
os.path.join(GPIO.gpio_root, f"gpio{pin}", option),
value
)
@staticmethod
def _get_gpio_option(pin, option):
return GPIO._read(
os.path.join(GPIO.gpio_root, "gpio%d" % (pin), option)
os.path.join(GPIO.gpio_root, f"gpio{pin}", option)
)
@staticmethod
@@ -126,9 +125,9 @@ class GPIO:
@staticmethod
async def verify_pin(pin, active_low=1):
gpiopath = os.path.join(GPIO.gpio_root, "gpio%d" % (pin))
gpiopath = os.path.join(GPIO.gpio_root, f"gpio{pin}")
if not os.path.exists(gpiopath):
logging.info("Re-intializing GPIO%d" % (pin))
logging.info(f"Re-intializing GPIO{pin}")
await GPIO.setup_pin(pin, active_low)
return
@@ -143,14 +142,14 @@ class GPIO:
pin = int(pin)
active_low = 1 if active_low == 1 else 0
gpiopath = os.path.join(GPIO.gpio_root, "gpio%d" % (pin))
gpiopath = os.path.join(GPIO.gpio_root, f"gpio{pin}")
if not os.path.exists(gpiopath):
GPIO._write(
os.path.join(GPIO.gpio_root, "export"),
pin)
logging.info("Waiting for GPIO%d to initialize" % (pin))
logging.info(f"Waiting for GPIO{pin} to initialize")
while os.stat(os.path.join(
GPIO.gpio_root, "gpio%d" % (pin),
GPIO.gpio_root, f"gpio{pin}",
"active_low")).st_gid == 0:
await gen.sleep(.1)


@@ -50,7 +50,7 @@ class ShellCommand:
self.command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
except Exception:
logging.exception(
"shell_command: Command {%s} failed" % (self.name))
f"shell_command: Command ({self.name}) failed")
return
if verbose:
fd = proc.stdout.fileno()
@@ -74,9 +74,9 @@ class ShellCommand:
self.output_cb(self.partial_output)
self.partial_output = b""
if complete:
msg = "Command {%s} finished" % (self.name)
msg = f"Command ({self.name}) finished"
else:
msg = "Command {%s} timed out" % (self.name)
msg = f"Command ({self.name}) timed out"
logging.info(msg)
self.io_loop.remove_handler(fd)


@@ -40,7 +40,7 @@ class TemperatureStore:
"objects/status", 'GET', {'heaters': []})
result = await request.wait()
if isinstance(result, self.server.error):
logging.info("Error Configuring Sensors: %s" % (str(result)))
logging.info(f"Error Configuring Sensors: {result}")
return
sensors = result.get("heaters", {}).get("available_sensors", [])
@@ -51,9 +51,9 @@ class TemperatureStore:
"objects/subscription", 'POST', sub)
result = await request.wait()
if isinstance(result, self.server.error):
logging.info("Error subscribing to sensors: %s" % (str(result)))
logging.info(f"Error subscribing to sensors: {result}")
return
logging.info("Configuring available sensors: %s" % (str(sensors)))
logging.info(f"Configuring available sensors: {sensors}")
new_store = {}
for sensor in sensors:
if sensor in self.temperature_store:


@@ -26,7 +26,7 @@ class JsonRPC:
try:
request = json.loads(data)
except Exception:
msg = "Websocket data not json: %s" % (str(data))
msg = f"Websocket data not json: {data}"
logging.exception(msg)
response = self.build_error(-32700, "Parse error")
return json.dumps(response)
@@ -164,13 +164,13 @@ class WebsocketManager:
async def add_websocket(self, ws):
async with self.ws_lock:
self.websockets[ws.uid] = ws
logging.info("New Websocket Added: %d" % ws.uid)
logging.info(f"New Websocket Added: {ws.uid}")
async def remove_websocket(self, ws):
async with self.ws_lock:
old_ws = self.websockets.pop(ws.uid, None)
if old_ws is not None:
logging.info("Websocket Removed: %d" % ws.uid)
logging.info(f"Websocket Removed: {ws.uid}")
async def notify_websockets(self, name, data):
notification = json.dumps({
@@ -183,10 +183,10 @@ class WebsocketManager:
ws.write_message(notification)
except WebSocketClosedError:
self.websockets.pop(ws.uid, None)
logging.info("Websocket Removed: %d" % ws.uid)
logging.info(f"Websocket Removed: {ws.uid}")
except Exception:
logging.exception(
"Error sending data over websocket: %d" % (ws.uid))
f"Error sending data over websocket: {ws.uid}")
async def close(self):
async with self.ws_lock: