Skip to content

Commit 4664650

Browse files
authored
Merge pull request #21 from rassulrakhimzhan/master
custom subscription headers support added
2 parents d9b70a1 + 8780629 commit 4664650

File tree

1 file changed

+42
-40
lines changed

1 file changed

+42
-40
lines changed

lib/src/main/java/ua/naiksoftware/stomp/client/StompClient.java

Lines changed: 42 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -147,46 +147,48 @@ public void disconnect() {
147147
mConnected = false;
148148
}
149149

150-
public Observable<StompMessage> topic(String destinationPath) {
151-
return Observable.<StompMessage>create(subscriber -> {
152-
153-
Set<Subscriber<? super StompMessage>> subscribersSet = mSubscribers.get(destinationPath);
154-
if (subscribersSet == null) {
155-
subscribersSet = new HashSet<>();
156-
mSubscribers.put(destinationPath, subscribersSet);
157-
subscribePath(destinationPath);
158-
}
159-
subscribersSet.add(subscriber);
160-
161-
}).doOnUnsubscribe(() -> {
162-
for (String dest : mSubscribers.keySet()) {
163-
Set<Subscriber<? super StompMessage>> set = mSubscribers.get(dest);
164-
for (Subscriber<? super StompMessage> subscriber : set) {
165-
if (subscriber.isUnsubscribed()) {
166-
set.remove(subscriber);
167-
if (set.size() < 1) {
168-
mSubscribers.remove(dest);
169-
unsubscribePath(dest);
170-
}
171-
}
172-
}
173-
}
174-
});
175-
}
176-
177-
private void subscribePath(String destinationPath) {
178-
if (destinationPath == null) return;
179-
String topicId = UUID.randomUUID().toString();
180-
Log.d(TAG, "Subscribe path: " + destinationPath + " id: " + topicId);
181-
182-
if (mTopics == null) mTopics = new HashMap<>();
183-
mTopics.put(destinationPath, topicId);
184-
send(new StompMessage(StompCommand.SUBSCRIBE,
185-
Arrays.asList(
186-
new StompHeader(StompHeader.ID, topicId),
187-
new StompHeader(StompHeader.DESTINATION, destinationPath),
188-
new StompHeader(StompHeader.ACK, DEFAULT_ACK)), null));
189-
}
150+
public Observable<StompMessage> topic(String destinationPath, List<StompHeader> headerList) {
151+
return Observable.<StompMessage>create(subscriber -> {
152+
Set<Subscriber<? super StompMessage>> subscribersSet = mSubscribers.get(destinationPath);
153+
if (subscribersSet == null) {
154+
subscribersSet = new HashSet<>();
155+
mSubscribers.put(destinationPath, subscribersSet);
156+
subscribePath(destinationPath, headerList);
157+
}
158+
subscribersSet.add(subscriber);
159+
160+
}).doOnUnsubscribe(() -> {
161+
for (String dest : mSubscribers.keySet()) {
162+
Set<Subscriber<? super StompMessage>> set = mSubscribers.get(dest);
163+
for (Subscriber<? super StompMessage> subscriber : set) {
164+
if (subscriber.isUnsubscribed()) {
165+
set.remove(subscriber);
166+
if (set.size() < 1) {
167+
mSubscribers.remove(dest);
168+
unsubscribePath(dest);
169+
}
170+
}
171+
}
172+
}
173+
});
174+
}
175+
176+
private void subscribePath(String destinationPath, List<StompHeader> headerList) {
177+
if (destinationPath == null) return;
178+
String topicId = UUID.randomUUID().toString();
179+
180+
if (mTopics == null) mTopics = new HashMap<>();
181+
mTopics.put(destinationPath, topicId);
182+
List<StompHeader> headers = new ArrayList<>();
183+
headers.add(new StompHeader(StompHeader.ID, topicId));
184+
headers.add(new StompHeader(StompHeader.DESTINATION, destinationPath));
185+
headers.add(new StompHeader(StompHeader.ACK, DEFAULT_ACK));
186+
for(StompHeader header : headerList){
187+
headers.add(header);
188+
}
189+
send(new StompMessage(StompCommand.SUBSCRIBE,
190+
headers, null));
191+
}
190192

191193

192194
private void unsubscribePath(String dest) {

0 commit comments

Comments
 (0)