Skip to content

Resolve race condition by ensuring that op=connected has been received before sending a new subscribe event #48

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 3, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
private WebSocketClient webSocketClient;
private int requestIdCount = 1;
private boolean userInitiatedDisconnect = false;
private boolean hasReceivedConnected = false;

/* package */ ParseLiveQueryClientImpl() {
this(getDefaultUri());
Expand Down Expand Up @@ -88,7 +89,7 @@ public <T extends ParseObject> SubscriptionHandling<T> subscribe(ParseQuery<T> q
Subscription<T> subscription = new Subscription<>(requestId, query);
subscriptions.append(requestId, subscription);

if (inAnyState(WebSocketClient.State.CONNECTED)) {
if (isConnected()) {
sendSubscription(subscription);
} else if (userInitiatedDisconnect) {
Log.w(LOG_TAG, "Warning: The client was explicitly disconnected! You must explicitly call .reconnect() in order to process your subscriptions.");
Expand Down Expand Up @@ -150,18 +151,21 @@ public void reconnect() {
webSocketClient.close();
}

userInitiatedDisconnect = false;
hasReceivedConnected = false;
webSocketClient = webSocketClientFactory.createInstance(webSocketClientCallback, uri);
webSocketClient.open();
userInitiatedDisconnect = false;
}

@Override
public void disconnect() {
if (webSocketClient != null) {
userInitiatedDisconnect = true;
webSocketClient.close();
webSocketClient = null;
}

userInitiatedDisconnect = true;
hasReceivedConnected = false;
}

@Override
Expand All @@ -185,6 +189,10 @@ private WebSocketClient.State getWebSocketState() {
return state == null ? WebSocketClient.State.NONE : state;
}

private boolean isConnected() {
return hasReceivedConnected && inAnyState(WebSocketClient.State.CONNECTED);
}

private boolean inAnyState(WebSocketClient.State... states) {
return Arrays.asList(states).contains(getWebSocketState());
}
Expand Down Expand Up @@ -219,6 +227,7 @@ private void parseMessage(String message) throws LiveQueryException {

switch (rawOperation) {
case "connected":
hasReceivedConnected = true;
dispatchConnected();
Log.v(LOG_TAG, "Connected, sending pending subscription");
for (int i = 0; i < subscriptions.size(); i++) {
Expand Down Expand Up @@ -370,6 +379,7 @@ private WebSocketClient.WebSocketClientCallback getWebSocketClientCallback() {
return new WebSocketClient.WebSocketClientCallback() {
@Override
public void onOpen() {
hasReceivedConnected = false;
Log.v(LOG_TAG, "Socket opened");
ParseUser.getCurrentSessionTokenAsync().onSuccessTask(new Continuation<String, Task<Void>>() {
@Override
Expand Down Expand Up @@ -405,12 +415,14 @@ public Void then(Task<Void> task) {
@Override
public void onClose() {
Log.v(LOG_TAG, "Socket onClose");
hasReceivedConnected = false;
dispatchDisconnected();
}

@Override
public void onError(Throwable exception) {
Log.e(LOG_TAG, "Socket onError", exception);
hasReceivedConnected = false;
dispatchSocketError(exception);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.parse;

import org.assertj.core.api.Assertions;
import org.json.JSONException;
import org.json.JSONObject;
import org.junit.After;
Expand Down Expand Up @@ -82,6 +83,34 @@ public void tearDown() throws Exception {
ParsePlugins.reset();
}

@Test
public void testSubscribeAfterSocketConnectBeforeConnectedOp() throws Exception {
// Bug: https://github.com/parse-community/ParseLiveQuery-Android/issues/46
ParseQuery<ParseObject> queryA = ParseQuery.getQuery("objA");
ParseQuery<ParseObject> queryB = ParseQuery.getQuery("objB");
clearConnection();

// This will trigger connectIfNeeded(), which calls reconnect()
SubscriptionHandling<ParseObject> subA = parseLiveQueryClient.subscribe(queryA);

verify(webSocketClient, times(1)).open();
verify(webSocketClient, never()).send(anyString());

// Now the socket is open
webSocketClientCallback.onOpen();
when(webSocketClient.getState()).thenReturn(WebSocketClient.State.CONNECTED);
// and we send op=connect
verify(webSocketClient, times(1)).send(contains("\"op\":\"connect\""));

// Now if we subscribe to queryB, we SHOULD NOT send the subscribe yet, until we get op=connected
SubscriptionHandling<ParseObject> subB = parseLiveQueryClient.subscribe(queryB);
verify(webSocketClient, never()).send(contains("\"op\":\"subscribe\""));

// on op=connected, _then_ we should send both subscriptions
webSocketClientCallback.onMessage(createConnectedMessage().toString());
verify(webSocketClient, times(2)).send(contains("\"op\":\"subscribe\""));
}

@Test
public void testSubscribeWhenSubscribedToCallback() throws Exception {
SubscriptionHandling.HandleSubscribeCallback<ParseObject> subscribeMockCallback = mock(SubscriptionHandling.HandleSubscribeCallback.class);
Expand Down Expand Up @@ -459,6 +488,11 @@ private void validateSameObject(SubscriptionHandling.HandleEventCallback<ParseOb
assertEquals(originalParseObject.getObjectId(), newParseObject.getObjectId());
}

private void clearConnection() {
webSocketClient = null;
webSocketClientCallback = null;
}

private void reconnect() {
parseLiveQueryClient.reconnect();
webSocketClientCallback.onOpen();
Expand Down