mqtt: allow unlimited connection attempts
Signed-off-by: Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
parent
09ac00ce2e
commit
babfba1686
|
@ -186,7 +186,7 @@ class MQTTClient(APITransport, Subscribable):
|
||||||
self.client.on_unsubscribe = self._on_unsubscribe
|
self.client.on_unsubscribe = self._on_unsubscribe
|
||||||
self.connect_evt: asyncio.Event = asyncio.Event()
|
self.connect_evt: asyncio.Event = asyncio.Event()
|
||||||
self.disconnect_evt: Optional[asyncio.Event] = None
|
self.disconnect_evt: Optional[asyncio.Event] = None
|
||||||
self.reconnect_task: Optional[asyncio.Task] = None
|
self.connect_task: Optional[asyncio.Task] = None
|
||||||
self.subscribed_topics: SubscribedDict = {}
|
self.subscribed_topics: SubscribedDict = {}
|
||||||
self.pending_responses: List[asyncio.Future] = []
|
self.pending_responses: List[asyncio.Future] = []
|
||||||
self.pending_acks: Dict[int, asyncio.Future] = {}
|
self.pending_acks: Dict[int, asyncio.Future] = {}
|
||||||
|
@ -249,25 +249,22 @@ class MQTTClient(APITransport, Subscribable):
|
||||||
self.client.will_set(self.moonraker_status_topic,
|
self.client.will_set(self.moonraker_status_topic,
|
||||||
payload=json.dumps({'server': 'offline'}),
|
payload=json.dumps({'server': 'offline'}),
|
||||||
qos=self.qos, retain=True)
|
qos=self.qos, retain=True)
|
||||||
retries = 5
|
self.connect_task = self.event_loop.create_task(self._do_connect())
|
||||||
for _ in range(retries):
|
|
||||||
|
async def _do_connect(self):
|
||||||
|
while True:
|
||||||
try:
|
try:
|
||||||
await self.event_loop.run_in_thread(
|
await self.event_loop.run_in_thread(
|
||||||
self.client.connect, self.address, self.port)
|
self.client.connect, self.address, self.port)
|
||||||
except Exception as e:
|
except asyncio.CancelledError:
|
||||||
logging.info(f"MQTT connection error, {e}, "
|
raise
|
||||||
f"retries remaining: {retries}")
|
except Exception:
|
||||||
await asyncio.sleep(2.)
|
await asyncio.sleep(2.)
|
||||||
else:
|
else:
|
||||||
break
|
break
|
||||||
else:
|
|
||||||
self.server.set_failed_component("mqtt")
|
|
||||||
self.server.add_warning(
|
|
||||||
f"MQTT Broker Connection at ({self.address}, {self.port}) "
|
|
||||||
"refused. Check your client and broker configuration.")
|
|
||||||
return
|
|
||||||
self.client.socket().setsockopt(
|
self.client.socket().setsockopt(
|
||||||
socket.SOL_SOCKET, socket.SO_SNDBUF, 2048)
|
socket.SOL_SOCKET, socket.SO_SNDBUF, 2048)
|
||||||
|
self.connect_task = None
|
||||||
|
|
||||||
async def _handle_klippy_identified(self) -> None:
|
async def _handle_klippy_identified(self) -> None:
|
||||||
if self.status_objs:
|
if self.status_objs:
|
||||||
|
@ -336,8 +333,8 @@ class MQTTClient(APITransport, Subscribable):
|
||||||
# The server connection was dropped, attempt to reconnect
|
# The server connection was dropped, attempt to reconnect
|
||||||
logging.info("MQTT Server Disconnected, reason: "
|
logging.info("MQTT Server Disconnected, reason: "
|
||||||
f"{paho_mqtt.error_string(reason_code)}")
|
f"{paho_mqtt.error_string(reason_code)}")
|
||||||
if self.reconnect_task is None:
|
if self.connect_task is None:
|
||||||
self.reconnect_task = asyncio.create_task(self._do_reconnect())
|
self.connect_task = asyncio.create_task(self._do_reconnect())
|
||||||
self.server.send_event("mqtt:disconnected")
|
self.server.send_event("mqtt:disconnected")
|
||||||
self.connect_evt.clear()
|
self.connect_evt.clear()
|
||||||
|
|
||||||
|
@ -387,7 +384,7 @@ class MQTTClient(APITransport, Subscribable):
|
||||||
self.client.socket().setsockopt(
|
self.client.socket().setsockopt(
|
||||||
socket.SOL_SOCKET, socket.SO_SNDBUF, 2048)
|
socket.SOL_SOCKET, socket.SO_SNDBUF, 2048)
|
||||||
break
|
break
|
||||||
self.reconnect_task = None
|
self.connect_task = None
|
||||||
|
|
||||||
async def wait_connection(self, timeout: Optional[float] = None) -> bool:
|
async def wait_connection(self, timeout: Optional[float] = None) -> bool:
|
||||||
try:
|
try:
|
||||||
|
@ -635,9 +632,9 @@ class MQTTClient(APITransport, Subscribable):
|
||||||
return self.instance_name
|
return self.instance_name
|
||||||
|
|
||||||
async def close(self) -> None:
|
async def close(self) -> None:
|
||||||
if self.reconnect_task is not None:
|
if self.connect_task is not None:
|
||||||
self.reconnect_task.cancel()
|
self.connect_task.cancel()
|
||||||
self.reconnect_task = None
|
self.connect_task = None
|
||||||
if not self.is_connected():
|
if not self.is_connected():
|
||||||
return
|
return
|
||||||
await self.publish_topic(self.moonraker_status_topic,
|
await self.publish_topic(self.moonraker_status_topic,
|
||||||
|
|
Loading…
Reference in New Issue