Skip to content

Commit ca90e59

Browse files
committed
fix(race): concurrent next calls with defer/stream (#2975)
* fix(race): concurrent next calls * refactor test * use invariant * disable eslint error * fix
1 parent 3bdb787 commit ca90e59

File tree

2 files changed

+72
-0
lines changed

2 files changed

+72
-0
lines changed

src/execution/__tests__/stream-test.ts

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,24 @@ async function complete(document: DocumentNode, rootValue: unknown = {}) {
7777
return result;
7878
}
7979

80+
async function completeAsync(
81+
document: DocumentNode,
82+
numCalls: number,
83+
rootValue: unknown = {},
84+
) {
85+
const result = await execute({ schema, document, rootValue });
86+
87+
assert(isAsyncIterable(result));
88+
89+
const iterator = result[Symbol.asyncIterator]();
90+
91+
const promises = [];
92+
for (let i = 0; i < numCalls; i++) {
93+
promises.push(iterator.next());
94+
}
95+
return Promise.all(promises);
96+
}
97+
8098
function createResolvablePromise<T>(): [Promise<T>, (value?: T) => void] {
8199
let resolveFn;
82100
const promise = new Promise<T>((resolve) => {
@@ -566,6 +584,51 @@ describe('Execute: stream directive', () => {
566584
},
567585
});
568586
});
587+
it('Can handle concurrent calls to .next() without waiting', async () => {
588+
const document = parse(`
589+
query {
590+
friendList @stream(initialCount: 2) {
591+
name
592+
id
593+
}
594+
}
595+
`);
596+
const result = await completeAsync(document, 4, {
597+
async *friendList() {
598+
yield await Promise.resolve(friends[0]);
599+
yield await Promise.resolve(friends[1]);
600+
yield await Promise.resolve(friends[2]);
601+
},
602+
});
603+
expectJSON(result).toDeepEqual([
604+
{
605+
done: false,
606+
value: {
607+
data: {
608+
friendList: [
609+
{ name: 'Luke', id: '1' },
610+
{ name: 'Han', id: '2' },
611+
],
612+
},
613+
hasNext: true,
614+
},
615+
},
616+
{
617+
done: false,
618+
value: {
619+
incremental: [
620+
{
621+
items: [{ name: 'Leia', id: '3' }],
622+
path: ['friendList', 2],
623+
},
624+
],
625+
hasNext: true,
626+
},
627+
},
628+
{ done: false, value: { hasNext: false } },
629+
{ done: true, value: undefined },
630+
]);
631+
});
569632
it('Handles error thrown in async iterable before initialCount is reached', async () => {
570633
const document = parse(`
571634
query {

src/execution/execute.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1823,7 +1823,16 @@ function yieldSubsequentPayloads(
18231823
},
18241824
);
18251825

1826+
if (exeContext.subsequentPayloads.length === 0) {
1827+
// a different call to next has exhausted all payloads
1828+
return { value: undefined, done: true };
1829+
}
18261830
const index = exeContext.subsequentPayloads.indexOf(asyncPayloadRecord);
1831+
if (index === -1) {
1832+
// a different call to next has consumed this payload
1833+
return race();
1834+
}
1835+
18271836
exeContext.subsequentPayloads.splice(index, 1);
18281837

18291838
const incrementalResult: IncrementalResult = {};

0 commit comments

Comments
 (0)