Skip to content

Commit 95f7052

Browse files
committed
Resolve race condition by ensuring that op=connected has been received
before sending a new subscribe event. Fixes #46
1 parent b376ee1 commit 95f7052

File tree

2 files changed

+18
-14
lines changed

2 files changed

+18
-14
lines changed

ParseLiveQuery/src/main/java/com/parse/ParseLiveQueryClientImpl.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
private WebSocketClient webSocketClient;
3939
private int requestIdCount = 1;
4040
private boolean userInitiatedDisconnect = false;
41+
private boolean hasReceivedConnected = false;
4142

4243
/* package */ ParseLiveQueryClientImpl() {
4344
this(getDefaultUri());
@@ -89,7 +90,7 @@ public <T extends ParseObject> SubscriptionHandling<T> subscribe(ParseQuery<T> q
8990
subscriptions.append(requestId, subscription);
9091

9192
// TODO: differentiate between state=CONNECTED, vs received op=connected response
92-
if (inAnyState(WebSocketClient.State.CONNECTED)) {
93+
if (isConnected()) {
9394
sendSubscription(subscription);
9495
} else if (userInitiatedDisconnect) {
9596
Log.w(LOG_TAG, "Warning: The client was explicitly disconnected! You must explicitly call .reconnect() in order to process your subscriptions.");
@@ -100,6 +101,10 @@ public <T extends ParseObject> SubscriptionHandling<T> subscribe(ParseQuery<T> q
100101
return subscription;
101102
}
102103

104+
private boolean isConnected() {
105+
return hasReceivedConnected && inAnyState(WebSocketClient.State.CONNECTED);
106+
}
107+
103108
public void connectIfNeeded() {
104109
switch (getWebSocketState()) {
105110
case CONNECTED:
@@ -151,18 +156,21 @@ public void reconnect() {
151156
webSocketClient.close();
152157
}
153158

159+
userInitiatedDisconnect = false;
160+
hasReceivedConnected = false;
154161
webSocketClient = webSocketClientFactory.createInstance(webSocketClientCallback, uri);
155162
webSocketClient.open();
156-
userInitiatedDisconnect = false;
157163
}
158164

159165
@Override
160166
public void disconnect() {
161167
if (webSocketClient != null) {
162-
userInitiatedDisconnect = true;
163168
webSocketClient.close();
164169
webSocketClient = null;
165170
}
171+
172+
userInitiatedDisconnect = true;
173+
hasReceivedConnected = false;
166174
}
167175

168176
@Override
@@ -220,6 +228,7 @@ private void parseMessage(String message) throws LiveQueryException {
220228

221229
switch (rawOperation) {
222230
case "connected":
231+
hasReceivedConnected = true;
223232
dispatchConnected();
224233
Log.v(LOG_TAG, "Connected, sending pending subscription");
225234
for (int i = 0; i < subscriptions.size(); i++) {
@@ -371,6 +380,7 @@ private WebSocketClient.WebSocketClientCallback getWebSocketClientCallback() {
371380
return new WebSocketClient.WebSocketClientCallback() {
372381
@Override
373382
public void onOpen() {
383+
hasReceivedConnected = false;
374384
Log.v(LOG_TAG, "Socket opened");
375385
ParseUser.getCurrentSessionTokenAsync().onSuccessTask(new Continuation<String, Task<Void>>() {
376386
@Override
@@ -406,12 +416,14 @@ public Void then(Task<Void> task) {
406416
@Override
407417
public void onClose() {
408418
Log.v(LOG_TAG, "Socket onClose");
419+
hasReceivedConnected = false;
409420
dispatchDisconnected();
410421
}
411422

412423
@Override
413424
public void onError(Throwable exception) {
414425
Log.e(LOG_TAG, "Socket onError", exception);
426+
hasReceivedConnected = false;
415427
dispatchSocketError(exception);
416428
}
417429

ParseLiveQuery/src/test/java/com/parse/TestParseLiveQueryClient.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,8 @@ public void tearDown() throws Exception {
8484
}
8585

8686
@Test
87-
public void testSubscribeBetweenSocketConnectAndConnectResponse() throws Exception {
87+
public void testSubscribeAfterSocketConnectBeforeConnectedOp() throws Exception {
88+
// Bug: https://github.com/parse-community/ParseLiveQuery-Android/issues/46
8889
ParseQuery<ParseObject> queryA = ParseQuery.getQuery("objA");
8990
ParseQuery<ParseObject> queryB = ParseQuery.getQuery("objB");
9091
clearConnection();
@@ -102,21 +103,12 @@ public void testSubscribeBetweenSocketConnectAndConnectResponse() throws Excepti
102103
verify(webSocketClient, times(1)).send(contains("\"op\":\"connect\""));
103104

104105
// Now if we subscribe to queryB, we SHOULD NOT send the subscribe yet, until we get op=connected
105-
SubscriptionHandling<ParseObject> subB = parseLiveQueryClient.subscribe(queryB); // TODO: fix this state
106+
SubscriptionHandling<ParseObject> subB = parseLiveQueryClient.subscribe(queryB);
106107
verify(webSocketClient, never()).send(contains("\"op\":\"subscribe\""));
107108

108109
// on op=connected, _then_ we should send both subscriptions
109110
webSocketClientCallback.onMessage(createConnectedMessage().toString());
110111
verify(webSocketClient, times(2)).send(contains("\"op\":\"subscribe\""));
111-
112-
// 1. Subscribe to queryA
113-
// - it is not connected yet, so it will trigger reconnect.
114-
// 2. Socket opens & connects; initiate op=connect
115-
// 3. subscribe to queryB
116-
// - SOCKET is connected, but we haven't received op=connected yet.
117-
// - BUG: it will call sendSubscription now
118-
// 4. Server responds to #2 with op=connected
119-
// 5. On op=connected, we replay pending subscriptions, including the one that was already sent in #3
120112
}
121113

122114
@Test

0 commit comments

Comments
 (0)