From 76731b673bccf9721cb8047620282017ed4fc06c Mon Sep 17 00:00:00 2001 From: Eric Callahan Date: Wed, 3 Nov 2021 07:52:56 -0400 Subject: [PATCH] moonraker: refactor klipper initialization The `send_event()` method now returns a future that can be awaited until all callbacks are complete. All events emitted during Klipper init are now awaited, and a Lock is used to prevent re-entry. This resolves potential timing issues with commands sent during the init sequence. Signed-off-by: Eric Callahan --- moonraker/components/klippy_apis.py | 2 + moonraker/moonraker.py | 121 +++++++++++++++++----------- 2 files changed, 75 insertions(+), 48 deletions(-) diff --git a/moonraker/components/klippy_apis.py b/moonraker/components/klippy_apis.py index f8248ee..a56e7a8 100644 --- a/moonraker/components/klippy_apis.py +++ b/moonraker/components/klippy_apis.py @@ -108,9 +108,11 @@ class KlippyAPI(Subscribable): # Escape existing double quotes in the file name filename = filename.replace("\"", "\\\"") script = f'SDCARD_PRINT_FILE FILENAME="{filename}"' + await self.server.wait_connection_initialized() return await self.run_gcode(script) async def do_restart(self, gc: str) -> str: + await self.server.wait_connection_initialized() try: result = await self.run_gcode(gc) except self.server.error as e: diff --git a/moonraker/moonraker.py b/moonraker/moonraker.py index eca22e7..4c09fa9 100755 --- a/moonraker/moonraker.py +++ b/moonraker/moonraker.py @@ -102,6 +102,7 @@ class Server: self.init_attempts: int = 0 self.klippy_state: str = "disconnected" self.klippy_disconnect_evt: Optional[asyncio.Event] = None + self.connection_init_lock: asyncio.Lock = asyncio.Lock() self.subscriptions: Dict[Subscribable, Dict[str, Any]] = {} self.failed_components: List[str] = [] self.warnings: List[str] = [] @@ -181,6 +182,10 @@ class Server: self.server_running = True await self._connect_klippy() + async def wait_connection_initialized(self) -> None: + async with self.connection_init_lock: + return + def add_log_rollover_item(self, name: str, item: str, log: bool = True) -> None: if self.file_logger is not None: @@ -266,10 +271,26 @@ class Server: ) -> None: self.events.setdefault(event, []).append(callback) - def send_event(self, event: str, *args) -> None: + def send_event(self, event: str, *args) -> asyncio.Future: + fut = self.event_loop.create_future() + self.event_loop.register_callback( + self._process_event, fut, event, *args) + return fut + + async def _process_event(self, + fut: asyncio.Future, + event: str, + *args + ) -> None: events = self.events.get(event, []) - for evt in events: - self.event_loop.register_callback(evt, *args) + coroutines: List[Coroutine] = [] + for func in events: + ret = func(*args) + if ret is not None: + coroutines.append(ret) + if coroutines: + await asyncio.gather(*coroutines) + fut.set_result(None) def register_remote_method(self, method_name: str, @@ -363,40 +384,43 @@ class Server: async def _init_klippy_connection(self) -> None: if not self.server_running: return - await self._check_ready() - await self._request_endpoints() - # Subscribe to "webhooks" - # Register "webhooks" subscription - if "webhooks_sub" not in self.init_list: - try: - await self.klippy_apis.subscribe_objects({'webhooks': None}) - except ServerError as e: - logging.info(f"{e}\nUnable to subscribe to webhooks object") + async with self.connection_init_lock: + await self._check_ready() + await self._request_endpoints() + # Subscribe to "webhooks" + # Register "webhooks" subscription + if "webhooks_sub" not in self.init_list: + try: + await self.klippy_apis.subscribe_objects( + {'webhooks': None}) + except ServerError as e: + logging.info( + f"{e}\nUnable to subscribe to webhooks object") + else: + logging.info("Webhooks Subscribed") + self.init_list.append("webhooks_sub") + # Subscribe to Gcode Output + if "gcode_output_sub" not in self.init_list: + try: + await self.klippy_apis.subscribe_gcode_output() + except ServerError as e: + logging.info( + f"{e}\nUnable to register gcode output subscription") + else: + logging.info("GCode Output Subscribed") + self.init_list.append("gcode_output_sub") + if ( + "startup_complete" in self.init_list or + not self.klippy_connection.is_connected() + ): + # Either Klippy is ready or the connection dropped + # during initialization. Exit initialization + self.init_attempts = 0 + self.init_handle = None else: - logging.info("Webhooks Subscribed") - self.init_list.append("webhooks_sub") - # Subscribe to Gcode Output - if "gcode_output_sub" not in self.init_list: - try: - await self.klippy_apis.subscribe_gcode_output() - except ServerError as e: - logging.info( - f"{e}\nUnable to register gcode output subscription") - else: - logging.info("GCode Output Subscribed") - self.init_list.append("gcode_output_sub") - if ( - "startup_complete" in self.init_list or - not self.klippy_connection.is_connected() - ): - # Either Klippy is ready or the connection dropped - # during initialization. Exit initialization - self.init_attempts = 0 - self.init_handle = None - else: - self.init_attempts += 1 - self.init_handle = self.event_loop.delay_callback( - INIT_TIME, self._init_klippy_connection) + self.init_attempts += 1 + self.init_handle = self.event_loop.delay_callback( + INIT_TIME, self._init_klippy_connection) async def _request_endpoints(self) -> None: result = await self.klippy_apis.list_endpoints(default=None) @@ -423,23 +447,24 @@ class Server: self.klippy_state = result.get('state', "unknown") if send_id: self.init_list.append("identified") - self.send_event("server:klippy_identified") + await self.send_event("server:klippy_identified") if self.klippy_state != "startup": self.init_list.append('startup_complete') - self.send_event("server:klippy_started", self.klippy_state) + await self.send_event("server:klippy_started", self.klippy_state) if self.klippy_state != "ready": msg = result.get('state_message', "Klippy Not Ready") logging.info("\n" + msg) - return - await self._verify_klippy_requirements() - logging.info("Klippy ready") - # register methods with klippy - for method in self.klippy_reg_methods: - try: - await self.klippy_apis.register_method(method) - except ServerError: - logging.exception(f"Unable to register method '{method}'") - self.send_event("server:klippy_ready") + else: + await self._verify_klippy_requirements() + # register methods with klippy + for method in self.klippy_reg_methods: + try: + await self.klippy_apis.register_method(method) + except ServerError: + logging.exception( + f"Unable to register method '{method}'") + logging.info("Klippy ready") + await self.send_event("server:klippy_ready") async def _verify_klippy_requirements(self) -> None: result = await self.klippy_apis.get_object_list(default=None)