klippy_connection: rework klippy initialization

Handle a race condition where a shutdown event could be received
from the webhooks subscription during initialization.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2023-02-03 11:06:44 -05:00
parent babf97be3b
commit 1be19be747
No known key found for this signature in database
GPG Key ID: 5A1EB336DFB4C71B
5 changed files with 92 additions and 68 deletions

View File

@ -130,7 +130,7 @@ class JobQueue:
raise self.server.error(
"Queue State Changed during Transition Gcode")
self._set_queue_state("starting")
await kapis.start_print(filename)
await kapis.start_print(filename, wait_klippy_started=True)
except self.server.error:
logging.exception(f"Error Loading print: {filename}")
self._set_queue_state("paused")

View File

@ -80,14 +80,15 @@ class KlippyAPI(Subscribable):
async def _gcode_firmware_restart(self, web_request: WebRequest) -> str:
return await self.do_restart("FIRMWARE_RESTART")
async def _send_klippy_request(self,
method: str,
params: Dict[str, Any],
default: Any = SENTINEL
) -> Any:
async def _send_klippy_request(
self,
method: str,
params: Dict[str, Any],
default: Any = SENTINEL
) -> Any:
try:
result = await self.klippy.request(
WebRequest(method, params, conn=self))
req = WebRequest(method, params, conn=self)
result = await self.klippy.request(req)
except self.server.error:
if isinstance(default, SentinelClass):
raise
@ -103,9 +104,11 @@ class KlippyAPI(Subscribable):
GCODE_ENDPOINT, params, default)
return result
async def start_print(self, filename: str) -> str:
async def start_print(
self, filename: str, wait_klippy_started: bool = False
) -> str:
# WARNING: Do not call this method from within the following
# event handlers:
# event handlers when "wait_klippy_started" is set to True:
# klippy_identified, klippy_started, klippy_ready, klippy_disconnect
# Doing so will result in "wait_started" blocking for the specifed
# timeout (default 20s) and returning False.
@ -115,7 +118,8 @@ class KlippyAPI(Subscribable):
# Escape existing double quotes in the file name
filename = filename.replace("\"", "\\\"")
script = f'SDCARD_PRINT_FILE FILENAME="{filename}"'
await self.klippy.wait_started()
if wait_klippy_started:
await self.klippy.wait_started()
return await self.run_gcode(script)
async def pause_print(
@ -139,13 +143,16 @@ class KlippyAPI(Subscribable):
return await self._send_klippy_request(
"pause_resume/cancel", {}, default)
async def do_restart(self, gc: str) -> str:
async def do_restart(
self, gc: str, wait_klippy_started: bool = False
) -> str:
# WARNING: Do not call this method from within the following
# event handlers:
# event handlers when "wait_klippy_started" is set to True:
# klippy_identified, klippy_started, klippy_ready, klippy_disconnect
# Doing so will result in "wait_started" blocking for the specifed
# timeout (default 20s) and returning False.
await self.klippy.wait_started()
if wait_klippy_started:
await self.klippy.wait_started()
try:
result = await self.run_gcode(gc)
except self.server.error as e:

View File

@ -323,8 +323,9 @@ class MQTTClient(APITransport, Subscribable):
self.status_objs[key] = None
if status_cfg:
logging.debug(f"MQTT: Status Objects Set: {self.status_objs}")
self.server.register_event_handler("server:klippy_identified",
self._handle_klippy_identified)
self.server.register_event_handler(
"server:klippy_started", self._handle_klippy_started
)
self.timestamp_deque: Deque = deque(maxlen=20)
self.api_qos = config.getint('api_qos', self.qos)
@ -360,7 +361,7 @@ class MQTTClient(APITransport, Subscribable):
self._do_reconnect(first=True)
)
async def _handle_klippy_identified(self) -> None:
async def _handle_klippy_started(self, state: str) -> None:
if self.status_objs:
args = {'objects': self.status_objs}
try:

View File

@ -273,7 +273,8 @@ class PowerDevice:
kapis: APIComp = self.server.lookup_component("klippy_apis")
event_loop.delay_callback(
self.restart_delay, kapis.do_restart,
"FIRMWARE_RESTART")
"FIRMWARE_RESTART", True
)
def get_name(self) -> str:
return self.name

View File

