mqtt: don't instantiate futures directly

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2023-06-28 07:52:43 -04:00
parent 3026a0c7e7
commit 0c23630e87
No known key found for this signature in database
GPG Key ID: 5A1EB336DFB4C71B
1 changed files with 10 additions and 10 deletions

View File

@ -244,7 +244,7 @@ class AIOHelper:
class MQTTClient(APITransport, Subscribable): class MQTTClient(APITransport, Subscribable):
def __init__(self, config: ConfigHelper) -> None: def __init__(self, config: ConfigHelper) -> None:
self.server = config.get_server() self.server = config.get_server()
self.event_loop = self.server.get_event_loop() self.eventloop = self.server.get_event_loop()
self.klippy: Klippy = self.server.lookup_component("klippy_connection") self.klippy: Klippy = self.server.lookup_component("klippy_connection")
self.address: str = config.get('address') self.address: str = config.get('address')
self.port: int = config.getint('port', 1883) self.port: int = config.getint('port', 1883)
@ -357,7 +357,7 @@ class MQTTClient(APITransport, Subscribable):
payload=jsonw.dumps({'server': 'offline'}), payload=jsonw.dumps({'server': 'offline'}),
qos=self.qos, retain=True) qos=self.qos, retain=True)
self.client.connect_async(self.address, self.port) self.client.connect_async(self.address, self.port)
self.connect_task = self.event_loop.create_task( self.connect_task = self.eventloop.create_task(
self._do_reconnect(first=True) self._do_reconnect(first=True)
) )
@ -379,7 +379,7 @@ class MQTTClient(APITransport, Subscribable):
if topic in self.subscribed_topics: if topic in self.subscribed_topics:
cb_hdls = self.subscribed_topics[topic][1] cb_hdls = self.subscribed_topics[topic][1]
for hdl in cb_hdls: for hdl in cb_hdls:
self.event_loop.register_callback( self.eventloop.register_callback(
hdl.callback, message.payload) hdl.callback, message.payload)
else: else:
logging.debug( logging.debug(
@ -401,7 +401,7 @@ class MQTTClient(APITransport, Subscribable):
if subs: if subs:
res, msg_id = client.subscribe(subs) res, msg_id = client.subscribe(subs)
if msg_id is not None: if msg_id is not None:
sub_fut: asyncio.Future = asyncio.Future() sub_fut: asyncio.Future = self.eventloop.create_future()
topics = list(self.subscribed_topics.keys()) topics = list(self.subscribed_topics.keys())
sub_fut.add_done_callback( sub_fut.add_done_callback(
BrokerAckLogger(topics, "subscribe")) BrokerAckLogger(topics, "subscribe"))
@ -475,7 +475,7 @@ class MQTTClient(APITransport, Subscribable):
raise raise
first = False first = False
try: try:
sock = await self.event_loop.create_socket_connection( sock = await self.eventloop.create_socket_connection(
(self.address, self.port), timeout=10 (self.address, self.port), timeout=10
) )
self.client.reconnect(sock) self.client.reconnect(sock)
@ -523,7 +523,7 @@ class MQTTClient(APITransport, Subscribable):
if self.is_connected() and need_sub: if self.is_connected() and need_sub:
res, msg_id = self.client.subscribe(topic, qos) res, msg_id = self.client.subscribe(topic, qos)
if msg_id is not None: if msg_id is not None:
sub_fut: asyncio.Future = asyncio.Future() sub_fut: asyncio.Future = self.eventloop.create_future()
sub_fut.add_done_callback( sub_fut.add_done_callback(
BrokerAckLogger([topic], "subscribe")) BrokerAckLogger([topic], "subscribe"))
self.pending_acks[msg_id] = sub_fut self.pending_acks[msg_id] = sub_fut
@ -541,7 +541,7 @@ class MQTTClient(APITransport, Subscribable):
del self.subscribed_topics[topic] del self.subscribed_topics[topic]
res, msg_id = self.client.unsubscribe(topic) res, msg_id = self.client.unsubscribe(topic)
if msg_id is not None: if msg_id is not None:
unsub_fut: asyncio.Future = asyncio.Future() unsub_fut: asyncio.Future = self.eventloop.create_future()
unsub_fut.add_done_callback( unsub_fut.add_done_callback(
BrokerAckLogger([topic], "unsubscribe")) BrokerAckLogger([topic], "unsubscribe"))
self.pending_acks[msg_id] = unsub_fut self.pending_acks[msg_id] = unsub_fut
@ -555,7 +555,7 @@ class MQTTClient(APITransport, Subscribable):
qos = qos or self.qos qos = qos or self.qos
if qos > 2 or qos < 0: if qos > 2 or qos < 0:
raise self.server.error("QOS must be between 0 and 2") raise self.server.error("QOS must be between 0 and 2")
pub_fut: asyncio.Future = asyncio.Future() pub_fut: asyncio.Future = self.eventloop.create_future()
if isinstance(payload, (dict, list)): if isinstance(payload, (dict, list)):
try: try:
payload = jsonw.dumps(payload) payload = jsonw.dumps(payload)
@ -602,7 +602,7 @@ class MQTTClient(APITransport, Subscribable):
qos = qos or self.qos qos = qos or self.qos
if qos > 2 or qos < 0: if qos > 2 or qos < 0:
raise self.server.error("QOS must be between 0 and 2") raise self.server.error("QOS must be between 0 and 2")
resp_fut: asyncio.Future = asyncio.Future() resp_fut: asyncio.Future = self.eventloop.create_future()
resp_hdl = self.subscribe_topic( resp_hdl = self.subscribe_topic(
response_topic, resp_fut.set_result, qos) response_topic, resp_fut.set_result, qos)
self.pending_responses.append(resp_fut) self.pending_responses.append(resp_fut)
@ -644,7 +644,7 @@ class MQTTClient(APITransport, Subscribable):
topic: str = web_request.get_str("topic") topic: str = web_request.get_str("topic")
qos: int = web_request.get_int("qos", self.qos) qos: int = web_request.get_int("qos", self.qos)
timeout: Optional[float] = web_request.get_float('timeout', None) timeout: Optional[float] = web_request.get_float('timeout', None)
resp: asyncio.Future = asyncio.Future() resp: asyncio.Future = self.eventloop.create_future()
hdl: Optional[SubscriptionHandle] = None hdl: Optional[SubscriptionHandle] = None
try: try:
hdl = self.subscribe_topic(topic, resp.set_result, qos) hdl = self.subscribe_topic(topic, resp.set_result, qos)