moonraker: refactor klipper initialization

The `send_event()` method now returns a future that can be awaited until all callbacks are complete.  All events emitted during Klipper init are now awaited, and a Lock is used to prevent re-entry.  This resolves potential timing issues with commands sent during the init sequence.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2021-11-03 07:52:56 -04:00 committed by Eric Callahan
parent c2bf9bf551
commit 76731b673b
2 changed files with 75 additions and 48 deletions

View File

@ -108,9 +108,11 @@ class KlippyAPI(Subscribable):
# Escape existing double quotes in the file name # Escape existing double quotes in the file name
filename = filename.replace("\"", "\\\"") filename = filename.replace("\"", "\\\"")
script = f'SDCARD_PRINT_FILE FILENAME="{filename}"' script = f'SDCARD_PRINT_FILE FILENAME="{filename}"'
await self.server.wait_connection_initialized()
return await self.run_gcode(script) return await self.run_gcode(script)
async def do_restart(self, gc: str) -> str: async def do_restart(self, gc: str) -> str:
await self.server.wait_connection_initialized()
try: try:
result = await self.run_gcode(gc) result = await self.run_gcode(gc)
except self.server.error as e: except self.server.error as e:

View File

@ -102,6 +102,7 @@ class Server:
self.init_attempts: int = 0 self.init_attempts: int = 0
self.klippy_state: str = "disconnected" self.klippy_state: str = "disconnected"
self.klippy_disconnect_evt: Optional[asyncio.Event] = None self.klippy_disconnect_evt: Optional[asyncio.Event] = None
self.connection_init_lock: asyncio.Lock = asyncio.Lock()
self.subscriptions: Dict[Subscribable, Dict[str, Any]] = {} self.subscriptions: Dict[Subscribable, Dict[str, Any]] = {}
self.failed_components: List[str] = [] self.failed_components: List[str] = []
self.warnings: List[str] = [] self.warnings: List[str] = []
@ -181,6 +182,10 @@ class Server:
self.server_running = True self.server_running = True
await self._connect_klippy() await self._connect_klippy()
async def wait_connection_initialized(self) -> None:
async with self.connection_init_lock:
return
def add_log_rollover_item(self, name: str, item: str, def add_log_rollover_item(self, name: str, item: str,
log: bool = True) -> None: log: bool = True) -> None:
if self.file_logger is not None: if self.file_logger is not None:
@ -266,10 +271,26 @@ class Server:
) -> None: ) -> None:
self.events.setdefault(event, []).append(callback) self.events.setdefault(event, []).append(callback)
def send_event(self, event: str, *args) -> None: def send_event(self, event: str, *args) -> asyncio.Future:
fut = self.event_loop.create_future()
self.event_loop.register_callback(
self._process_event, fut, event, *args)
return fut
async def _process_event(self,
fut: asyncio.Future,
event: str,
*args
) -> None:
events = self.events.get(event, []) events = self.events.get(event, [])
for evt in events: coroutines: List[Coroutine] = []
self.event_loop.register_callback(evt, *args) for func in events:
ret = func(*args)
if ret is not None:
coroutines.append(ret)
if coroutines:
await asyncio.gather(*coroutines)
fut.set_result(None)
def register_remote_method(self, def register_remote_method(self,
method_name: str, method_name: str,
@ -363,40 +384,43 @@ class Server:
async def _init_klippy_connection(self) -> None: async def _init_klippy_connection(self) -> None:
if not self.server_running: if not self.server_running:
return return
await self._check_ready() async with self.connection_init_lock:
await self._request_endpoints() await self._check_ready()
# Subscribe to "webhooks" await self._request_endpoints()
# Register "webhooks" subscription # Subscribe to "webhooks"
if "webhooks_sub" not in self.init_list: # Register "webhooks" subscription
try: if "webhooks_sub" not in self.init_list:
await self.klippy_apis.subscribe_objects({'webhooks': None}) try:
except ServerError as e: await self.klippy_apis.subscribe_objects(
logging.info(f"{e}\nUnable to subscribe to webhooks object") {'webhooks': None})
except ServerError as e:
logging.info(
f"{e}\nUnable to subscribe to webhooks object")
else:
logging.info("Webhooks Subscribed")
self.init_list.append("webhooks_sub")
# Subscribe to Gcode Output
if "gcode_output_sub" not in self.init_list:
try:
await self.klippy_apis.subscribe_gcode_output()
except ServerError as e:
logging.info(
f"{e}\nUnable to register gcode output subscription")
else:
logging.info("GCode Output Subscribed")
self.init_list.append("gcode_output_sub")
if (
"startup_complete" in self.init_list or
not self.klippy_connection.is_connected()
):
# Either Klippy is ready or the connection dropped
# during initialization. Exit initialization
self.init_attempts = 0
self.init_handle = None
else: else:
logging.info("Webhooks Subscribed") self.init_attempts += 1
self.init_list.append("webhooks_sub") self.init_handle = self.event_loop.delay_callback(
# Subscribe to Gcode Output INIT_TIME, self._init_klippy_connection)
if "gcode_output_sub" not in self.init_list:
try:
await self.klippy_apis.subscribe_gcode_output()
except ServerError as e:
logging.info(
f"{e}\nUnable to register gcode output subscription")
else:
logging.info("GCode Output Subscribed")
self.init_list.append("gcode_output_sub")
if (
"startup_complete" in self.init_list or
not self.klippy_connection.is_connected()
):
# Either Klippy is ready or the connection dropped
# during initialization. Exit initialization
self.init_attempts = 0
self.init_handle = None
else:
self.init_attempts += 1
self.init_handle = self.event_loop.delay_callback(
INIT_TIME, self._init_klippy_connection)
async def _request_endpoints(self) -> None: async def _request_endpoints(self) -> None:
result = await self.klippy_apis.list_endpoints(default=None) result = await self.klippy_apis.list_endpoints(default=None)
@ -423,23 +447,24 @@ class Server:
self.klippy_state = result.get('state', "unknown") self.klippy_state = result.get('state', "unknown")
if send_id: if send_id:
self.init_list.append("identified") self.init_list.append("identified")
self.send_event("server:klippy_identified") await self.send_event("server:klippy_identified")
if self.klippy_state != "startup": if self.klippy_state != "startup":
self.init_list.append('startup_complete') self.init_list.append('startup_complete')
self.send_event("server:klippy_started", self.klippy_state) await self.send_event("server:klippy_started", self.klippy_state)
if self.klippy_state != "ready": if self.klippy_state != "ready":
msg = result.get('state_message', "Klippy Not Ready") msg = result.get('state_message', "Klippy Not Ready")
logging.info("\n" + msg) logging.info("\n" + msg)
return else:
await self._verify_klippy_requirements() await self._verify_klippy_requirements()
logging.info("Klippy ready") # register methods with klippy
# register methods with klippy for method in self.klippy_reg_methods:
for method in self.klippy_reg_methods: try:
try: await self.klippy_apis.register_method(method)
await self.klippy_apis.register_method(method) except ServerError:
except ServerError: logging.exception(
logging.exception(f"Unable to register method '{method}'") f"Unable to register method '{method}'")
self.send_event("server:klippy_ready") logging.info("Klippy ready")
await self.send_event("server:klippy_ready")
async def _verify_klippy_requirements(self) -> None: async def _verify_klippy_requirements(self) -> None:
result = await self.klippy_apis.get_object_list(default=None) result = await self.klippy_apis.get_object_list(default=None)