@@ -516,38 +516,42 @@ def connect(self, clean_session=True, host=None, port=None, keep_alive=None):
516516 return result
517517
518518 def disconnect (self ):
519- """Disconnects the MiniMQTT client from the MQTT broker.
520- """
519+ """Disconnects the MiniMQTT client from the MQTT broker."""
521520 self .is_connected ()
522- if self .logger :
521+ if self .logger is not None :
523522 self .logger .debug ("Sending DISCONNECT packet to broker" )
524- self ._sock .send (MQTT_DISCONNECT )
525- if self .logger :
523+ try :
524+ self ._sock .send (MQTT_DISCONNECT )
525+ except RuntimeError as e :
526+ if self .logger :
527+ self .logger .warning ("Unable to send DISCONNECT packet: {}" .format (e ))
528+ if self .logger is not None :
526529 self .logger .debug ("Closing socket" )
527- self ._free_sockets ()
530+ self ._sock . close ()
528531 self ._is_connected = False
529- self ._subscribed_topics = None
532+ self ._subscribed_topics = []
530533 if self .on_disconnect is not None :
531- self .on_disconnect (self , self ._user_data , 0 )
534+ self .on_disconnect (self , self .user_data , 0 )
532535
533536 def ping (self ):
534537 """Pings the MQTT Broker to confirm if the broker is alive or if
535538 there is an active network connection.
539+ Returns response codes of any messages received while waiting for PINGRESP.
536540 """
537541 self .is_connected ()
538- buf = self ._rx_buffer
539542 if self .logger :
540543 self .logger .debug ("Sending PINGREQ" )
541544 self ._sock .send (MQTT_PINGREQ )
542- if self .logger :
543- self .logger .debug ("Checking PINGRESP" )
544- while True :
545- op = self ._wait_for_msg ()
546- if op == 208 :
547- self ._recv_into (buf , 2 )
548- if buf [0 ] != 0x00 :
549- raise MMQTTException ("PINGRESP not returned from broker." )
550- return
545+ ping_timeout = self .keep_alive
546+ stamp = time .monotonic ()
547+ rc , rcs = None , []
548+ while rc != MQTT_PINGRESP :
549+ rc = self ._wait_for_msg ()
550+ if rc :
551+ rcs .append (rc )
552+ if time .monotonic () - stamp > ping_timeout :
553+ raise MMQTTException ("PINGRESP not returned from broker." )
554+ return rcs
551555
552556 # pylint: disable=too-many-branches, too-many-statements
553557 def publish (self , topic , msg , retain = False , qos = 0 ):
@@ -794,12 +798,12 @@ def unsubscribe(self, topic):
794798 while True :
795799 op = self ._wait_for_msg ()
796800 if op == 176 :
797- self ._recv_into ( buf , 3 )
798- assert buf [0 ] == 0x02
801+ rc = self ._sock_exact_recv ( 3 )
802+ assert rc [0 ] == 0x02
799803 # [MQTT-3.32]
800804 assert (
801- buf [1 ] == packet_id_bytes [0 ]
802- and buf [2 ] == packet_id_bytes [1 ]
805+ rc [1 ] == packet_id_bytes [0 ]
806+ and rc [2 ] == packet_id_bytes [1 ]
803807 )
804808 for t in topics :
805809 if self .on_unsubscribe is not None :
@@ -828,33 +832,36 @@ def reconnect(self, resub_topics=True):
828832 feed = subscribed_topics .pop ()
829833 self .subscribe (feed )
830834
831- def loop (self , timeout = 0.01 ):
835+ def loop (self , timeout = 1 ):
832836 """Non-blocking message loop. Use this method to
833837 check incoming subscription messages.
834- :param float timeout: Set timeout in seconds for
835- polling the message queue.
838+ Returns response codes of any messages received.
839+ :param int timeout: Socket timeout, in seconds.
840+
836841 """
837842 if self ._timestamp == 0 :
838843 self ._timestamp = time .monotonic ()
839844 current_time = time .monotonic ()
840845 if current_time - self ._timestamp >= self .keep_alive :
841846 # Handle KeepAlive by expecting a PINGREQ/PINGRESP from the server
842- if self .logger :
847+ if self .logger is not None :
843848 self .logger .debug (
844849 "KeepAlive period elapsed - \
845850 requesting a PINGRESP from the server..."
846851 )
847- self .ping ()
852+ rcs = self .ping ()
848853 self ._timestamp = 0
849- return self ._wait_for_msg (timeout )
854+ return rcs
855+ self ._sock .settimeout (timeout )
856+ rc = self ._wait_for_msg ()
857+ return [rc ] if rc else None
858+
850859
851860 def _wait_for_msg (self , timeout = 0.1 ):
852861 """Reads and processes network events."""
853862 buf = self ._rx_buffer
854863 res = bytearray (1 )
855864
856- # Attempt to read
857- self ._sock .settimeout (1 )
858865 try :
859866 self ._sock .recv_into (res , 1 )
860867 except OSError as error :
0 commit comments