mqtt: add support for paho client version 2.0

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2024-04-28 11:47:20 -04:00
parent a802e10725
commit 1ab413e9bc
1 changed files with 70 additions and 2 deletions

View File

@ -12,6 +12,7 @@ import pathlib
import ssl
from collections import deque
import paho.mqtt.client as paho_mqtt
import paho.mqtt
from ..common import (
TransportType,
RequestType,
@ -43,6 +44,7 @@ if TYPE_CHECKING:
FlexCallback = Callable[[bytes], Optional[Coroutine]]
RPCCallback = Callable[..., Coroutine]
PAHO_MQTT_VERSION = tuple([int(p) for p in paho.mqtt.__version__.split(".")])
DUP_API_REQ_CODE = -10000
MQTT_PROTOCOLS = {
'v3.1': paho_mqtt.MQTTv31,
@ -61,7 +63,9 @@ class ExtPahoClient(paho_mqtt.Client):
if self._port <= 0:
raise ValueError('Invalid port number.')
if hasattr(self, "_out_packet_mutex"):
if PAHO_MQTT_VERSION >=(2, 0):
return self._v2_reconnect(sock)
if PAHO_MQTT_VERSION < (1, 6):
# Paho Mqtt Version < 1.6.x
self._in_packet = {
"command": 0,
@ -158,6 +162,65 @@ class ExtPahoClient(paho_mqtt.Client):
return self._send_connect(self._keepalive)
def _v2_reconnect(self, sock: Optional[socket.socket] = None):
self._in_packet = {
"command": 0,
"have_remaining": 0,
"remaining_count": [],
"remaining_mult": 1,
"remaining_length": 0,
"packet": bytearray(b""),
"to_process": 0,
"pos": 0,
}
self._ping_t = 0.0 # type: ignore
self._state = paho_mqtt._ConnectionState.MQTT_CS_CONNECTING
self._sock_close()
# Mark all currently outgoing QoS = 0 packets as lost,
# or `wait_for_publish()` could hang forever
for pkt in self._out_packet:
if (
pkt["command"] & 0xF0 == paho_mqtt.PUBLISH and
pkt["qos"] == 0 and pkt["info"] is not None
):
pkt["info"].rc = paho_mqtt.MQTT_ERR_CONN_LOST
pkt["info"]._set_as_published()
self._out_packet.clear()
with self._msgtime_mutex:
self._last_msg_in = paho_mqtt.time_func()
self._last_msg_out = paho_mqtt.time_func()
# Put messages in progress in a valid state.
self._messages_reconnect_reset()
with self._callback_mutex:
on_pre_connect = self.on_pre_connect
if on_pre_connect:
try:
on_pre_connect(self, self._userdata)
except Exception as err:
self._easy_log(
paho_mqtt.MQTT_LOG_ERR,
'Caught exception in on_pre_connect: %s', err
)
if not self.suppress_exceptions:
raise
self._sock = sock or self._create_socket()
self._sock.setblocking(False) # type: ignore[attr-defined]
self._registered_write = False
self._call_socket_open(self._sock)
return self._send_connect(self._keepalive)
class SubscriptionHandle:
def __init__(self, topic: str, callback: FlexCallback) -> None:
self.callback = callback
@ -288,7 +351,12 @@ class MQTTClient(APITransport):
"between 0 and 2")
self.publish_split_status = \
config.getboolean("publish_split_status", False)
self.client = ExtPahoClient(protocol=self.protocol)
if PAHO_MQTT_VERSION < (2, 0):
self.client = ExtPahoClient(protocol=self.protocol)
else:
self.client = ExtPahoClient(
paho_mqtt.CallbackAPIVersion.VERSION1, protocol=self.protocol
)
self.client.on_connect = self._on_connect
self.client.on_message = self._on_message
self.client.on_disconnect = self._on_disconnect