@@ -790,22 +790,34 @@ def subscribe(self, topic, qos=0):
790790 stamp = time .monotonic ()
791791 while True :
792792 op = self ._wait_for_msg ()
793- if op == 0x90 :
794- rc = self ._sock_exact_recv (4 )
795- assert rc [1 ] == packet [2 ] and rc [2 ] == packet [3 ]
796- if rc [3 ] == 0x80 :
797- raise MMQTTException ("SUBACK Failure!" )
798- for t , q in topics :
799- if self .on_subscribe is not None :
800- self .on_subscribe (self , self ._user_data , t , q )
801- self ._subscribed_topics .append (t )
802- return
803-
804793 if op is None :
805794 if time .monotonic () - stamp > self ._recv_timeout :
806795 raise MMQTTException (
807796 f"No data received from broker for { self ._recv_timeout } seconds."
808797 )
798+ else :
799+ if op == 0x90 :
800+ rc = self ._sock_exact_recv (3 )
801+ # Check packet identifier.
802+ assert rc [1 ] == packet [2 ] and rc [2 ] == packet [3 ]
803+ remaining_len = rc [0 ] - 2
804+ assert remaining_len > 0
805+ rc = self ._sock_exact_recv (remaining_len )
806+ for i in range (0 , remaining_len ):
807+ if rc [i ] not in [0 , 1 , 2 ]:
808+ raise MMQTTException (
809+ f"SUBACK Failure for topic { topics [i ][0 ]} : { hex (rc [i ])} "
810+ )
811+
812+ for t , q in topics :
813+ if self .on_subscribe is not None :
814+ self .on_subscribe (self , self ._user_data , t , q )
815+ self ._subscribed_topics .append (t )
816+ return
817+
818+ raise MMQTTException (
819+ f"invalid message received as response to SUBSCRIBE: { hex (op )} "
820+ )
809821
810822 def unsubscribe (self , topic ):
811823 """Unsubscribes from a MQTT topic.
@@ -844,22 +856,26 @@ def unsubscribe(self, topic):
844856 while True :
845857 stamp = time .monotonic ()
846858 op = self ._wait_for_msg ()
847- if op == 176 :
848- rc = self ._sock_exact_recv (3 )
849- assert rc [0 ] == 0x02
850- # [MQTT-3.32]
851- assert rc [1 ] == packet_id_bytes [0 ] and rc [2 ] == packet_id_bytes [1 ]
852- for t in topics :
853- if self .on_unsubscribe is not None :
854- self .on_unsubscribe (self , self ._user_data , t , self ._pid )
855- self ._subscribed_topics .remove (t )
856- return
857-
858859 if op is None :
859860 if time .monotonic () - stamp > self ._recv_timeout :
860861 raise MMQTTException (
861862 f"No data received from broker for { self ._recv_timeout } seconds."
862863 )
864+ else :
865+ if op == 176 :
866+ rc = self ._sock_exact_recv (3 )
867+ assert rc [0 ] == 0x02
868+ # [MQTT-3.32]
869+ assert rc [1 ] == packet_id_bytes [0 ] and rc [2 ] == packet_id_bytes [1 ]
870+ for t in topics :
871+ if self .on_unsubscribe is not None :
872+ self .on_unsubscribe (self , self ._user_data , t , self ._pid )
873+ self ._subscribed_topics .remove (t )
874+ return
875+
876+ raise MMQTTException (
877+ f"invalid message received as response to UNSUBSCRIBE: { hex (op )} "
878+ )
863879
864880 def _recompute_reconnect_backoff (self ):
865881 """
@@ -998,6 +1014,7 @@ def _wait_for_msg(self, timeout=0.1):
9981014 return MQTT_PINGRESP
9991015
10001016 if res [0 ] & MQTT_PKT_TYPE_MASK != MQTT_PUBLISH :
1017+ self .logger .debug (f"Got message type: { hex (res [0 ])} " )
10011018 return res [0 ]
10021019
10031020 # Handle only the PUBLISH packet type from now on.
0 commit comments