@ -63,7 +63,9 @@ class KlippyConnection:
self.connection_task: Optional[asyncio.Task] = None
self.closing: bool = False
self._klippy_info: Dict[str, Any] = {}
self.init_list: List[str] = []
self._klippy_identified: bool = False
self._klippy_initializing: bool = False
self._klippy_started: bool = False
self._klipper_version: str = ""
self._missing_reqs: Set[str] = set()
self._peer_cred: Dict[str, int] = {}
@ -97,6 +99,8 @@ class KlippyConnection:
@property
def state(self) -> str:
if not self._klippy_started:
return "startup"
return self._state
@property
@ -273,47 +277,20 @@ class KlippyConnection:
return True
async def _init_klippy_connection(self) -> bool:
self.init_list = []
self._klippy_identified = False
self._klippy_started = False
self._klippy_initializing = True
self._missing_reqs.clear()
self.init_attempts = 0
self._state = "initializing"
webhooks_err_logged = False
gcout_err_logged = False
self._state = "startup"
while self.server.is_running():
await asyncio.sleep(INIT_TIME)
# Subscribe to "webhooks"
# Register "webhooks" subscription
if "webhooks_sub" not in self.init_list:
try:
await self.klippy_apis.subscribe_objects(
{'webhooks': None})
except ServerError as e:
if not webhooks_err_logged:
webhooks_err_logged = True
logging.info(
f"{e}\nUnable to subscribe to webhooks object")
else:
logging.info("Webhooks Subscribed")
self.init_list.append("webhooks_sub")
# Subscribe to Gcode Output
if "gcode_output_sub" not in self.init_list:
try:
await self.klippy_apis.subscribe_gcode_output()
except ServerError as e:
if not gcout_err_logged:
gcout_err_logged = True
logging.info(
f"{e}\nUnable to register gcode output "
"subscription")
else:
logging.info("GCode Output Subscribed")
self.init_list.append("gcode_output_sub")
if "startup_complete" not in self.init_list:
await self._check_ready()
if len(self.init_list) == 5:
await self._check_ready()
if not self._klippy_initializing:
logging.debug("Klippy Connection Initialized")
return True
elif not self.is_connected():
if not self.is_connected():
self._klippy_initializing = False
break
else:
self.init_attempts += 1
@ -330,8 +307,24 @@ class KlippyConnection:
if ep not in RESERVED_ENDPOINTS:
app.register_remote_handler(ep)
async def _request_initial_subscriptions(self) -> None:
try:
await self.klippy_apis.subscribe_objects({'webhooks': None})
except ServerError as e:
logging.exception("Unable to subscribe to webhooks object")
else:
logging.info("Webhooks Subscribed")
try:
await self.klippy_apis.subscribe_gcode_output()
except ServerError as e:
logging.exception(
"Unable to register gcode output subscription"
)
else:
logging.info("GCode Output Subscribed")
async def _check_ready(self) -> None:
send_id = "identified" not in self.init_list
send_id = not self._klippy_identified
result: Dict[str, Any]
try:
result = await self.klippy_apis.get_klippy_info(send_id)
@ -351,20 +344,28 @@ class KlippyConnection:
self._klippy_info = dict(result)
if "state_message" in self._klippy_info:
self._state_message = self._klippy_info["state_message"]
state = result.get('state', "unknown")
if state != "startup" and "endpoints_requested" not in self.init_list:
await self._request_endpoints()
self.init_list.append("endpoints_requested")
self._state = state
if "state" not in result:
return
if send_id:
self.init_list.append("identified")
self._klippy_identified = True
await self.server.send_event("server:klippy_identified")
# Request initial endpoints to register info, emergency stop APIs
await self._request_endpoints()
self._state = result["state"]
if self._state != "startup":
self.init_list.append('startup_complete')
await self.server.send_event("server:klippy_started",
self._state)
await self._request_initial_subscriptions()
# Register remaining endpoints available
await self._request_endpoints()
startup_state = self._state
await self.server.send_event(
"server:klippy_started", startup_state
)
self._klippy_started = True
if self._state != "ready":
logging.info("\n" + self._state_message)
if self._state == "shutdown" and startup_state != "shutdown":
# Klippy shutdown during startup event
self.server.send_event("server:klippy_shutdown")
else:
await self._verify_klippy_requirements()
# register methods with klippy
@ -374,8 +375,18 @@ class KlippyConnection:
except ServerError:
logging.exception(
f"Unable to register method '{method}'")
logging.info("Klippy ready")
await self.server.send_event("server:klippy_ready")
if self._state == "ready":
logging.info("Klippy ready")
await self.server.send_event("server:klippy_ready")
if self._state == "shutdown":
# Klippy shutdown during ready event
self.server.send_event("server:klippy_shutdown")
else:
logging.info(
"Klippy state transition from ready during init, "
f"new state: {self._state}"
)
self._klippy_initializing = False
async def _verify_klippy_requirements(self) -> None:
result = await self.klippy_apis.get_object_list(default=None)
@ -461,7 +472,9 @@ class KlippyConnection:
# XXX - process other states (startup, ready, error, etc)?
if "state" in wh:
state = wh["state"]
if state == "shutdown":
if state == "shutdown" and not self._klippy_initializing:
# If the shutdown state is received during initialization
# defer the event, the init routine will handle it.
logging.info("Klippy has shutdown")
self.server.send_event("server:klippy_shutdown")
self._state = state
@ -593,7 +606,9 @@ class KlippyConnection:
await machine.do_service_action("start", self.unit_name)
async def _on_connection_closed(self) -> None:
self.init_list = []
self._klippy_identified = False
self._klippy_initializing = False
self._klippy_started = False
self._state = "disconnected"
self._state_message = "Klippy Disconnected"
for request in self.pending_requests.values():