Skip to content

Commit

Permalink
fix: resume client to server communication after web socket reconnect…
Browse files Browse the repository at this point in the history
…ion (#20283)

* fix: resume client to server communication after web socket reconnection

When a websocket PUSH connection is closed and re-established because
of a network failure, the RequestResponseTracker.hasActiveRequest is
not reset, prenvint the Flow client to send additional messages to
the server.
This change will reset the flag on reconnection. It also will track
unsent PUSH message over websocket, to retry the delivery once the
connection is re-established, preventing client resynchronization.
In addition, it sets a default value of 12 for the Atmospehere
maxWebsocketErrorRetries setting, to ensure that the Flow client will
attempt to reconnect with web socket transport several times, instead
of immediately downgrade to long-polling after first failed connection.

Fixes #20213

* upgrade to atmosphere javascript 4.0.1 with reconnection fixes

---------

Co-authored-by: Mikhail Shabarov <61410877+mshabarov@users.noreply.github.com>
  • Loading branch information
mcollovati and mshabarov authored Nov 7, 2024
1 parent a1cbb0d commit b72e395
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,7 @@ protected final native AtmosphereConfiguration createConfig()
fallbackTransport: 'long-polling',
contentType: 'application/json; charset=UTF-8',
reconnectInterval: 5000,
maxWebsocketErrorRetries: 12,
timeout: -1,
maxReconnectOnClose: 10000000,
trackMessageLength: true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,18 @@ public void pushOk(PushConnection pushConnection) {
debug("pushOk()");
if (isReconnecting()) {
resolveTemporaryError(Type.PUSH);
if (registry.getRequestResponseTracker().hasActiveRequest()) {
debug("pushOk() Reset active request state when reconnecting PUSH because of a network error.");
endRequest();
// for bidirectional transport, the pending message is not sent
// as reconnection payload, so immediately push the pending
// changes on reconnect
if (pushConnection.isBidirectional()) {
Console.debug(
"Flush pending messages after PUSH reconnection.");
registry.getMessageSender().sendInvocationsToServer();
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public enum ResynchronizationState {

private ResynchronizationState resynchronizationState = ResynchronizationState.NOT_ACTIVE;

private JsonObject pushPendingMessage;

/**
* Creates a new instance connected to the given registry.
*
Expand Down Expand Up @@ -104,6 +106,17 @@ public void sendInvocationsToServer() {
*
*/
private void doSendInvocationsToServer() {
// If there's a stored message, resend it and postpone processing the
// rest of the queued messages to prevent resynchronization issues.
if (pushPendingMessage != null) {
Console.log("Sending pending push message "
+ pushPendingMessage.toJson());
JsonObject payload = pushPendingMessage;
pushPendingMessage = null;
registry.getRequestResponseTracker().startRequest();
send(payload);
return;
}

ServerRpcQueue serverRpcQueue = registry.getServerRpcQueue();
if (serverRpcQueue.isEmpty()
Expand Down Expand Up @@ -181,6 +194,13 @@ private JsonObject preparePayload(final JsonArray reqInvocations,
*/
public void send(final JsonObject payload) {
if (push != null && push.isBidirectional()) {
// When using bidirectional transport, the payload is not resent
// to the server during reconnection attempts.
// Keep a copy of the message, so that it could be resent to the
// server after a reconnection.
// Reference will be cleaned up once the server confirms it has
// seen this message
pushPendingMessage = payload;
push.push(payload);
} else {
registry.getXhrConnection().send(payload);
Expand Down Expand Up @@ -260,7 +280,14 @@ public void resynchronize() {
*/
public void setClientToServerMessageId(int nextExpectedId, boolean force) {
if (nextExpectedId == clientToServerMessageId) {
// No op as everything matches they way it should
// Everything matches they way it should
// Remove potential pending PUSH message if it has already been seen
// by the server.
if (pushPendingMessage != null
&& (int) pushPendingMessage.getNumber(
ApplicationConstants.CLIENT_TO_SERVER_ID) < nextExpectedId) {
pushPendingMessage = null;
}
return;
}
if (force) {
Expand Down

0 comments on commit b72e395

Please sign in to comment.