Skip to content

Commit a05b19f

Browse files
authored
Merge pull request #199 from vladak/loop_vs_keep_alive
improve ping handling
2 parents e19ece6 + 40a0019 commit a05b19f

File tree

2 files changed

+178
-14
lines changed

2 files changed

+178
-14
lines changed

adafruit_minimqtt/adafruit_minimqtt.py

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ def __init__(
226226
self._is_connected = False
227227
self._msg_size_lim = MQTT_MSG_SZ_LIM
228228
self._pid = 0
229-
self._timestamp: float = 0
229+
self._last_msg_sent_timestamp: float = 0
230230
self.logger = NullLogger()
231231
"""An optional logging attribute that can be set with with a Logger
232232
to enable debug logging."""
@@ -640,6 +640,7 @@ def _connect(
640640
if self._username is not None:
641641
self._send_str(self._username)
642642
self._send_str(self._password)
643+
self._last_msg_sent_timestamp = self.get_monotonic_time()
643644
self.logger.debug("Receiving CONNACK packet from broker")
644645
stamp = self.get_monotonic_time()
645646
while True:
@@ -694,6 +695,7 @@ def disconnect(self) -> None:
694695
self._sock.close()
695696
self._is_connected = False
696697
self._subscribed_topics = []
698+
self._last_msg_sent_timestamp = 0
697699
if self.on_disconnect is not None:
698700
self.on_disconnect(self, self.user_data, 0)
699701

@@ -707,6 +709,7 @@ def ping(self) -> list[int]:
707709
self._sock.send(MQTT_PINGREQ)
708710
ping_timeout = self.keep_alive
709711
stamp = self.get_monotonic_time()
712+
self._last_msg_sent_timestamp = stamp
710713
rc, rcs = None, []
711714
while rc != MQTT_PINGRESP:
712715
rc = self._wait_for_msg()
@@ -781,6 +784,7 @@ def publish(
781784
self._sock.send(pub_hdr_fixed)
782785
self._sock.send(pub_hdr_var)
783786
self._sock.send(msg)
787+
self._last_msg_sent_timestamp = self.get_monotonic_time()
784788
if qos == 0 and self.on_publish is not None:
785789
self.on_publish(self, self.user_data, topic, self._pid)
786790
if qos == 1:
@@ -858,6 +862,7 @@ def subscribe(self, topic: Optional[Union[tuple, str, list]], qos: int = 0) -> N
858862
self.logger.debug(f"payload: {payload}")
859863
self._sock.send(payload)
860864
stamp = self.get_monotonic_time()
865+
self._last_msg_sent_timestamp = stamp
861866
while True:
862867
op = self._wait_for_msg()
863868
if op is None:
@@ -933,6 +938,7 @@ def unsubscribe(self, topic: Optional[Union[str, list]]) -> None:
933938
for t in topics:
934939
self.logger.debug(f"UNSUBSCRIBING from topic {t}")
935940
self._sock.send(payload)
941+
self._last_msg_sent_timestamp = self.get_monotonic_time()
936942
self.logger.debug("Waiting for UNSUBACK...")
937943
while True:
938944
stamp = self.get_monotonic_time()
@@ -1022,7 +1028,6 @@ def reconnect(self, resub_topics: bool = True) -> int:
10221028
return ret
10231029

10241030
def loop(self, timeout: float = 0) -> Optional[list[int]]:
1025-
# pylint: disable = too-many-return-statements
10261031
"""Non-blocking message loop. Use this method to check for incoming messages.
10271032
Returns list of packet types of any messages received or None.
10281033
@@ -1038,23 +1043,27 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]:
10381043

10391044
self._connected()
10401045
self.logger.debug(f"waiting for messages for {timeout} seconds")
1041-
if self._timestamp == 0:
1042-
self._timestamp = self.get_monotonic_time()
1043-
current_time = self.get_monotonic_time()
1044-
if current_time - self._timestamp >= self.keep_alive:
1045-
self._timestamp = 0
1046-
# Handle KeepAlive by expecting a PINGREQ/PINGRESP from the server
1047-
self.logger.debug(
1048-
"KeepAlive period elapsed - requesting a PINGRESP from the server..."
1049-
)
1050-
rcs = self.ping()
1051-
return rcs
10521046

10531047
stamp = self.get_monotonic_time()
10541048
rcs = []
10551049

10561050
while True:
1057-
rc = self._wait_for_msg(timeout=timeout)
1051+
if (
1052+
self.get_monotonic_time() - self._last_msg_sent_timestamp
1053+
>= self.keep_alive
1054+
):
1055+
# Handle KeepAlive by expecting a PINGREQ/PINGRESP from the server
1056+
self.logger.debug(
1057+
"KeepAlive period elapsed - requesting a PINGRESP from the server..."
1058+
)
1059+
rcs.extend(self.ping())
1060+
# ping() itself contains a _wait_for_msg() loop which might have taken a while,
1061+
# so check here as well.
1062+
if self.get_monotonic_time() - stamp > timeout:
1063+
self.logger.debug(f"Loop timed out after {timeout} seconds")
1064+
break
1065+
1066+
rc = self._wait_for_msg()
10581067
if rc is not None:
10591068
rcs.append(rc)
10601069
if self.get_monotonic_time() - stamp > timeout:

tests/test_loop.py

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,99 @@
88
import socket
99
import ssl
1010
import time
11+
import errno
12+
1113
from unittest import TestCase, main
1214
from unittest.mock import patch
15+
from unittest import mock
1316

1417
import adafruit_minimqtt.adafruit_minimqtt as MQTT
1518

1619

20+
class Nulltet:
21+
"""
22+
Mock Socket that does nothing.
23+
24+
Inspired by the Mocket class from Adafruit_CircuitPython_Requests
25+
"""
26+
27+
def __init__(self):
28+
self.sent = bytearray()
29+
30+
self.timeout = mock.Mock()
31+
self.connect = mock.Mock()
32+
self.close = mock.Mock()
33+
34+
def send(self, bytes_to_send):
35+
"""
36+
Record the bytes. return the length of this bytearray.
37+
"""
38+
self.sent.extend(bytes_to_send)
39+
return len(bytes_to_send)
40+
41+
# MiniMQTT checks for the presence of "recv_into" and switches behavior based on that.
42+
# pylint: disable=unused-argument,no-self-use
43+
def recv_into(self, retbuf, bufsize):
44+
"""Always raise timeout exception."""
45+
exc = OSError()
46+
exc.errno = errno.ETIMEDOUT
47+
raise exc
48+
49+
50+
class Pingtet:
51+
"""
52+
Mock Socket tailored for PINGREQ testing.
53+
Records sent data, hands out PINGRESP for each PINGREQ received.
54+
55+
Inspired by the Mocket class from Adafruit_CircuitPython_Requests
56+
"""
57+
58+
PINGRESP = bytearray([0xD0, 0x00])
59+
60+
def __init__(self):
61+
self._to_send = self.PINGRESP
62+
63+
self.sent = bytearray()
64+
65+
self.timeout = mock.Mock()
66+
self.connect = mock.Mock()
67+
self.close = mock.Mock()
68+
69+
self._got_pingreq = False
70+
71+
def send(self, bytes_to_send):
72+
"""
73+
Recognize PINGREQ and record the indication that it was received.
74+
Assumes it was sent in one chunk (of 2 bytes).
75+
Also record the bytes. return the length of this bytearray.
76+
"""
77+
self.sent.extend(bytes_to_send)
78+
if bytes_to_send == b"\xc0\0":
79+
self._got_pingreq = True
80+
return len(bytes_to_send)
81+
82+
# MiniMQTT checks for the presence of "recv_into" and switches behavior based on that.
83+
def recv_into(self, retbuf, bufsize):
84+
"""
85+
If the PINGREQ indication is on, return PINGRESP, otherwise raise timeout exception.
86+
"""
87+
if self._got_pingreq:
88+
size = min(bufsize, len(self._to_send))
89+
if size == 0:
90+
return size
91+
chop = self._to_send[0:size]
92+
retbuf[0:] = chop
93+
self._to_send = self._to_send[size:]
94+
if len(self._to_send) == 0:
95+
self._got_pingreq = False
96+
self._to_send = self.PINGRESP
97+
return size
98+
99+
exc = OSError()
100+
exc.errno = errno.ETIMEDOUT
101+
raise exc
102+
103+
17104
class Loop(TestCase):
18105
"""basic loop() test"""
19106

@@ -54,6 +141,8 @@ def test_loop_basic(self) -> None:
54141

55142
time_before = time.monotonic()
56143
timeout = random.randint(3, 8)
144+
# pylint: disable=protected-access
145+
mqtt_client._last_msg_sent_timestamp = mqtt_client.get_monotonic_time()
57146
rcs = mqtt_client.loop(timeout=timeout)
58147
time_after = time.monotonic()
59148

@@ -64,6 +153,7 @@ def test_loop_basic(self) -> None:
64153
assert rcs is not None
65154
assert len(rcs) >= 1
66155
expected_rc = self.INITIAL_RCS_VAL
156+
# pylint: disable=not-an-iterable
67157
for ret_code in rcs:
68158
assert ret_code == expected_rc
69159
expected_rc += 1
@@ -104,6 +194,71 @@ def test_loop_is_connected(self):
104194

105195
assert "not connected" in str(context.exception)
106196

197+
# pylint: disable=no-self-use
198+
def test_loop_ping_timeout(self):
199+
"""Verify that ping will be sent even with loop timeout bigger than keep alive timeout
200+
and no outgoing messages are sent."""
201+
202+
recv_timeout = 2
203+
keep_alive_timeout = recv_timeout * 2
204+
mqtt_client = MQTT.MQTT(
205+
broker="localhost",
206+
port=1883,
207+
ssl_context=ssl.create_default_context(),
208+
connect_retries=1,
209+
socket_timeout=1,
210+
recv_timeout=recv_timeout,
211+
keep_alive=keep_alive_timeout,
212+
)
213+
214+
# patch is_connected() to avoid CONNECT/CONNACK handling.
215+
mqtt_client.is_connected = lambda: True
216+
mocket = Pingtet()
217+
# pylint: disable=protected-access
218+
mqtt_client._sock = mocket
219+
220+
start = time.monotonic()
221+
res = mqtt_client.loop(timeout=2 * keep_alive_timeout)
222+
assert time.monotonic() - start >= 2 * keep_alive_timeout
223+
assert len(mocket.sent) > 0
224+
assert len(res) == 2
225+
assert set(res) == {int(0xD0)}
226+
227+
# pylint: disable=no-self-use
228+
def test_loop_ping_vs_msgs_sent(self):
229+
"""Verify that ping will not be sent unnecessarily."""
230+
231+
recv_timeout = 2
232+
keep_alive_timeout = recv_timeout * 2
233+
mqtt_client = MQTT.MQTT(
234+
broker="localhost",
235+
port=1883,
236+
ssl_context=ssl.create_default_context(),
237+
connect_retries=1,
238+
socket_timeout=1,
239+
recv_timeout=recv_timeout,
240+
keep_alive=keep_alive_timeout,
241+
)
242+
243+
# patch is_connected() to avoid CONNECT/CONNACK handling.
244+
mqtt_client.is_connected = lambda: True
245+
246+
# With QoS=0 no PUBACK message is sent, so Nulltet can be used.
247+
mocket = Nulltet()
248+
# pylint: disable=protected-access
249+
mqtt_client._sock = mocket
250+
251+
i = 0
252+
topic = "foo"
253+
message = "bar"
254+
for _ in range(3 * keep_alive_timeout):
255+
mqtt_client.publish(topic, message, qos=0)
256+
mqtt_client.loop(1)
257+
i += 1
258+
259+
# This means no other messages than the PUBLISH messages generated by the code above.
260+
assert len(mocket.sent) == i * (2 + 2 + len(topic) + len(message))
261+
107262

108263
if __name__ == "__main__":
109264
main()

0 commit comments

Comments
 (0)