33import  android .util .Log ;
44
55import  java .util .ArrayList ;
6- import  java .util .Arrays ;
76import  java .util .Collections ;
87import  java .util .HashMap ;
98import  java .util .HashSet ;
@@ -157,7 +156,7 @@ public Observable<StompMessage> topic(String destinationPath, List<StompHeader>
157156           if  (subscribersSet  == null ) {
158157               subscribersSet  = new  HashSet <>();
159158               mSubscribers .put (destinationPath , subscribersSet );
160-                subscribePath (destinationPath , headerList );
159+                subscribePath (destinationPath , headerList ). subscribe () ;
161160           }
162161           subscribersSet .add (subscriber );
163162
@@ -169,16 +168,16 @@ public Observable<StompMessage> topic(String destinationPath, List<StompHeader>
169168                       set .remove (subscriber );
170169                       if  (set .size () < 1 ) {
171170                           mSubscribers .remove (dest );
172-                            unsubscribePath (dest );
171+                            unsubscribePath (dest ). subscribe () ;
173172                       }
174173                   }
175174               }
176175           }
177176       });
178177   }
179178
180-     private  void  subscribePath (String  destinationPath , List <StompHeader > headerList ) {
181-           if  (destinationPath  == null ) return ;
179+     private  Observable < Void >  subscribePath (String  destinationPath , List <StompHeader > headerList ) {
180+           if  (destinationPath  == null ) return   Observable . empty () ;
182181          String  topicId  = UUID .randomUUID ().toString ();
183182
184183          if  (mTopics  == null ) mTopics  = new  HashMap <>();
@@ -188,16 +187,16 @@ private void subscribePath(String destinationPath, List<StompHeader> headerList)
188187          headers .add (new  StompHeader (StompHeader .DESTINATION , destinationPath ));
189188          headers .add (new  StompHeader (StompHeader .ACK , DEFAULT_ACK ));
190189          if  (headerList  != null ) headers .addAll (headerList );
191-           send (new  StompMessage (StompCommand .SUBSCRIBE ,
190+           return   send (new  StompMessage (StompCommand .SUBSCRIBE ,
192191                  headers , null ));
193192      }
194193
195194
196-     private  void  unsubscribePath (String  dest ) {
195+     private  Observable < Void >  unsubscribePath (String  dest ) {
197196        String  topicId  = mTopics .get (dest );
198197        Log .d (TAG , "Unsubscribe path: "  + dest  + " id: "  + topicId );
199198
200-         send (new  StompMessage (StompCommand .UNSUBSCRIBE ,
199+         return   send (new  StompMessage (StompCommand .UNSUBSCRIBE ,
201200                Collections .singletonList (new  StompHeader (StompHeader .ID , topicId )), null ));
202201    }
203202
0 commit comments