Skip to content

Commit

Permalink
fix: abort stale connection on reobserve (#9532)
Browse files Browse the repository at this point in the history
  • Loading branch information
Javier committed Jun 7, 2022
1 parent 538b795 commit 4881f03
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 6 deletions.
2 changes: 1 addition & 1 deletion src/core/ObservableQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,7 @@ Did you mean to call refetch(variables) instead of refetch({ variables })?`);
// because we just want to ignore the old observable, not prematurely shut
// it down, since other consumers may be awaiting this.concast.promise.
if (this.concast && this.observer) {
this.concast.removeObserver(this.observer, true);
this.concast.removeObserver(this.observer);
}

this.concast = concast;
Expand Down
121 changes: 116 additions & 5 deletions src/core/__tests__/QueryManager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -507,10 +507,12 @@ describe('QueryManager', () => {
return new Observable(observer => {
onRequestSubscribe();

// Delay (100ms) must be bigger than unsubscribe await (5ms)
// to show clearly that the connection was aborted before completing
const timer = setTimeout(() => {
observer.next(mockedResponse.result);
observer.complete();
}, 0);
}, 100);

return () => {
onRequestUnsubscribe();
Expand Down Expand Up @@ -543,16 +545,125 @@ describe('QueryManager', () => {
subscription.unsubscribe();

return new Promise(
// Unsubscribing from the link happens after a microtask
// (Promise.resolve().then) delay, so we need to wait at least that
// long before verifying onRequestUnsubscribe was called.
resolve => setTimeout(resolve, 0)
// Unsubscribing from the link requires around 5ms to take effect
resolve => setTimeout(resolve, 5)
).then(() => {
expect(onRequestSubscribe).toHaveBeenCalledTimes(1);
expect(onRequestUnsubscribe).toHaveBeenCalledTimes(1);
}).then(resolve, reject);
});

itAsync('causes link unsubscription after reobserve', (resolve, reject) => {
const expResult = {
data: {
allPeople: {
people: [
{
name: 'Luke Skywalker',
},
],
},
},
};

const request = {
query: gql`
query people ($offset: Int) {
allPeople(first: $offset) {
people {
name
}
}
}
`,
variables: undefined
};

const mockedResponse = {
request,
result: expResult
};

const onRequestSubscribe = jest.fn();
const onRequestUnsubscribe = jest.fn();

const mockedSingleLink = new ApolloLink(() => {
return new Observable(observer => {
onRequestSubscribe();

// Delay (100ms) must be bigger than sum of reobserve and unsubscribe awaits (5ms each)
// to show clearly that the connection was aborted before completing
const timer = setTimeout(() => {
observer.next(mockedResponse.result);
observer.complete();
}, 100);

return () => {
onRequestUnsubscribe();
clearTimeout(timer);
};
});
});

const mockedQueryManger = new QueryManager({
link: mockedSingleLink,
cache: new InMemoryCache({ addTypename: false }),
defaultOptions: {
watchQuery: {
fetchPolicy: 'cache-and-network',
returnPartialData: false,
partialRefetch: true,
notifyOnNetworkStatusChange: true
},
query: {
fetchPolicy: 'network-only'
}
},
queryDeduplication: false
});

const observableQuery = mockedQueryManger.watchQuery<
(typeof expResult)['data'],
{ offset?: number | undefined }
>({
query: request.query,
variables: request.variables
});

const observerCallback = wrap(reject, () => {
reject(new Error('Link subscription should have been cancelled'));
});

const subscription = observableQuery.subscribe({
next: observerCallback,
error: observerCallback,
complete: observerCallback
});

expect(onRequestSubscribe).toHaveBeenCalledTimes(1);

// This is the most important part of this test
// Check that reobserve cancels the previous connection while watchQuery remains active
observableQuery.reobserve({ variables: { offset: 20 } });

return new Promise(
// Unsubscribing from the link requires around 5ms to take effect
resolve => setTimeout(resolve, 5)
).then(async () => {
// Verify that previous connection was aborted by reobserve
expect(onRequestUnsubscribe).toHaveBeenCalledTimes(1);

subscription.unsubscribe();

// Unsubscribing from the link requires around 5ms to take effect
await new Promise(resolve => setTimeout(resolve, 5))

expect(onRequestSubscribe).toHaveBeenCalledTimes(2);
// Verify that all connections were finally aborted (reobserve + unsubscribe)
expect(onRequestUnsubscribe).toHaveBeenCalledTimes(2);
}).then(resolve, reject);
});

itAsync('supports interoperability with other Observable implementations like RxJS', (resolve, reject) => {
const expResult = {
data: {
Expand Down

0 comments on commit 4881f03

Please sign in to comment.