Skip to content

Commit 164e1ee

Browse files
authored
Merge pull request #70 from powersync-ja/fix-websocket-closing-error
Fix `WebSocket is not open: readyState 2 (CLOSING)`
2 parents a3f296c + 0d4432d commit 164e1ee

File tree

3 files changed

+111
-6
lines changed

3 files changed

+111
-6
lines changed

.changeset/tricky-parents-mate.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/service-rsocket-router': patch
3+
---
4+
5+
Fix WebSocket is not open: readyState 2 (CLOSING)

packages/rsocket-router/src/router/transport/WebsocketDuplexConnection.ts

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,11 @@ export class WebsocketDuplexConnection extends Deferred implements DuplexConnect
6262
return;
6363
}
6464

65-
this.websocketDuplex.removeAllListeners();
65+
// Important: do not remove the error handler here.
66+
// That causes uncaught error handlers in some edge cases.
67+
this.websocketDuplex.removeAllListeners('close');
68+
this.websocketDuplex.removeAllListeners('data');
69+
6670
this.websocketDuplex.end();
6771

6872
super.close(error);
@@ -88,13 +92,15 @@ export class WebsocketDuplexConnection extends Deferred implements DuplexConnect
8892
}
8993
}
9094

91-
private handleClosed = (e: WebSocket.CloseEvent): void => {
92-
this.close(new Error(e.reason || 'WebsocketDuplexConnection: Socket closed unexpectedly.'));
95+
private handleClosed = (e?: WebSocket.CloseEvent): void => {
96+
this.close(new Error(e?.reason || 'WebsocketDuplexConnection: Socket closed unexpectedly.'));
9397
};
9498

9599
private handleError = (e: WebSocket.ErrorEvent): void => {
96100
logger.error(`Error in WebSocket duplex connection: ${e}`);
97-
this.close(e.error);
101+
if (!this.done) {
102+
this.close(e.error);
103+
}
98104
};
99105

100106
private handleMessage = (buffer: Buffer): void => {

packages/rsocket-router/tests/src/socket.test.ts

Lines changed: 96 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,21 @@ import { WebsocketServerTransport } from '../../src/router/transport/WebSocketSe
77
import { WebsocketDuplexConnection } from '../../src/router/transport/WebsocketDuplexConnection.js';
88
import { Duplex } from 'stream';
99

10-
const WS_PORT = process.env.WS_PORT ? parseInt(process.env.WS_PORT) : 4532;
11-
const WS_ADDRESS = `ws://localhost:${WS_PORT}`;
10+
let nextPort = 5433;
1211

1312
describe('Sockets', () => {
1413
let server: WebSocket.WebSocketServer;
1514
let closeServer: () => void;
1615

16+
let WS_PORT = 0;
17+
let WS_ADDRESS = '';
18+
1719
beforeEach(() => {
1820
let closed = false;
21+
22+
WS_PORT = process.env.WS_PORT ? parseInt(process.env.WS_PORT) : nextPort++;
23+
WS_ADDRESS = `ws://localhost:${WS_PORT}`;
24+
1925
server = new WebSocket.WebSocketServer({
2026
port: WS_PORT
2127
});
@@ -198,4 +204,92 @@ describe('Sockets', () => {
198204
await Promise.all(promises);
199205
await vi.waitFor(() => expect(serverCancelSpy.mock.calls.length).equals(testCount), { timeout: 2000 });
200206
});
207+
208+
/**
209+
* Similar to the above test, but checks for the case where
210+
* the server closes the connection due to a keepalive timeout.
211+
*/
212+
it('should handle closed server connections correctly', async () => {
213+
const transport = new WebsocketServerTransport({
214+
wsCreator: () => server
215+
});
216+
217+
const onCancelWrapper = (callback: () => void) => callback();
218+
const serverCancelSpy = vi.fn(onCancelWrapper);
219+
220+
// Create a simple server which will spam a lot of data to any connection
221+
const rSocketServer = new RSocketServer({
222+
transport,
223+
acceptor: {
224+
accept: async () => {
225+
return {
226+
requestStream: (payload, initialN, responder) => {
227+
let stop = false;
228+
229+
setImmediate(async () => {
230+
while (!stop) {
231+
// To trigger the issue, we need to send multiple individual large messages.
232+
// This builds up a buffer that will be sent after closing the connection.
233+
for (let i = 0; i < 5; i++) {
234+
responder.onNext({ data: Buffer.from('some payload'.repeat(100_000)) }, false);
235+
}
236+
await new Promise((r) => setTimeout(r, 1));
237+
}
238+
});
239+
return {
240+
request: () => {},
241+
onExtension: () => {},
242+
cancel: () => {
243+
serverCancelSpy(() => {
244+
stop = true;
245+
});
246+
}
247+
};
248+
}
249+
};
250+
}
251+
}
252+
});
253+
rSocketServer.bind();
254+
255+
// Try and connect 10 times. Without the fix,
256+
// more than 50% of these should fail.
257+
// The socket will be closed by the server
258+
const testCount = 10;
259+
const promises = new Array(testCount).fill(null).map(async () => {
260+
const testSocket = new WebSocket.WebSocket(WS_ADDRESS);
261+
262+
const connector = new RSocketConnector({
263+
transport: new WebsocketClientTransport({
264+
url: WS_ADDRESS,
265+
wsCreator: (url) => testSocket as any
266+
}),
267+
268+
setup: {
269+
dataMimeType: 'application/bson',
270+
metadataMimeType: 'application/bson',
271+
272+
keepAlive: 20000,
273+
// This should be long enough to trigger after the initial setup
274+
lifetime: 20,
275+
276+
payload: {
277+
data: null
278+
}
279+
}
280+
});
281+
282+
const connection = await connector.connect();
283+
284+
connection.requestStream({ data: null }, 1, {
285+
onNext() {},
286+
onComplete: () => {},
287+
onExtension: () => {},
288+
onError: () => {}
289+
});
290+
});
291+
292+
await Promise.all(promises);
293+
await vi.waitFor(() => expect(serverCancelSpy.mock.calls.length).equals(testCount), { timeout: 2000 });
294+
});
201295
});

0 commit comments

Comments
 (0)