Skip to content

Commit b63c09a

Browse files
committed
do not initiate multiple streams at the same path
1 parent cb6fac3 commit b63c09a

File tree

2 files changed

+141
-21
lines changed

2 files changed

+141
-21
lines changed

src/execution/__tests__/stream-test.ts

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -678,6 +678,107 @@ describe('Execute: stream directive', () => {
678678
},
679679
]);
680680
});
681+
it('Does not initiate multiple streams at the same path', async () => {
682+
const document = parse(`
683+
query {
684+
friendList @stream(initialCount: 2) {
685+
name
686+
id
687+
}
688+
... @defer {
689+
friendList @stream(initialCount: 2) {
690+
name
691+
id
692+
}
693+
}
694+
}
695+
`);
696+
const result = await complete(document, { friendList: friends });
697+
expectJSON(result).toDeepEqual([
698+
{
699+
data: {
700+
friendList: [
701+
{ name: 'Luke', id: '1' },
702+
{ name: 'Han', id: '2' },
703+
],
704+
},
705+
hasNext: true,
706+
},
707+
{
708+
incremental: [
709+
{
710+
items: [{ name: 'Leia', id: '3' }],
711+
path: ['friendList', 2],
712+
},
713+
{
714+
data: {
715+
friendList: [
716+
{ name: 'Luke', id: '1' },
717+
{ name: 'Han', id: '2' },
718+
],
719+
},
720+
path: [],
721+
},
722+
],
723+
hasNext: false,
724+
},
725+
]);
726+
});
727+
it('Does not initiate multiple streams at the same path for async iterables', async () => {
728+
const document = parse(`
729+
query {
730+
friendList @stream(initialCount: 2) {
731+
name
732+
id
733+
}
734+
... @defer {
735+
friendList @stream(initialCount: 2) {
736+
name
737+
id
738+
}
739+
}
740+
}
741+
`);
742+
const result = await complete(document, {
743+
async *friendList() {
744+
yield await Promise.resolve(friends[0]);
745+
yield await Promise.resolve(friends[1]);
746+
yield await Promise.resolve(friends[2]);
747+
},
748+
});
749+
expectJSON(result).toDeepEqual([
750+
{
751+
data: {
752+
friendList: [
753+
{ name: 'Luke', id: '1' },
754+
{ name: 'Han', id: '2' },
755+
],
756+
},
757+
hasNext: true,
758+
},
759+
{
760+
incremental: [
761+
{
762+
data: {
763+
friendList: [
764+
{ name: 'Luke', id: '1' },
765+
{ name: 'Han', id: '2' },
766+
],
767+
},
768+
path: [],
769+
},
770+
{
771+
items: [{ name: 'Leia', id: '3' }],
772+
path: ['friendList', 2],
773+
},
774+
],
775+
hasNext: true,
776+
},
777+
{
778+
hasNext: false,
779+
},
780+
]);
781+
});
681782
it('Negative values of initialCount throw field errors on a field that returns an async iterable', async () => {
682783
const document = parse(`
683784
query {

src/execution/execute.ts

Lines changed: 40 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ export interface ExecutionContext {
125125
errors: Array<GraphQLError>;
126126
subsequentPayloads: Set<AsyncPayloadRecord>;
127127
branches: WeakMap<GroupedFieldSet, Set<Path | undefined>>;
128+
streams: WeakSet<FieldGroup>;
128129
}
129130

130131
/**
@@ -505,6 +506,7 @@ export function buildExecutionContext(
505506
addPath: createPathFactory(),
506507
subsequentPayloads: new Set(),
507508
branches: new WeakMap(),
509+
streams: new WeakSet(),
508510
errors: [],
509511
};
510512
}
@@ -519,6 +521,7 @@ function buildPerEventExecutionContext(
519521
addPath: createPathFactory(),
520522
subsequentPayloads: new Set(),
521523
branches: new WeakMap(),
524+
streams: new WeakSet(),
522525
errors: [],
523526
};
524527
}
@@ -540,6 +543,18 @@ function shouldBranch(
540543
return false;
541544
}
542545

546+
function shouldStream(
547+
fieldGroup: FieldGroup,
548+
exeContext: ExecutionContext,
549+
): boolean {
550+
const hasStreamed = exeContext.streams.has(fieldGroup);
551+
if (hasStreamed) {
552+
return false;
553+
}
554+
exeContext.streams.add(fieldGroup);
555+
return true;
556+
}
557+
543558
/**
544559
* Implements the "Executing operations" section of the spec.
545560
*/
@@ -1102,17 +1117,19 @@ async function completeAsyncIteratorValue(
11021117
typeof stream.initialCount === 'number' &&
11031118
index >= stream.initialCount
11041119
) {
1105-
// eslint-disable-next-line @typescript-eslint/no-floating-promises
1106-
executeStreamAsyncIterator(
1107-
index,
1108-
asyncIterator,
1109-
exeContext,
1110-
fieldGroup,
1111-
info,
1112-
itemType,
1113-
path,
1114-
asyncPayloadRecord,
1115-
);
1120+
if (shouldStream(fieldGroup, exeContext)) {
1121+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
1122+
executeStreamAsyncIterator(
1123+
index,
1124+
asyncIterator,
1125+
exeContext,
1126+
fieldGroup,
1127+
info,
1128+
itemType,
1129+
path,
1130+
asyncPayloadRecord,
1131+
);
1132+
}
11161133
break;
11171134
}
11181135

@@ -1208,16 +1225,18 @@ function completeListValue(
12081225
typeof stream.initialCount === 'number' &&
12091226
index >= stream.initialCount
12101227
) {
1211-
executeStreamIterator(
1212-
index,
1213-
iterator,
1214-
exeContext,
1215-
fieldGroup,
1216-
info,
1217-
itemType,
1218-
path,
1219-
asyncPayloadRecord,
1220-
);
1228+
if (shouldStream(fieldGroup, exeContext)) {
1229+
executeStreamIterator(
1230+
index,
1231+
iterator,
1232+
exeContext,
1233+
fieldGroup,
1234+
info,
1235+
itemType,
1236+
path,
1237+
asyncPayloadRecord,
1238+
);
1239+
}
12211240
break;
12221241
}
12231242

0 commit comments

Comments
 (0)