Skip to content

Commit

Permalink
fix: abort stale connection on reobserve (#9532)
Browse files Browse the repository at this point in the history
  • Loading branch information
Javier committed Jun 7, 2022
1 parent 538b795 commit 54dae34
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/core/ObservableQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,7 @@ Did you mean to call refetch(variables) instead of refetch({ variables })?`);
// because we just want to ignore the old observable, not prematurely shut
// it down, since other consumers may be awaiting this.concast.promise.
if (this.concast && this.observer) {
this.concast.removeObserver(this.observer, true);
this.concast.removeObserver(this.observer);
}

this.concast = concast;
Expand Down
114 changes: 114 additions & 0 deletions src/core/__tests__/QueryManager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,120 @@ describe('QueryManager', () => {
}).then(resolve, reject);
});

itAsync('causes link unsubscription after reobserve', (resolve, reject) => {
const expResult = {
data: {
allPeople: {
people: [
{
name: 'Luke Skywalker',
},
],
},
},
};

const request = {
query: gql`
query people ($offset: Int) {
allPeople(first: $offset) {
people {
name
}
}
}
`,
variables: undefined
};

const mockedResponse = {
request,
result: expResult
};

const onRequestSubscribe = jest.fn();
const onRequestUnsubscribe = jest.fn();

const mockedSingleLink = new ApolloLink(() => {
return new Observable(observer => {
onRequestSubscribe();

// Delay (100ms) must be bigger than sum of reobserve and unsubscribe awaits (5ms each)
// to show clearly that the connection was aborted before finishing
const timer = setTimeout(() => {
observer.next(mockedResponse.result);
observer.complete();
}, 100);

return () => {
onRequestUnsubscribe();
clearTimeout(timer);
};
});
});

const mockedQueryManger = new QueryManager({
link: mockedSingleLink,
cache: new InMemoryCache({ addTypename: false }),
defaultOptions: {
watchQuery: {
fetchPolicy: 'cache-and-network',
returnPartialData: false,
partialRefetch: true,
notifyOnNetworkStatusChange: true
},
query: {
fetchPolicy: 'network-only'
}
},
queryDeduplication: false
});

const observableQuery = mockedQueryManger.watchQuery<
(typeof expResult)['data'],
{ offset?: number | undefined }
>({
query: request.query,
variables: request.variables
});

const observerCallback = wrap(reject, () => {
reject(new Error('Link subscription should have been cancelled'));
});

const subscription = observableQuery.subscribe({
next: observerCallback,
error: observerCallback,
complete: observerCallback
});

expect(onRequestSubscribe).toHaveBeenCalledTimes(1);

// This is the most important part of this test
// Check that reobserve cancels the previous connection while watchQuery remains active
observableQuery.reobserve({ variables: { offset: 20 } });

expect(onRequestSubscribe).toHaveBeenCalledTimes(2);

return new Promise(
// Unsubscribing from the link requires around 5ms to take effect
resolve => setTimeout(resolve, 5)
).then(() => {
// Verify that previous connection was aborted by reobserve
expect(onRequestUnsubscribe).toHaveBeenCalledTimes(1);

subscription.unsubscribe();

return new Promise(
// Unsubscribing from the link requires around 5ms to take effect
resolve => setTimeout(resolve, 5)
).then(() => {
// Verify that all connections were finally aborted (reobserve + unsubscribe)
expect(onRequestUnsubscribe).toHaveBeenCalledTimes(2);
});
}).then(resolve, reject);
});

itAsync('supports interoperability with other Observable implementations like RxJS', (resolve, reject) => {
const expResult = {
data: {
Expand Down

0 comments on commit 54dae34

Please sign in to comment.