Skip to content

Commit 37b7867

Browse files
committed
fix(race): concurrent next calls with defer/stream (#2975)
* fix(race): concurrent next calls * refactor test * use invariant * disable eslint error * fix
1 parent 33e23b3 commit 37b7867

File tree

2 files changed

+83
-0
lines changed

2 files changed

+83
-0
lines changed

src/execution/__tests__/stream-test.ts

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,22 @@ async function complete(document: DocumentNode, rootValue: unknown = {}) {
244244
return result;
245245
}
246246

247+
async function completeAsync(document: DocumentNode, numCalls: number) {
248+
const schema = new GraphQLSchema({ query });
249+
250+
const result = await execute({ schema, document, rootValue: {} });
251+
252+
assert(isAsyncIterable(result));
253+
254+
const iterator = result[Symbol.asyncIterator]();
255+
256+
const promises = [];
257+
for (let i = 0; i < numCalls; i++) {
258+
promises.push(iterator.next());
259+
}
260+
return Promise.all(promises);
261+
}
262+
247263
describe('Execute: stream directive', () => {
248264
it('Can stream a list field', async () => {
249265
const document = parse('{ scalarList @stream(initialCount: 1) }');
@@ -739,6 +755,64 @@ describe('Execute: stream directive', () => {
739755
},
740756
});
741757
});
758+
it('Can handle concurrent calls to .next() without waiting', async () => {
759+
const document = parse(`
760+
query {
761+
asyncIterableList @stream(initialCount: 2) {
762+
name
763+
id
764+
}
765+
}
766+
`);
767+
const result = await completeAsync(document, 4);
768+
expectJSON(result).toDeepEqual([
769+
{
770+
done: false,
771+
value: {
772+
data: {
773+
asyncIterableList: [
774+
{
775+
name: 'Luke',
776+
id: '1',
777+
},
778+
{
779+
name: 'Han',
780+
id: '2',
781+
},
782+
],
783+
},
784+
hasNext: true,
785+
},
786+
},
787+
{
788+
done: false,
789+
value: {
790+
incremental: [
791+
{
792+
items: [
793+
{
794+
name: 'Leia',
795+
id: '3',
796+
},
797+
],
798+
path: ['asyncIterableList', 2],
799+
},
800+
],
801+
hasNext: true,
802+
},
803+
},
804+
{
805+
done: false,
806+
value: {
807+
hasNext: false,
808+
},
809+
},
810+
{
811+
done: true,
812+
value: undefined,
813+
},
814+
]);
815+
});
742816
it('Handles error thrown in async iterable before initialCount is reached', async () => {
743817
const document = parse(`
744818
query {

src/execution/execute.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1821,7 +1821,16 @@ function yieldSubsequentPayloads(
18211821
},
18221822
);
18231823

1824+
if (exeContext.subsequentPayloads.length === 0) {
1825+
// a different call to next has exhausted all payloads
1826+
return { value: undefined, done: true };
1827+
}
18241828
const index = exeContext.subsequentPayloads.indexOf(asyncPayloadRecord);
1829+
if (index === -1) {
1830+
// a different call to next has consumed this payload
1831+
return race();
1832+
}
1833+
18251834
exeContext.subsequentPayloads.splice(index, 1);
18261835

18271836
const incrementalResult: IncrementalResult = {};

0 commit comments

Comments
 (0)