mqtt: fix connect/reconnect issues

Handle all potential exceptions.  Run the connect/disconnect
in another thread, as its possible for some calls to block
the event loop.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2021-12-06 08:38:27 -05:00
parent ac73036857
commit 1ad83cec97
1 changed files with 6 additions and 4 deletions

View File

@ -232,8 +232,9 @@ class MQTTClient(APITransport, Subscribable):
retries = 5 retries = 5
for _ in range(retries): for _ in range(retries):
try: try:
self.client.connect(self.address, self.port) await self.event_loop.run_in_thread(
except (ConnectionRefusedError, socket.gaierror) as e: self.client.connect, self.address, self.port)
except Exception as e:
logging.info(f"MQTT connection error, {e}, " logging.info(f"MQTT connection error, {e}, "
f"retries remaining: {retries}") f"retries remaining: {retries}")
await asyncio.sleep(2.) await asyncio.sleep(2.)
@ -351,14 +352,15 @@ class MQTTClient(APITransport, Subscribable):
async def _do_reconnect(self) -> None: async def _do_reconnect(self) -> None:
logging.info("Attempting MQTT Reconnect") logging.info("Attempting MQTT Reconnect")
self.event_loop
while True: while True:
try: try:
await asyncio.sleep(2.) await asyncio.sleep(2.)
except asyncio.CancelledError: except asyncio.CancelledError:
break break
try: try:
self.client.reconnect() await self.event_loop.run_in_thread(self.client.reconnect)
except (ConnectionRefusedError, socket.gaierror): except Exception:
continue continue
self.client.socket().setsockopt( self.client.socket().setsockopt(
socket.SOL_SOCKET, socket.SO_SNDBUF, 2048) socket.SOL_SOCKET, socket.SO_SNDBUF, 2048)