Skip to content
This repository was archived by the owner on Sep 25, 2023. It is now read-only.

Process missing messages during reconnection #199

Open
wants to merge 4 commits into
base: 4.2.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/sdk/base/src/main/java/owt/base/Const.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ public class Const {
"'unifiedPlan': true," +
"'streamRemovable': true}" +
"}";
public static final String PROTOCOL_VERSION = "1.0";
public static final String PROTOCOL_VERSION = "1.0.1";
}
///@endcond
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import org.json.JSONException;
import org.json.JSONObject;
import org.json.JSONArray;

import java.net.URISyntaxException;
import java.util.ArrayList;
Expand Down Expand Up @@ -62,6 +63,7 @@ interface SignalingChannelObserver {
private final int MAX_RECONNECT_ATTEMPTS = 5;
private String reconnectionTicket;
private int reconnectAttempts = 0;
private int messageSequence = 0;
// No lock is guarding loggedIn so void access and modify it on threads other than
// |callbackExecutor|.
private boolean loggedIn = false;
Expand Down Expand Up @@ -106,10 +108,19 @@ interface SignalingChannelObserver {
private final Listener disconnectCallback = args -> callbackExecutor.execute(
this::triggerDisconnected);

// Count internal message sequence
private void incrementMessageSequence() {
if (messageSequence == Integer.MAX_VALUE) {
messageSequence = 0;
} else {
messageSequence++;
}
}
// MCU events.
private final Listener progressCallback = (Object... args) -> callbackExecutor.execute(() -> {
JSONObject msg = (JSONObject) args[0];
observer.onProgressMessage(msg);
incrementMessageSequence();
});
private final Listener participantCallback = (Object... args) -> callbackExecutor.execute(
() -> {
Expand All @@ -128,6 +139,7 @@ interface SignalingChannelObserver {
} catch (JSONException e) {
DCHECK(e);
}
incrementMessageSequence();
});
private final Listener streamCallback = (Object... args) -> callbackExecutor.execute(() -> {
try {
Expand All @@ -153,6 +165,7 @@ interface SignalingChannelObserver {
} catch (JSONException e) {
DCHECK(e);
}
incrementMessageSequence();
});
private final Listener textCallback = (Object... args) -> callbackExecutor.execute(() -> {
JSONObject data = (JSONObject) args[0];
Expand All @@ -162,6 +175,7 @@ interface SignalingChannelObserver {
} catch (JSONException e) {
DCHECK(false);
}
incrementMessageSequence();
});
private final Listener dropCallback = args -> triggerDisconnected();

Expand Down Expand Up @@ -267,7 +281,41 @@ private void relogin() {
DCHECK(reconnectionTicket);
socketClient.emit("relogin", reconnectionTicket, (Ack) (Object... args) -> {
if (extractMsg(0, args).equals("ok")) {
reconnectionTicket = (String) args[1];
if (args[1] instanceof JSONObject) {
try {
reconnectionTicket = ((JSONObject) args[1]).getString("ticket");
JSONArray pendingMessages = ((JSONObject) args[1]).getJSONArray("messages");
boolean isMissingStart = false;
for (int i = 0; i < pendingMessages.length(); i++) {
JSONObject message = pendingMessages.getJSONObject(i);
if (isMissingStart) {
Object messageData = message.get("data");
switch (message.getString("event")) {
case "participant":
participantCallback.call(messageData);
break;
case "text":
textCallback.call(messageData);
break;
case "stream":
streamCallback.call(messageData);
break;
case "progress":
progressCallback.call(messageData);
break;
default:
DCHECK(false);
}
} else if (message.getInt("seq") == messageSequence) {
isMissingStart = true;
}
}
} catch (JSONException e) {
DCHECK(e);
}
} else {
reconnectionTicket = (String) args[1];
}
reconnectAttempts = 0;
flushCachedMsg();
} else {
Expand Down