diff --git a/moonraker/components/mqtt.py b/moonraker/components/mqtt.py index 47fc9be..5434d23 100644 --- a/moonraker/components/mqtt.py +++ b/moonraker/components/mqtt.py @@ -186,7 +186,7 @@ class MQTTClient(APITransport, Subscribable): self.client.on_unsubscribe = self._on_unsubscribe self.connect_evt: asyncio.Event = asyncio.Event() self.disconnect_evt: Optional[asyncio.Event] = None - self.reconnect_task: Optional[asyncio.Task] = None + self.connect_task: Optional[asyncio.Task] = None self.subscribed_topics: SubscribedDict = {} self.pending_responses: List[asyncio.Future] = [] self.pending_acks: Dict[int, asyncio.Future] = {} @@ -249,25 +249,22 @@ class MQTTClient(APITransport, Subscribable): self.client.will_set(self.moonraker_status_topic, payload=json.dumps({'server': 'offline'}), qos=self.qos, retain=True) - retries = 5 - for _ in range(retries): + self.connect_task = self.event_loop.create_task(self._do_connect()) + + async def _do_connect(self): + while True: try: await self.event_loop.run_in_thread( self.client.connect, self.address, self.port) - except Exception as e: - logging.info(f"MQTT connection error, {e}, " - f"retries remaining: {retries}") + except asyncio.CancelledError: + raise + except Exception: await asyncio.sleep(2.) else: break - else: - self.server.set_failed_component("mqtt") - self.server.add_warning( - f"MQTT Broker Connection at ({self.address}, {self.port}) " - "refused. Check your client and broker configuration.") - return self.client.socket().setsockopt( socket.SOL_SOCKET, socket.SO_SNDBUF, 2048) + self.connect_task = None async def _handle_klippy_identified(self) -> None: if self.status_objs: @@ -336,8 +333,8 @@ class MQTTClient(APITransport, Subscribable): # The server connection was dropped, attempt to reconnect logging.info("MQTT Server Disconnected, reason: " f"{paho_mqtt.error_string(reason_code)}") - if self.reconnect_task is None: - self.reconnect_task = asyncio.create_task(self._do_reconnect()) + if self.connect_task is None: + self.connect_task = asyncio.create_task(self._do_reconnect()) self.server.send_event("mqtt:disconnected") self.connect_evt.clear() @@ -387,7 +384,7 @@ class MQTTClient(APITransport, Subscribable): self.client.socket().setsockopt( socket.SOL_SOCKET, socket.SO_SNDBUF, 2048) break - self.reconnect_task = None + self.connect_task = None async def wait_connection(self, timeout: Optional[float] = None) -> bool: try: @@ -635,9 +632,9 @@ class MQTTClient(APITransport, Subscribable): return self.instance_name async def close(self) -> None: - if self.reconnect_task is not None: - self.reconnect_task.cancel() - self.reconnect_task = None + if self.connect_task is not None: + self.connect_task.cancel() + self.connect_task = None if not self.is_connected(): return await self.publish_topic(self.moonraker_status_topic,