@@ -784,22 +784,35 @@ def subscribe(self, topic, qos=0):
784784 stamp = time .monotonic ()
785785 while True :
786786 op = self ._wait_for_msg ()
787- if op == 0x90 :
788- rc = self ._sock_exact_recv (4 )
789- assert rc [1 ] == packet [2 ] and rc [2 ] == packet [3 ]
790- if rc [3 ] == 0x80 :
791- raise MMQTTException ("SUBACK Failure!" )
792- for t , q in topics :
793- if self .on_subscribe is not None :
794- self .on_subscribe (self , self ._user_data , t , q )
795- self ._subscribed_topics .append (t )
796- return
797-
798787 if op is None :
799788 if time .monotonic () - stamp > self ._recv_timeout :
800789 raise MMQTTException (
801790 f"No data received from broker for { self ._recv_timeout } seconds."
802791 )
792+ else :
793+ if op == 0x90 :
794+ rc = self ._sock_exact_recv (3 )
795+ # Check packet identifier.
796+ assert rc [1 ] == packet [2 ] and rc [2 ] == packet [3 ]
797+ remaining_len = rc [0 ] - 2
798+ assert remaining_len > 0
799+ rc = self ._sock_exact_recv (remaining_len )
800+ for i in range (0 , remaining_len ):
801+ if rc [i ] not in [0 , 1 , 2 ]:
802+ raise MMQTTException (
803+ f"SUBACK Failure for topic "
804+ f"{ topics [i ][0 ]} : { hex (rc [i ])} "
805+ )
806+
807+ for t , q in topics :
808+ if self .on_subscribe is not None :
809+ self .on_subscribe (self , self ._user_data , t , q )
810+ self ._subscribed_topics .append (t )
811+ return
812+
813+ raise MMQTTException (
814+ f"invalid message received as response to SUBSCRIBE: { hex (op )} "
815+ )
803816
804817 def unsubscribe (self , topic ):
805818 """Unsubscribes from a MQTT topic.
@@ -838,22 +851,26 @@ def unsubscribe(self, topic):
838851 while True :
839852 stamp = time .monotonic ()
840853 op = self ._wait_for_msg ()
841- if op == 176 :
842- rc = self ._sock_exact_recv (3 )
843- assert rc [0 ] == 0x02
844- # [MQTT-3.32]
845- assert rc [1 ] == packet_id_bytes [0 ] and rc [2 ] == packet_id_bytes [1 ]
846- for t in topics :
847- if self .on_unsubscribe is not None :
848- self .on_unsubscribe (self , self ._user_data , t , self ._pid )
849- self ._subscribed_topics .remove (t )
850- return
851-
852854 if op is None :
853855 if time .monotonic () - stamp > self ._recv_timeout :
854856 raise MMQTTException (
855857 f"No data received from broker for { self ._recv_timeout } seconds."
856858 )
859+ else :
860+ if op == 176 :
861+ rc = self ._sock_exact_recv (3 )
862+ assert rc [0 ] == 0x02
863+ # [MQTT-3.32]
864+ assert rc [1 ] == packet_id_bytes [0 ] and rc [2 ] == packet_id_bytes [1 ]
865+ for t in topics :
866+ if self .on_unsubscribe is not None :
867+ self .on_unsubscribe (self , self ._user_data , t , self ._pid )
868+ self ._subscribed_topics .remove (t )
869+ return
870+
871+ raise MMQTTException (
872+ f"invalid message received as response to UNSUBSCRIBE: { hex (op )} "
873+ )
857874
858875 def _recompute_reconnect_backoff (self ):
859876 """
@@ -992,6 +1009,7 @@ def _wait_for_msg(self, timeout=0.1):
9921009 return MQTT_PINGRESP
9931010
9941011 if res [0 ] & MQTT_PKT_TYPE_MASK != MQTT_PUBLISH :
1012+ self .logger .debug (f"Got message type: { hex (res [0 ])} " )
9951013 return res [0 ]
9961014
9971015 # Handle only the PUBLISH packet type from now on.
0 commit comments