moonraker: add annotations
Annotate function definitions, class attributes, and local variables as necessary for type hinting. This is useful for IDEs with linters that support type hints and also can be used by GitHub Actions to detect erroneous code. Signed-off-by: Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
parent
9c76dbef7a
commit
ca69a10838
|
@ -4,6 +4,8 @@
|
|||
# Copyright (C) 2020 Eric Callahan <arksine.code@gmail.com>
|
||||
#
|
||||
# This file may be distributed under the terms of the GNU GPLv3 license
|
||||
|
||||
from __future__ import annotations
|
||||
import argparse
|
||||
import sys
|
||||
import importlib
|
||||
|
@ -22,7 +24,28 @@ from tornado.ioloop import IOLoop
|
|||
from tornado.util import TimeoutError
|
||||
from tornado.locks import Event
|
||||
from app import MoonrakerApp
|
||||
from utils import ServerError
|
||||
from utils import ServerError, SentinelClass
|
||||
|
||||
# Annotation imports
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
Optional,
|
||||
Callable,
|
||||
Coroutine,
|
||||
Tuple,
|
||||
Dict,
|
||||
List,
|
||||
Union,
|
||||
TypeVar,
|
||||
)
|
||||
if TYPE_CHECKING:
|
||||
from websockets import WebRequest, WebSocket, Subscribable
|
||||
from components.data_store import DataStore
|
||||
from components.klippy_apis import KlippyAPI
|
||||
from components.file_manager import FileManager
|
||||
FlexCallback = Callable[..., Optional[Coroutine]]
|
||||
_T = TypeVar("_T")
|
||||
|
||||
INIT_TIME = .25
|
||||
LOG_ATTEMPT_INTERVAL = int(2. / INIT_TIME + .5)
|
||||
|
@ -32,12 +55,14 @@ CORE_COMPONENTS = [
|
|||
'database', 'file_manager', 'klippy_apis', 'machine',
|
||||
'data_store', 'shell_command', 'proc_stats']
|
||||
|
||||
class Sentinel:
|
||||
pass
|
||||
SENTINEL = SentinelClass.get_instance()
|
||||
|
||||
class Server:
|
||||
error = ServerError
|
||||
def __init__(self, args, file_logger):
|
||||
def __init__(self,
|
||||
args: argparse.Namespace,
|
||||
file_logger: Optional[utils.MoonrakerLoggingHandler]
|
||||
) -> None:
|
||||
self.file_logger = file_logger
|
||||
self.config = config = confighelper.get_configuration(self, args)
|
||||
# log config file
|
||||
|
@ -48,29 +73,29 @@ class Server:
|
|||
cfg_item += "#"*65
|
||||
strio.close()
|
||||
self.add_log_rollover_item('config', cfg_item)
|
||||
self.host = config.get('host', "0.0.0.0")
|
||||
self.port = config.getint('port', 7125)
|
||||
self.exit_reason = ""
|
||||
self.host: str = config.get('host', "0.0.0.0")
|
||||
self.port: int = config.getint('port', 7125)
|
||||
self.exit_reason: str = ""
|
||||
|
||||
# Event initialization
|
||||
self.events = {}
|
||||
self.events: Dict[str, List[FlexCallback]] = {}
|
||||
|
||||
# Klippy Connection Handling
|
||||
self.klippy_address = config.get(
|
||||
self.klippy_address: str = config.get(
|
||||
'klippy_uds_address', "/tmp/klippy_uds")
|
||||
self.klippy_connection = KlippyConnection(
|
||||
self.process_command, self.on_connection_closed)
|
||||
self.klippy_info = {}
|
||||
self.init_list = []
|
||||
self.init_handle = None
|
||||
self.init_attempts = 0
|
||||
self.klippy_state = "disconnected"
|
||||
self.klippy_disconnect_evt = None
|
||||
self.subscriptions = {}
|
||||
self.failed_components = []
|
||||
self.klippy_info: Dict[str, Any] = {}
|
||||
self.init_list: List[str] = []
|
||||
self.init_handle: Optional[object] = None
|
||||
self.init_attempts: int = 0
|
||||
self.klippy_state: str = "disconnected"
|
||||
self.klippy_disconnect_evt: Optional[Event] = None
|
||||
self.subscriptions: Dict[Subscribable, Dict[str, Any]] = {}
|
||||
self.failed_components: List[str] = []
|
||||
|
||||
# Server/IOLoop
|
||||
self.server_running = False
|
||||
self.server_running: bool = False
|
||||
self.moonraker_app = app = MoonrakerApp(config)
|
||||
self.register_endpoint = app.register_local_handler
|
||||
self.register_static_file_handler = app.register_static_file_handler
|
||||
|
@ -93,9 +118,9 @@ class Server:
|
|||
# Setup remote methods accessable to Klippy. Note that all
|
||||
# registered remote methods should be of the notification type,
|
||||
# they do not return a response to Klippy after execution
|
||||
self.pending_requests = {}
|
||||
self.remote_methods = {}
|
||||
self.klippy_reg_methods = []
|
||||
self.pending_requests: Dict[int, BaseRequest] = {}
|
||||
self.remote_methods: Dict[str, FlexCallback] = {}
|
||||
self.klippy_reg_methods: List[str] = []
|
||||
self.register_remote_method(
|
||||
'process_gcode_response', self._process_gcode_response,
|
||||
need_klippy_reg=False)
|
||||
|
@ -104,12 +129,12 @@ class Server:
|
|||
need_klippy_reg=False)
|
||||
|
||||
# Component initialization
|
||||
self.components = {}
|
||||
self.components: Dict[str, Any] = {}
|
||||
self._load_components(config)
|
||||
self.klippy_apis = self.lookup_component('klippy_apis')
|
||||
self.klippy_apis: KlippyAPI = self.lookup_component('klippy_apis')
|
||||
config.validate_config()
|
||||
|
||||
def start(self):
|
||||
def start(self) -> None:
|
||||
hostname, hostport = self.get_host_info()
|
||||
logging.info(
|
||||
f"Starting Moonraker on ({self.host}, {hostport}), "
|
||||
|
@ -119,19 +144,20 @@ class Server:
|
|||
self.ioloop.spawn_callback(self._init_signals)
|
||||
self.ioloop.spawn_callback(self._connect_klippy)
|
||||
|
||||
def _init_signals(self):
|
||||
def _init_signals(self) -> None:
|
||||
aioloop = asyncio.get_event_loop()
|
||||
aioloop.add_signal_handler(
|
||||
signal.SIGTERM, self._handle_term_signal)
|
||||
|
||||
def add_log_rollover_item(self, name, item, log=True):
|
||||
def add_log_rollover_item(self, name: str, item: str,
|
||||
log: bool = True) -> None:
|
||||
if self.file_logger is not None:
|
||||
self.file_logger.set_rollover_info(name, item)
|
||||
if log and item is not None:
|
||||
logging.info(item)
|
||||
|
||||
# ***** Component Management *****
|
||||
def _load_components(self, config):
|
||||
def _load_components(self, config: confighelper.ConfigHelper) -> None:
|
||||
# load core components
|
||||
for component in CORE_COMPONENTS:
|
||||
self.load_component(config, component)
|
||||
|
@ -142,7 +168,11 @@ class Server:
|
|||
for section in opt_sections:
|
||||
self.load_component(config, section, None)
|
||||
|
||||
def load_component(self, config, component_name, default=Sentinel):
|
||||
def load_component(self,
|
||||
config: confighelper.ConfigHelper,
|
||||
component_name: str,
|
||||
default: Union[SentinelClass, _T] = SENTINEL
|
||||
) -> Union[_T, Any]:
|
||||
if component_name in self.components:
|
||||
return self.components[component_name]
|
||||
# Make sure component exists
|
||||
|
@ -152,7 +182,7 @@ class Server:
|
|||
msg = f"Component ({component_name}) does not exist"
|
||||
logging.info(msg)
|
||||
self.failed_components.append(component_name)
|
||||
if default == Sentinel:
|
||||
if isinstance(default, SentinelClass):
|
||||
raise ServerError(msg)
|
||||
return default
|
||||
try:
|
||||
|
@ -169,32 +199,45 @@ class Server:
|
|||
msg = f"Unable to load component: ({component_name})"
|
||||
logging.exception(msg)
|
||||
self.failed_components.append(component_name)
|
||||
if default == Sentinel:
|
||||
if isinstance(default, SentinelClass):
|
||||
raise ServerError(msg)
|
||||
return default
|
||||
self.components[component_name] = component
|
||||
logging.info(f"Component ({component_name}) loaded")
|
||||
return component
|
||||
|
||||
def lookup_component(self, component_name, default=Sentinel):
|
||||
def lookup_component(self,
|
||||
component_name: str,
|
||||
default: Union[SentinelClass, _T] = SENTINEL
|
||||
) -> Union[_T, Any]:
|
||||
component = self.components.get(component_name, default)
|
||||
if component == Sentinel:
|
||||
if isinstance(component, SentinelClass):
|
||||
raise ServerError(f"Component ({component_name}) not found")
|
||||
return component
|
||||
|
||||
def register_notification(self, event_name, notify_name=None):
|
||||
def register_notification(self,
|
||||
event_name: str,
|
||||
notify_name: Optional[str] = None
|
||||
) -> None:
|
||||
wsm = self.moonraker_app.get_websocket_manager()
|
||||
wsm.register_notification(event_name, notify_name)
|
||||
|
||||
def register_event_handler(self, event, callback):
|
||||
def register_event_handler(self,
|
||||
event: str,
|
||||
callback: FlexCallback
|
||||
) -> None:
|
||||
self.events.setdefault(event, []).append(callback)
|
||||
|
||||
def send_event(self, event, *args):
|
||||
def send_event(self, event: str, *args) -> None:
|
||||
events = self.events.get(event, [])
|
||||
for evt in events:
|
||||
self.ioloop.spawn_callback(evt, *args)
|
||||
|
||||
def register_remote_method(self, method_name, cb, need_klippy_reg=True):
|
||||
def register_remote_method(self,
|
||||
method_name: str,
|
||||
cb: FlexCallback,
|
||||
need_klippy_reg: bool = True
|
||||
) -> None:
|
||||
if method_name in self.remote_methods:
|
||||
# XXX - may want to raise an exception here
|
||||
logging.info(f"Remote method ({method_name}) already registered")
|
||||
|
@ -204,28 +247,28 @@ class Server:
|
|||
# These methods need to be registered with Klippy
|
||||
self.klippy_reg_methods.append(method_name)
|
||||
|
||||
def get_host_info(self):
|
||||
def get_host_info(self) -> Tuple[str, int]:
|
||||
hostname = socket.gethostname()
|
||||
return hostname, self.port
|
||||
|
||||
def get_klippy_info(self):
|
||||
def get_klippy_info(self) -> Dict[str, Any]:
|
||||
return dict(self.klippy_info)
|
||||
|
||||
def get_klippy_state(self):
|
||||
def get_klippy_state(self) -> str:
|
||||
return self.klippy_state
|
||||
|
||||
# ***** Klippy Connection *****
|
||||
async def _connect_klippy(self):
|
||||
async def _connect_klippy(self) -> None:
|
||||
if not self.server_running:
|
||||
return
|
||||
ret = await self.klippy_connection.connect(self.klippy_address)
|
||||
if not ret:
|
||||
self.ioloop.call_later(.25, self._connect_klippy)
|
||||
self.ioloop.call_later(.25, self._connect_klippy) # type: ignore
|
||||
return
|
||||
# begin server iniialization
|
||||
self.ioloop.spawn_callback(self._initialize)
|
||||
|
||||
def process_command(self, cmd):
|
||||
def process_command(self, cmd: Dict[str, Any]) -> None:
|
||||
method = cmd.get('method', None)
|
||||
if method is not None:
|
||||
# This is a remote method called from klippy
|
||||
|
@ -253,15 +296,15 @@ class Server:
|
|||
result = ServerError(err, 400)
|
||||
request.notify(result)
|
||||
|
||||
async def _execute_method(self, method_name, **kwargs):
|
||||
async def _execute_method(self, method_name: str, **kwargs) -> None:
|
||||
try:
|
||||
ret = self.remote_methods[method_name](**kwargs)
|
||||
if asyncio.iscoroutine(ret):
|
||||
if ret is not None:
|
||||
await ret
|
||||
except Exception:
|
||||
logging.exception(f"Error running remote method: {method_name}")
|
||||
|
||||
def on_connection_closed(self):
|
||||
def on_connection_closed(self) -> None:
|
||||
self.init_list = []
|
||||
self.klippy_state = "disconnected"
|
||||
for request in self.pending_requests.values():
|
||||
|
@ -273,11 +316,11 @@ class Server:
|
|||
if self.init_handle is not None:
|
||||
self.ioloop.remove_timeout(self.init_handle)
|
||||
if self.server_running:
|
||||
self.ioloop.call_later(.25, self._connect_klippy)
|
||||
self.ioloop.call_later(.25, self._connect_klippy) # type: ignore
|
||||
if self.klippy_disconnect_evt is not None:
|
||||
self.klippy_disconnect_evt.set()
|
||||
|
||||
async def _initialize(self):
|
||||
async def _initialize(self) -> None:
|
||||
if not self.server_running:
|
||||
return
|
||||
await self._check_ready()
|
||||
|
@ -311,18 +354,19 @@ class Server:
|
|||
else:
|
||||
self.init_attempts += 1
|
||||
self.init_handle = self.ioloop.call_later(
|
||||
INIT_TIME, self._initialize)
|
||||
INIT_TIME, self._initialize) # type: ignore
|
||||
|
||||
async def _request_endpoints(self):
|
||||
async def _request_endpoints(self) -> None:
|
||||
result = await self.klippy_apis.list_endpoints(default=None)
|
||||
if result is None:
|
||||
return
|
||||
endpoints = result.get('endpoints', {})
|
||||
endpoints = result.get('endpoints', [])
|
||||
for ep in endpoints:
|
||||
self.moonraker_app.register_remote_handler(ep)
|
||||
|
||||
async def _check_ready(self):
|
||||
async def _check_ready(self) -> None:
|
||||
send_id = "identified" not in self.init_list
|
||||
result: Dict[str, Any]
|
||||
try:
|
||||
result = await self.klippy_apis.get_klippy_info(send_id)
|
||||
except ServerError as e:
|
||||
|
@ -354,7 +398,7 @@ class Server:
|
|||
msg = result.get('state_message', "Klippy Not Ready")
|
||||
logging.info("\n" + msg)
|
||||
|
||||
async def _verify_klippy_requirements(self):
|
||||
async def _verify_klippy_requirements(self) -> None:
|
||||
result = await self.klippy_apis.get_object_list(default=None)
|
||||
if result is None:
|
||||
logging.info(
|
||||
|
@ -370,39 +414,43 @@ class Server:
|
|||
f"to printer.cfg for full Moonraker functionality.")
|
||||
if "virtual_sdcard" not in missing_objs:
|
||||
# Update the gcode path
|
||||
result = await self.klippy_apis.query_objects(
|
||||
query_res = await self.klippy_apis.query_objects(
|
||||
{'configfile': None}, default=None)
|
||||
if result is None:
|
||||
if query_res is None:
|
||||
logging.info(f"Unable to set SD Card path")
|
||||
else:
|
||||
config = result.get('configfile', {}).get('config', {})
|
||||
config = query_res.get('configfile', {}).get('config', {})
|
||||
vsd_config = config.get('virtual_sdcard', {})
|
||||
vsd_path = vsd_config.get('path', None)
|
||||
if vsd_path is not None:
|
||||
file_manager = self.lookup_component('file_manager')
|
||||
file_manager: FileManager = self.lookup_component(
|
||||
'file_manager')
|
||||
file_manager.register_directory('gcodes', vsd_path)
|
||||
else:
|
||||
logging.info(
|
||||
"Configuration for [virtual_sdcard] not found,"
|
||||
" unable to set SD Card path")
|
||||
|
||||
def _process_gcode_response(self, response):
|
||||
def _process_gcode_response(self, response: str) -> None:
|
||||
self.send_event("server:gcode_response", response)
|
||||
|
||||
def _process_status_update(self, eventtime, status):
|
||||
def _process_status_update(self,
|
||||
eventtime: float,
|
||||
status: Dict[str, Any]
|
||||
) -> None:
|
||||
if 'webhooks' in status:
|
||||
# XXX - process other states (startup, ready, error, etc)?
|
||||
state = status['webhooks'].get('state', None)
|
||||
state: Optional[str] = status['webhooks'].get('state', None)
|
||||
if state is not None:
|
||||
if state == "shutdown":
|
||||
logging.info("Klippy has shutdown")
|
||||
self.send_event("server:klippy_shutdown")
|
||||
self.klippy_state = state
|
||||
for conn, sub in self.subscriptions.items():
|
||||
conn_status = {}
|
||||
conn_status: Dict[str, Any] = {}
|
||||
for name, fields in sub.items():
|
||||
if name in status:
|
||||
val = status[name]
|
||||
val: Dict[str, Any] = status[name]
|
||||
if fields is None:
|
||||
conn_status[name] = dict(val)
|
||||
else:
|
||||
|
@ -410,18 +458,20 @@ class Server:
|
|||
k: v for k, v in val.items() if k in fields}
|
||||
conn.send_status(conn_status)
|
||||
|
||||
async def make_request(self, web_request):
|
||||
async def make_request(self, web_request: WebRequest) -> Any:
|
||||
rpc_method = web_request.get_endpoint()
|
||||
if rpc_method == "objects/subscribe":
|
||||
return await self._request_subscripton(web_request)
|
||||
else:
|
||||
if rpc_method == "gcode/script":
|
||||
script = web_request.get_str('script', "")
|
||||
data_store = self.lookup_component('data_store')
|
||||
data_store: DataStore = self.lookup_component('data_store')
|
||||
data_store.store_gcode_command(script)
|
||||
return await self._request_standard(web_request)
|
||||
|
||||
async def _request_subscripton(self, web_request):
|
||||
async def _request_subscripton(self,
|
||||
web_request: WebRequest
|
||||
) -> Dict[str, Any]:
|
||||
args = web_request.get_args()
|
||||
conn = web_request.get_connection()
|
||||
|
||||
|
@ -432,7 +482,7 @@ class Server:
|
|||
raise self.error(
|
||||
"No connection associated with subscription request")
|
||||
self.subscriptions[conn] = sub
|
||||
all_subs = {}
|
||||
all_subs: Dict[str, Any] = {}
|
||||
# request superset of all client subscriptions
|
||||
for sub in self.subscriptions.values():
|
||||
for obj, items in sub.items():
|
||||
|
@ -465,7 +515,7 @@ class Server:
|
|||
result['status'] = pruned_status
|
||||
return result
|
||||
|
||||
async def _request_standard(self, web_request):
|
||||
async def _request_standard(self, web_request: WebRequest) -> Any:
|
||||
rpc_method = web_request.get_endpoint()
|
||||
args = web_request.get_args()
|
||||
# Create a base klippy request
|
||||
|
@ -475,23 +525,23 @@ class Server:
|
|||
self.klippy_connection.send_request, base_request)
|
||||
return await base_request.wait()
|
||||
|
||||
def remove_subscription(self, conn):
|
||||
def remove_subscription(self, conn: Subscribable) -> None:
|
||||
self.subscriptions.pop(conn, None)
|
||||
|
||||
def _handle_term_signal(self):
|
||||
def _handle_term_signal(self) -> None:
|
||||
logging.info(f"Exiting with signal SIGTERM")
|
||||
self.ioloop.spawn_callback(self._stop_server, "terminate")
|
||||
|
||||
async def _stop_server(self, exit_reason="restart"):
|
||||
async def _stop_server(self, exit_reason: str = "restart") -> None:
|
||||
self.server_running = False
|
||||
for method in ["on_exit", "close"]:
|
||||
for name, component in self.components.items():
|
||||
if not hasattr(component, method):
|
||||
continue
|
||||
func = getattr(component, method)
|
||||
func: FlexCallback = getattr(component, method)
|
||||
try:
|
||||
ret = func()
|
||||
if asyncio.iscoroutine(ret):
|
||||
if ret is not None:
|
||||
await ret
|
||||
except Exception:
|
||||
logging.exception(
|
||||
|
@ -517,12 +567,15 @@ class Server:
|
|||
aioloop.remove_signal_handler(signal.SIGTERM)
|
||||
self.ioloop.stop()
|
||||
|
||||
async def _handle_server_restart(self, web_request):
|
||||
async def _handle_server_restart(self, web_request: WebRequest) -> str:
|
||||
self.ioloop.spawn_callback(self._stop_server)
|
||||
return "ok"
|
||||
|
||||
async def _handle_info_request(self, web_request):
|
||||
file_manager = self.lookup_component('file_manager', None)
|
||||
async def _handle_info_request(self,
|
||||
web_request: WebRequest
|
||||
) -> Dict[str, Any]:
|
||||
file_manager: Optional[FileManager] = self.lookup_component(
|
||||
'file_manager', None)
|
||||
reg_dirs = []
|
||||
if file_manager is not None:
|
||||
reg_dirs = file_manager.get_registered_dirs()
|
||||
|
@ -535,19 +588,24 @@ class Server:
|
|||
'failed_plugins': self.failed_components,
|
||||
'registered_directories': reg_dirs}
|
||||
|
||||
async def _handle_config_request(self, web_request):
|
||||
async def _handle_config_request(self,
|
||||
web_request: WebRequest
|
||||
) -> Dict[str, Any]:
|
||||
return {
|
||||
'config': self.config.get_parsed_config()
|
||||
}
|
||||
|
||||
class KlippyConnection:
|
||||
def __init__(self, on_recd, on_close):
|
||||
def __init__(self,
|
||||
on_recd: Callable[[dict], None],
|
||||
on_close: Callable[[], None]
|
||||
) -> None:
|
||||
self.ioloop = IOLoop.current()
|
||||
self.iostream = None
|
||||
self.iostream: Optional[iostream.IOStream] = None
|
||||
self.on_recd = on_recd
|
||||
self.on_close = on_close
|
||||
|
||||
async def connect(self, address):
|
||||
async def connect(self, address: str) -> bool:
|
||||
ksock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
kstream = iostream.IOStream(ksock)
|
||||
try:
|
||||
|
@ -560,7 +618,7 @@ class KlippyConnection:
|
|||
self.ioloop.spawn_callback(self._read_stream, self.iostream)
|
||||
return True
|
||||
|
||||
async def _read_stream(self, stream):
|
||||
async def _read_stream(self, stream: iostream.IOStream) -> None:
|
||||
while not stream.closed():
|
||||
try:
|
||||
data = await stream.read_until(b'\x03')
|
||||
|
@ -576,7 +634,7 @@ class KlippyConnection:
|
|||
logging.exception(
|
||||
f"Error processing Klippy Host Response: {data.decode()}")
|
||||
|
||||
async def send_request(self, request):
|
||||
async def send_request(self, request: BaseRequest) -> None:
|
||||
if self.iostream is None:
|
||||
request.notify(ServerError("Klippy Host not connected", 503))
|
||||
return
|
||||
|
@ -586,24 +644,24 @@ class KlippyConnection:
|
|||
except iostream.StreamClosedError:
|
||||
request.notify(ServerError("Klippy Host not connected", 503))
|
||||
|
||||
def is_connected(self):
|
||||
def is_connected(self) -> bool:
|
||||
return self.iostream is not None and not self.iostream.closed()
|
||||
|
||||
def close(self):
|
||||
def close(self) -> None:
|
||||
if self.iostream is not None and \
|
||||
not self.iostream.closed():
|
||||
self.iostream.close()
|
||||
|
||||
# Basic WebRequest class, easily converted to dict for json encoding
|
||||
class BaseRequest:
|
||||
def __init__(self, rpc_method, params):
|
||||
def __init__(self, rpc_method: str, params: Dict[str, Any]) -> None:
|
||||
self.id = id(self)
|
||||
self.rpc_method = rpc_method
|
||||
self.params = params
|
||||
self._event = Event()
|
||||
self.response = None
|
||||
self.response: Any = None
|
||||
|
||||
async def wait(self):
|
||||
async def wait(self) -> Any:
|
||||
# Log pending requests every 60 seconds
|
||||
start_time = time.time()
|
||||
while True:
|
||||
|
@ -622,15 +680,15 @@ class BaseRequest:
|
|||
raise self.response
|
||||
return self.response
|
||||
|
||||
def notify(self, response):
|
||||
def notify(self, response: Any) -> None:
|
||||
self.response = response
|
||||
self._event.set()
|
||||
|
||||
def to_dict(self):
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {'id': self.id, 'method': self.rpc_method,
|
||||
'params': self.params}
|
||||
|
||||
def main():
|
||||
def main() -> None:
|
||||
# Parse start arguments
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Moonraker - Klipper API Server")
|
||||
|
|
Loading…
Reference in New Issue