From 54dae34725a3a142ebdd945eaf0f0cb37998e61c Mon Sep 17 00:00:00 2001 From: Javier Date: Mon, 6 Jun 2022 16:47:51 +0200 Subject: [PATCH] fix: abort stale connection on reobserve (#9532) --- src/core/ObservableQuery.ts | 2 +- src/core/__tests__/QueryManager/index.ts | 114 +++++++++++++++++++++++ 2 files changed, 115 insertions(+), 1 deletion(-) diff --git a/src/core/ObservableQuery.ts b/src/core/ObservableQuery.ts index ea2a24d0eea..4e4f92846e0 100644 --- a/src/core/ObservableQuery.ts +++ b/src/core/ObservableQuery.ts @@ -836,7 +836,7 @@ Did you mean to call refetch(variables) instead of refetch({ variables })?`); // because we just want to ignore the old observable, not prematurely shut // it down, since other consumers may be awaiting this.concast.promise. if (this.concast && this.observer) { - this.concast.removeObserver(this.observer, true); + this.concast.removeObserver(this.observer); } this.concast = concast; diff --git a/src/core/__tests__/QueryManager/index.ts b/src/core/__tests__/QueryManager/index.ts index c6ed750bae7..4325f551818 100644 --- a/src/core/__tests__/QueryManager/index.ts +++ b/src/core/__tests__/QueryManager/index.ts @@ -553,6 +553,120 @@ describe('QueryManager', () => { }).then(resolve, reject); }); + itAsync('causes link unsubscription after reobserve', (resolve, reject) => { + const expResult = { + data: { + allPeople: { + people: [ + { + name: 'Luke Skywalker', + }, + ], + }, + }, + }; + + const request = { + query: gql` + query people ($offset: Int) { + allPeople(first: $offset) { + people { + name + } + } + } + `, + variables: undefined + }; + + const mockedResponse = { + request, + result: expResult + }; + + const onRequestSubscribe = jest.fn(); + const onRequestUnsubscribe = jest.fn(); + + const mockedSingleLink = new ApolloLink(() => { + return new Observable(observer => { + onRequestSubscribe(); + + // Delay (100ms) must be bigger than sum of reobserve and unsubscribe awaits (5ms each) + // to show clearly that the connection was aborted before finishing + const timer = setTimeout(() => { + observer.next(mockedResponse.result); + observer.complete(); + }, 100); + + return () => { + onRequestUnsubscribe(); + clearTimeout(timer); + }; + }); + }); + + const mockedQueryManger = new QueryManager({ + link: mockedSingleLink, + cache: new InMemoryCache({ addTypename: false }), + defaultOptions: { + watchQuery: { + fetchPolicy: 'cache-and-network', + returnPartialData: false, + partialRefetch: true, + notifyOnNetworkStatusChange: true + }, + query: { + fetchPolicy: 'network-only' + } + }, + queryDeduplication: false + }); + + const observableQuery = mockedQueryManger.watchQuery< + (typeof expResult)['data'], + { offset?: number | undefined } + >({ + query: request.query, + variables: request.variables + }); + + const observerCallback = wrap(reject, () => { + reject(new Error('Link subscription should have been cancelled')); + }); + + const subscription = observableQuery.subscribe({ + next: observerCallback, + error: observerCallback, + complete: observerCallback + }); + + expect(onRequestSubscribe).toHaveBeenCalledTimes(1); + + // This is the most important part of this test + // Check that reobserve cancels the previous connection while watchQuery remains active + observableQuery.reobserve({ variables: { offset: 20 } }); + + expect(onRequestSubscribe).toHaveBeenCalledTimes(2); + + return new Promise( + // Unsubscribing from the link requires around 5ms to take effect + resolve => setTimeout(resolve, 5) + ).then(() => { + // Verify that previous connection was aborted by reobserve + expect(onRequestUnsubscribe).toHaveBeenCalledTimes(1); + + subscription.unsubscribe(); + + return new Promise( + // Unsubscribing from the link requires around 5ms to take effect + resolve => setTimeout(resolve, 5) + ).then(() => { + // Verify that all connections were finally aborted (reobserve + unsubscribe) + expect(onRequestUnsubscribe).toHaveBeenCalledTimes(2); + }); + }).then(resolve, reject); + }); + itAsync('supports interoperability with other Observable implementations like RxJS', (resolve, reject) => { const expResult = { data: {