Skip to content

Commit 0c75dbc

Browse files
committed
[ECO-5514] fix: improve WebSocket transport lifecycle and activity management
* Force-cancel hanging socket connections.
* Replace synchronized blocks with fine-grained locking using `activityTimerMonitor` to avoid deadlocks.
* Refactor `close()` for safer access to shared fields and handle uninitialized cases gracefully.
* Simplify activity timer logic and ensure consistent disposal of `WebSocketClient` and `WebSocketHandler`.
1 parent 3f9d007 commit 0c75dbc

File tree

1 file changed

+75
-51
lines changed

1 file changed

+75
-51
lines changed

lib/src/main/java/io/ably/lib/transport/WebSocketTransport.java

Lines changed: 75 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public class WebSocketTransport implements ITransport {
5252
private ConnectListener connectListener;
5353
private WebSocketClient webSocketClient;
5454
private final WebSocketEngine webSocketEngine;
55+
private WebSocketHandler webSocketHandler;
5556
private boolean activityCheckTurnedOff = false;
5657

5758
/******************
@@ -94,6 +95,10 @@ private static WebSocketEngine createWebSocketEngine(TransportParams params) {
9495
* ITransport methods
9596
******************/
9697

98+
/**
99+
* Connect is called once when we create transport;
100+
* after transport is closed, we never call `connect` again
101+
*/
97102
@Override
98103
public void connect(ConnectListener connectListener) {
99104
this.connectListener = connectListener;
@@ -107,9 +112,8 @@ public void connect(ConnectListener connectListener) {
107112
wsUri = HttpUtils.encodeParams(wsUri, connectParams);
108113

109114
Log.d(TAG, "connect(); wsUri = " + wsUri);
110-
synchronized (this) {
111-
webSocketClient = this.webSocketEngine.create(wsUri, new WebSocketHandler(this::receive));
112-
}
115+
webSocketHandler = new WebSocketHandler(this::receive);
116+
webSocketClient = this.webSocketEngine.create(wsUri, webSocketHandler);
113117
webSocketClient.connect();
114118
} catch (AblyException e) {
115119
Log.e(TAG, "Unexpected exception attempting connection; wsUri = " + wsUri, e);
@@ -123,11 +127,25 @@ public void connect(ConnectListener connectListener) {
123127
@Override
124128
public void close() {
125129
Log.d(TAG, "close()");
126-
synchronized (this) {
127-
if (webSocketClient != null) {
128-
webSocketClient.close();
129-
webSocketClient = null;
130-
}
130+
// Take local snapshots of the shared references. Callback threads (e.g., onClose)
131+
// may concurrently set these fields to null.
132+
//
133+
// Intentionally avoid synchronizing here:
134+
// - The WebSocket library may invoke our WebSocketHandler while holding its own
135+
// internal locks.
136+
// - If close() also acquired a lock on WebSocketTransport, we could invert the
137+
// lock order and create a circular wait (deadlock): close() waits for the WS
138+
// library to release its lock, while the WS library waits for a lock on
139+
// WebSocketTransport.
140+
final WebSocketClient client = webSocketClient;
141+
final WebSocketHandler handler = webSocketHandler;
142+
if (client != null && handler != null) {
143+
// Record activity so the activity timer remains armed. If a graceful close
144+
// stalls, the timer can detect inactivity and force-cancel the socket.
145+
handler.flagActivity();
146+
client.close();
147+
} else {
148+
Log.w(TAG, "close() called on uninitialized or already closed transport");
131149
}
132150
}
133151

@@ -217,10 +235,15 @@ class WebSocketHandler implements WebSocketListener {
217235
* WsClient private members
218236
***************************/
219237

220-
private Timer timer = new Timer();
238+
private final Timer timer = new Timer();
221239
private TimerTask activityTimerTask = null;
222240
private long lastActivityTime;
223241

242+
/**
243+
* Monitor for activity timer events
244+
*/
245+
private final Object activityTimerMonitor = new Object();
246+
224247
WebSocketHandler(WebSocketReceiver receiver) {
225248
this.receiver = receiver;
226249
}
@@ -305,6 +328,9 @@ public void onClose(final int wsCode, final String wsReason) {
305328
}
306329
connectListener.onTransportUnavailable(WebSocketTransport.this, reason);
307330
dispose();
331+
332+
webSocketClient = null;
333+
webSocketHandler = null;
308334
}
309335

310336
@Override
@@ -318,66 +344,64 @@ public void onOldJavaVersionDetected(Throwable throwable) {
318344
Log.w(TAG, "Error when trying to set SSL parameters, most likely due to an old Java API version", throwable);
319345
}
320346

321-
private synchronized void dispose() {
322-
/* dispose timer */
323-
try {
324-
timer.cancel();
325-
timer = null;
326-
} catch (IllegalStateException e) {
327-
}
347+
private void dispose() {
348+
timer.cancel();
328349
}
329350

330-
private synchronized void flagActivity() {
351+
private void flagActivity() {
331352
lastActivityTime = System.currentTimeMillis();
332353
connectionManager.setLastActivity(lastActivityTime);
333-
if (activityTimerTask == null && connectionManager.maxIdleInterval != 0 && !activityCheckTurnedOff) {
334-
/* No timer currently running because previously there was no
335-
* maxIdleInterval configured, but now there is a
336-
* maxIdleInterval configured. Call checkActivity so a timer
337-
* gets started. This happens when flagActivity gets called
338-
* just after processing the connect message that configures
339-
* maxIdleInterval. */
340-
checkActivity();
354+
355+
if (connectionManager.maxIdleInterval == 0) {
356+
Log.v(TAG, "checkActivity: turned off because maxIdleInterval is 0");
357+
return;
358+
}
359+
360+
if (activityCheckTurnedOff) {
361+
Log.v(TAG, "checkActivity: turned off for test purpose");
362+
return;
341363
}
364+
365+
checkActivity();
342366
}
343367

344-
private synchronized void checkActivity() {
368+
private void checkActivity() {
345369
long timeout = getActivityTimeout();
370+
346371
if (timeout == 0) {
347372
Log.v(TAG, "checkActivity: infinite timeout");
348373
return;
349374
}
350375

351-
// Check if timer already running
352-
if (activityTimerTask != null) {
353-
return;
376+
synchronized (activityTimerMonitor) {
377+
// Check if timer already running
378+
if (activityTimerTask == null) {
379+
// Start the activity timer task
380+
startActivityTimer(timeout + 100);
381+
}
354382
}
355-
356-
// Start the activity timer task
357-
startActivityTimer(timeout + 100);
358383
}
359384

360-
private synchronized void startActivityTimer(long timeout) {
361-
if (activityTimerTask == null) {
362-
schedule((activityTimerTask = new TimerTask() {
363-
public void run() {
364-
try {
365-
onActivityTimerExpiry();
366-
} catch (Throwable t) {
367-
Log.e(TAG, "Unexpected exception in activity timer handler", t);
368-
}
385+
private void startActivityTimer(long timeout) {
386+
activityTimerTask = new TimerTask() {
387+
public void run() {
388+
try {
389+
onActivityTimerExpiry();
390+
} catch (Throwable t) {
391+
Log.e(TAG, "Unexpected exception in activity timer handler", t);
392+
webSocketClient.cancel(ABNORMAL_CLOSE, "Activity timer closed unexpectedly");
369393
}
370-
}), timeout);
371-
}
394+
}
395+
};
396+
schedule(activityTimerTask, timeout);
372397
}
373398

374-
private synchronized void schedule(TimerTask task, long delay) {
375-
if (timer != null) {
376-
try {
377-
timer.schedule(task, delay);
378-
} catch (IllegalStateException ise) {
379-
Log.e(TAG, "Unexpected exception scheduling activity timer", ise);
380-
}
399+
private void schedule(TimerTask task, long delay) {
400+
try {
401+
timer.schedule(task, delay);
402+
} catch (IllegalStateException ise) {
403+
Log.e(TAG, "Unexpected exception scheduling activity timer", ise);
404+
webSocketClient.cancel(ABNORMAL_CLOSE, "Activity timer closed unexpectedly");
381405
}
382406
}
383407

@@ -392,7 +416,7 @@ private void onActivityTimerExpiry() {
392416
return;
393417
}
394418

395-
synchronized (this) {
419+
synchronized (activityTimerMonitor) {
396420
activityTimerTask = null;
397421
// Otherwise, we've had some activity, restart the timer for the next timeout
398422
Log.v(TAG, "onActivityTimerExpiry: ok");

0 commit comments

Comments
 (0)