From 71b4d17efa197f6f0bb94105809f32a9adc86ea6 Mon Sep 17 00:00:00 2001 From: Diamond Lewis Date: Mon, 24 Jun 2024 15:27:33 -0500 Subject: [PATCH] fix: `LiveQueryClient.resubscribe` with Parse Server 7 causes many open connections (#2184) --- integration/test/ParseLiveQueryTest.js | 29 ++++++++++++++++++++++++++ integration/test/helper.js | 1 - src/LiveQueryClient.ts | 12 +++++------ src/__tests__/LiveQueryClient-test.js | 8 +++++++ 4 files changed, 43 insertions(+), 7 deletions(-) diff --git a/integration/test/ParseLiveQueryTest.js b/integration/test/ParseLiveQueryTest.js index f7231b551..b81441579 100644 --- a/integration/test/ParseLiveQueryTest.js +++ b/integration/test/ParseLiveQueryTest.js @@ -90,6 +90,35 @@ describe('Parse LiveQuery', () => { await promise; }); + it('can resubscribe', async () => { + const client = new Parse.LiveQueryClient({ + applicationId: 'integration', + serverURL: 'ws://localhost:1337', + javascriptKey: null, + masterKey: null, + sessionToken: null, + }); + client.open(); + const resubscribeSpy = spyOn(client, 'resubscribe').and.callThrough(); + const subscribeRequest = { + op: 'subscribe', + requestId: 1, + query: { + className: 'TestObject', + where: { objectId: 'HEXkuHFm0D' }, + keys: ['foo', 'objectId'], + watch: undefined, + unknownField: 'throws Additional properties not allowed error', + }, + sessionToken: undefined, + }; + await client.connectPromise; + client.socket.send(JSON.stringify(subscribeRequest)); + await sleep(1000); + expect(resubscribeSpy).toHaveBeenCalled(); + await client.close(); + }); + it('can subscribe to multiple queries', async () => { const objectA = new TestObject(); const objectB = new TestObject(); diff --git a/integration/test/helper.js b/integration/test/helper.js index c60b173a0..acd11093a 100644 --- a/integration/test/helper.js +++ b/integration/test/helper.js @@ -5,7 +5,6 @@ jasmine.getEnv().addReporter(new SpecReporter()); const ParseServer = require('parse-server').default; const CustomAuth = require('./CustomAuth'); -const sleep = require('./sleep'); const { TestUtils } = require('parse-server'); const Parse = require('../../node'); const fs = require('fs'); diff --git a/src/LiveQueryClient.ts b/src/LiveQueryClient.ts index b497fb4e9..512d5ac6e 100644 --- a/src/LiveQueryClient.ts +++ b/src/LiveQueryClient.ts @@ -55,6 +55,7 @@ const SUBSCRIPTION_EMMITER_TYPES = { DELETE: 'delete', }; +// Exponentially-growing random delay const generateInterval = k => { return Math.random() * Math.min(30, Math.pow(2, k) - 1) * 1000; }; @@ -291,7 +292,8 @@ class LiveQueryClient { const query = subscription.query; const queryJSON = query.toJSON(); const where = queryJSON.where; - const fields = queryJSON.keys ? queryJSON.keys.split(',') : undefined; + const keys = queryJSON.keys?.split(','); + const watch = queryJSON.watch?.split(','); const className = query.className; const sessionToken = subscription.sessionToken; const subscribeRequest = { @@ -300,7 +302,8 @@ class LiveQueryClient { query: { className, where, - fields, + keys, + watch, }, sessionToken: undefined as string | undefined, }; @@ -347,7 +350,6 @@ class LiveQueryClient { } _handleWebSocketOpen() { - this.attempts = 1; const connectRequest = { op: OP_TYPES.CONNECT, applicationId: this.applicationId, @@ -387,6 +389,7 @@ class LiveQueryClient { break; case OP_EVENTS.SUBSCRIBED: if (subscription) { + this.attempts = 1; subscription.subscribed = true; subscription.subscribePromise.resolve(); setTimeout(() => subscription.emit(SUBSCRIPTION_EMMITER_TYPES.OPEN, response), 200); @@ -485,7 +488,6 @@ class LiveQueryClient { if (this.state === CLIENT_STATE.DISCONNECTED) { return; } - this.state = CLIENT_STATE.RECONNECTING; const time = generateInterval(this.attempts); @@ -493,11 +495,9 @@ class LiveQueryClient { // we're unable to distinguish different between close/error when we're unable to reconnect therefore // we try to reconnect in both cases // server side ws and browser WebSocket behave differently in when close/error get triggered - if (this.reconnectHandle) { clearTimeout(this.reconnectHandle); } - this.reconnectHandle = setTimeout( (() => { this.attempts++; diff --git a/src/__tests__/LiveQueryClient-test.js b/src/__tests__/LiveQueryClient-test.js index 4588cc1cc..be9c90fae 100644 --- a/src/__tests__/LiveQueryClient-test.js +++ b/src/__tests__/LiveQueryClient-test.js @@ -944,6 +944,8 @@ describe('LiveQueryClient', () => { }; const query = new ParseQuery('Test'); query.equalTo('key', 'value'); + query.select(['key']); + query.watch(['key']); liveQueryClient.subscribe(query); liveQueryClient.connectPromise.resolve(); @@ -961,6 +963,8 @@ describe('LiveQueryClient', () => { where: { key: 'value', }, + keys: ['key'], + watch: ['key'], }, }); }); @@ -978,6 +982,8 @@ describe('LiveQueryClient', () => { }; const query = new ParseQuery('Test'); query.equalTo('key', 'value'); + query.select(['key']); + query.watch(['key']); liveQueryClient.subscribe(query, 'mySessionToken'); liveQueryClient.connectPromise.resolve(); @@ -996,6 +1002,8 @@ describe('LiveQueryClient', () => { where: { key: 'value', }, + keys: ['key'], + watch: ['key'], }, }); });