From 967700bdbc94c74f75ba84d2b3f4b9f3fd2dca0b Mon Sep 17 00:00:00 2001 From: Daniel Date: Thu, 8 Jun 2023 19:04:49 +1000 Subject: [PATCH] fix: LiveQuery server is not shut down properly when `handleShutdown` is called (#8491) --- spec/ParseLiveQuery.spec.js | 35 +++++++++++++++++++ .../Storage/Mongo/MongoStorageAdapter.js | 7 ++-- .../Postgres/PostgresStorageAdapter.js | 4 ++- src/LiveQuery/ParseLiveQueryServer.js | 15 ++++++++ src/ParseServer.js | 6 ++++ 5 files changed, 63 insertions(+), 4 deletions(-) diff --git a/spec/ParseLiveQuery.spec.js b/spec/ParseLiveQuery.spec.js index 959df18cf9..015725ac46 100644 --- a/spec/ParseLiveQuery.spec.js +++ b/spec/ParseLiveQuery.spec.js @@ -2,6 +2,7 @@ const Auth = require('../lib/Auth'); const UserController = require('../lib/Controllers/UserController').UserController; const Config = require('../lib/Config'); +const ParseServer = require('../lib/index').ParseServer; const triggers = require('../lib/triggers'); const validatorFail = () => { throw 'you are not authorized'; @@ -1214,6 +1215,40 @@ describe('ParseLiveQuery', function () { await object.save(); }); + it('does shutdown liveQuery server', async () => { + await reconfigureServer({ appId: 'test_app_id' }); + const config = { + appId: 'hello_test', + masterKey: 'world', + port: 1345, + mountPath: '/1', + serverURL: 'http://localhost:1345/1', + liveQuery: { + classNames: ['Yolo'], + }, + startLiveQueryServer: true, + }; + if (process.env.PARSE_SERVER_TEST_DB === 'postgres') { + config.databaseAdapter = new databaseAdapter.constructor({ + uri: databaseURI, + collectionPrefix: 'test_', + }); + config.filesAdapter = defaultConfiguration.filesAdapter; + } + const server = await ParseServer.startApp(config); + const client = await Parse.CoreManager.getLiveQueryController().getDefaultLiveQueryClient(); + client.serverURL = 'ws://localhost:1345/1'; + const query = await new Parse.Query('Yolo').subscribe(); + await Promise.all([ + server.handleShutdown(), + new Promise(resolve => query.on('close', resolve)), + ]); + await new Promise(resolve => setTimeout(resolve, 100)); + expect(server.liveQueryServer.server.address()).toBeNull(); + expect(server.liveQueryServer.subscriber.isOpen).toBeFalse(); + await new Promise(resolve => server.server.close(resolve)); + }); + it('prevent afterSave trigger if not exists', async () => { await reconfigureServer({ liveQuery: { diff --git a/src/Adapters/Storage/Mongo/MongoStorageAdapter.js b/src/Adapters/Storage/Mongo/MongoStorageAdapter.js index 78833a026b..2f59819895 100644 --- a/src/Adapters/Storage/Mongo/MongoStorageAdapter.js +++ b/src/Adapters/Storage/Mongo/MongoStorageAdapter.js @@ -212,11 +212,12 @@ export class MongoStorageAdapter implements StorageAdapter { throw error; } - handleShutdown() { + async handleShutdown() { if (!this.client) { - return Promise.resolve(); + return; } - return this.client.close(false); + await this.client.close(false); + delete this.connectionPromise; } _adaptiveCollection(name: string) { diff --git a/src/Adapters/Storage/Postgres/PostgresStorageAdapter.js b/src/Adapters/Storage/Postgres/PostgresStorageAdapter.js index 82ac0c20dc..3e8e867799 100644 --- a/src/Adapters/Storage/Postgres/PostgresStorageAdapter.js +++ b/src/Adapters/Storage/Postgres/PostgresStorageAdapter.js @@ -1194,7 +1194,9 @@ export class PostgresStorageAdapter implements StorageAdapter { const now = new Date().getTime(); const helpers = this._pgp.helpers; debug('deleteAllClasses'); - + if (this._client?.$pool.ended) { + return; + } await this._client .task('delete-all-classes', async t => { try { diff --git a/src/LiveQuery/ParseLiveQueryServer.js b/src/LiveQuery/ParseLiveQueryServer.js index 1105a2a6b7..d0b535f3a1 100644 --- a/src/LiveQuery/ParseLiveQueryServer.js +++ b/src/LiveQuery/ParseLiveQueryServer.js @@ -93,6 +93,21 @@ class ParseLiveQueryServer { } this._createSubscribers(); } + + async shutdown() { + if (this.subscriber.isOpen) { + await Promise.all([ + ...[...this.clients.values()].map(client => client.parseWebSocket.ws.close()), + this.parseWebSocketServer.close(), + ...Array.from(this.subscriber.subscriptions.keys()).map(key => + this.subscriber.unsubscribe(key) + ), + this.subscriber.close?.(), + ]); + } + this.subscriber.isOpen = false; + } + _createSubscribers() { const messageRecieved = (channel, messageStr) => { logger.verbose('Subscribe message %j', messageStr); diff --git a/src/ParseServer.js b/src/ParseServer.js index 04379ecfd3..192ad9c40c 100644 --- a/src/ParseServer.js +++ b/src/ParseServer.js @@ -168,6 +168,12 @@ class ParseServer { if (cacheAdapter && typeof cacheAdapter.handleShutdown === 'function') { promises.push(cacheAdapter.handleShutdown()); } + if (this.liveQueryServer?.server?.close) { + promises.push(new Promise(resolve => this.liveQueryServer.server.close(resolve))); + } + if (this.liveQueryServer) { + promises.push(this.liveQueryServer.shutdown()); + } return (promises.length > 0 ? Promise.all(promises) : Promise.resolve()).then(() => { if (this.config.serverCloseComplete) { this.config.serverCloseComplete();