Skip to content

Commit 5419f04

Browse files
committed
Return underlying AsyncIterators when execute result is returned (#2843)
# Conflicts: # src/execution/execute.ts
1 parent f103572 commit 5419f04

File tree

2 files changed

+232
-7
lines changed

2 files changed

+232
-7
lines changed

src/execution/__tests__/stream-test.ts

Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { describe, it } from 'mocha';
22

33
import { expectJSON } from '../../__testUtils__/expectJSON';
44

5+
import { invariant } from '../../jsutils/invariant';
56
import { isAsyncIterable } from '../../jsutils/isAsyncIterable';
67

78
import type { DocumentNode } from '../../language/ast';
@@ -112,6 +113,37 @@ const query = new GraphQLObjectType({
112113
yield await Promise.resolve({});
113114
},
114115
},
116+
asyncIterableListDelayed: {
117+
type: new GraphQLList(friendType),
118+
async *resolve() {
119+
for (const friend of friends) {
120+
// pause an additional ms before yielding to allow time
121+
// for tests to return or throw before next value is processed.
122+
// eslint-disable-next-line no-await-in-loop
123+
await new Promise((r) => setTimeout(r, 1));
124+
yield friend; /* c8 ignore start */
125+
// Not reachable, early return
126+
}
127+
} /* c8 ignore stop */,
128+
},
129+
asyncIterableListNoReturn: {
130+
type: new GraphQLList(friendType),
131+
resolve() {
132+
let i = 0;
133+
return {
134+
[Symbol.asyncIterator]: () => ({
135+
async next() {
136+
const friend = friends[i++];
137+
if (friend) {
138+
await new Promise((r) => setTimeout(r, 1));
139+
return { value: friend, done: false };
140+
}
141+
return { value: undefined, done: true };
142+
},
143+
}),
144+
};
145+
},
146+
},
115147
asyncIterableListDelayedClose: {
116148
type: new GraphQLList(friendType),
117149
async *resolve() {
@@ -1005,4 +1037,175 @@ describe('Execute: stream directive', () => {
10051037
},
10061038
]);
10071039
});
1040+
it('Returns underlying async iterables when dispatcher is returned', async () => {
1041+
const document = parse(`
1042+
query {
1043+
asyncIterableListDelayed @stream(initialCount: 1) {
1044+
name
1045+
id
1046+
}
1047+
}
1048+
`);
1049+
const schema = new GraphQLSchema({ query });
1050+
1051+
const executeResult = await execute({ schema, document, rootValue: {} });
1052+
invariant(isAsyncIterable(executeResult));
1053+
const iterator = executeResult[Symbol.asyncIterator]();
1054+
1055+
const result1 = await iterator.next();
1056+
expectJSON(result1).toDeepEqual({
1057+
done: false,
1058+
value: {
1059+
data: {
1060+
asyncIterableListDelayed: [
1061+
{
1062+
id: '1',
1063+
name: 'Luke',
1064+
},
1065+
],
1066+
},
1067+
hasNext: true,
1068+
},
1069+
});
1070+
1071+
const returnPromise = iterator.return();
1072+
1073+
// this result had started processing before return was called
1074+
const result2 = await iterator.next();
1075+
expectJSON(result2).toDeepEqual({
1076+
done: false,
1077+
value: {
1078+
data: {
1079+
id: '2',
1080+
name: 'Han',
1081+
},
1082+
hasNext: true,
1083+
path: ['asyncIterableListDelayed', 1],
1084+
},
1085+
});
1086+
1087+
// third result is not returned because async iterator has returned
1088+
const result3 = await iterator.next();
1089+
expectJSON(result3).toDeepEqual({
1090+
done: true,
1091+
value: undefined,
1092+
});
1093+
await returnPromise;
1094+
});
1095+
it('Can return async iterable when underlying iterable does not have a return method', async () => {
1096+
const document = parse(`
1097+
query {
1098+
asyncIterableListNoReturn @stream(initialCount: 1) {
1099+
name
1100+
id
1101+
}
1102+
}
1103+
`);
1104+
const schema = new GraphQLSchema({ query });
1105+
1106+
const executeResult = await execute({ schema, document, rootValue: {} });
1107+
invariant(isAsyncIterable(executeResult));
1108+
const iterator = executeResult[Symbol.asyncIterator]();
1109+
1110+
const result1 = await iterator.next();
1111+
expectJSON(result1).toDeepEqual({
1112+
done: false,
1113+
value: {
1114+
data: {
1115+
asyncIterableListNoReturn: [
1116+
{
1117+
id: '1',
1118+
name: 'Luke',
1119+
},
1120+
],
1121+
},
1122+
hasNext: true,
1123+
},
1124+
});
1125+
1126+
const returnPromise = iterator.return();
1127+
1128+
// this result had started processing before return was called
1129+
const result2 = await iterator.next();
1130+
expectJSON(result2).toDeepEqual({
1131+
done: false,
1132+
value: {
1133+
data: {
1134+
id: '2',
1135+
name: 'Han',
1136+
},
1137+
hasNext: true,
1138+
path: ['asyncIterableListNoReturn', 1],
1139+
},
1140+
});
1141+
1142+
// third result is not returned because async iterator has returned
1143+
const result3 = await iterator.next();
1144+
expectJSON(result3).toDeepEqual({
1145+
done: true,
1146+
value: undefined,
1147+
});
1148+
await returnPromise;
1149+
});
1150+
it('Returns underlying async iterables when dispatcher is thrown', async () => {
1151+
const document = parse(`
1152+
query {
1153+
asyncIterableListDelayed @stream(initialCount: 1) {
1154+
name
1155+
id
1156+
}
1157+
}
1158+
`);
1159+
const schema = new GraphQLSchema({ query });
1160+
1161+
const executeResult = await execute({ schema, document, rootValue: {} });
1162+
invariant(isAsyncIterable(executeResult));
1163+
const iterator = executeResult[Symbol.asyncIterator]();
1164+
1165+
const result1 = await iterator.next();
1166+
expectJSON(result1).toDeepEqual({
1167+
done: false,
1168+
value: {
1169+
data: {
1170+
asyncIterableListDelayed: [
1171+
{
1172+
id: '1',
1173+
name: 'Luke',
1174+
},
1175+
],
1176+
},
1177+
hasNext: true,
1178+
},
1179+
});
1180+
1181+
const throwPromise = iterator.throw(new Error('bad'));
1182+
1183+
// this result had started processing before return was called
1184+
const result2 = await iterator.next();
1185+
expectJSON(result2).toDeepEqual({
1186+
done: false,
1187+
value: {
1188+
data: {
1189+
id: '2',
1190+
name: 'Han',
1191+
},
1192+
hasNext: true,
1193+
path: ['asyncIterableListDelayed', 1],
1194+
},
1195+
});
1196+
1197+
// third result is not returned because async iterator has returned
1198+
const result3 = await iterator.next();
1199+
expectJSON(result3).toDeepEqual({
1200+
done: true,
1201+
value: undefined,
1202+
});
1203+
try {
1204+
await throwPromise; /* c8 ignore start */
1205+
// Not reachable, always throws
1206+
/* c8 ignore stop */
1207+
} catch (e) {
1208+
// ignore error
1209+
}
1210+
});
10081211
});

src/execution/execute.ts

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1495,6 +1495,7 @@ function executeStreamIterator(
14951495
const asyncPayloadRecord = new AsyncPayloadRecord({
14961496
label,
14971497
path: fieldPath,
1498+
iterator,
14981499
});
14991500
const dataPromise: Promise<unknown> = iterator
15001501
.next()
@@ -1567,6 +1568,7 @@ function yieldSubsequentPayloads(
15671568
initialResult: ExecutionResult,
15681569
): AsyncGenerator<AsyncExecutionResult, void, void> {
15691570
let _hasReturnedInitialResult = false;
1571+
let isDone = false;
15701572

15711573
function race(): Promise<IteratorResult<AsyncExecutionResult>> {
15721574
if (exeContext.subsequentPayloads.length === 0) {
@@ -1635,17 +1637,31 @@ function yieldSubsequentPayloads(
16351637
},
16361638
done: false,
16371639
});
1638-
} else if (exeContext.subsequentPayloads.length === 0) {
1640+
} else if (exeContext.subsequentPayloads.length === 0 || isDone) {
16391641
return Promise.resolve({ value: undefined, done: true });
16401642
}
16411643
return race();
16421644
},
1643-
// TODO: implement return & throw
1644-
return: /* istanbul ignore next: will be covered in follow up */ () =>
1645-
Promise.resolve({ value: undefined, done: true }),
1646-
throw: /* istanbul ignore next: will be covered in follow up */ (
1645+
async return(): Promise<IteratorResult<AsyncExecutionResult, void>> {
1646+
await Promise.all(
1647+
exeContext.subsequentPayloads.map((asyncPayloadRecord) =>
1648+
asyncPayloadRecord.iterator?.return?.(),
1649+
),
1650+
);
1651+
isDone = true;
1652+
return { value: undefined, done: true };
1653+
},
1654+
async throw(
16471655
error?: unknown,
1648-
) => Promise.reject(error),
1656+
): Promise<IteratorResult<AsyncExecutionResult, void>> {
1657+
await Promise.all(
1658+
exeContext.subsequentPayloads.map((asyncPayloadRecord) =>
1659+
asyncPayloadRecord.iterator?.return?.(),
1660+
),
1661+
);
1662+
isDone = true;
1663+
return Promise.reject(error);
1664+
},
16491665
};
16501666
}
16511667

@@ -1654,10 +1670,16 @@ class AsyncPayloadRecord {
16541670
label?: string;
16551671
path?: Path;
16561672
dataPromise?: Promise<unknown | null | undefined>;
1673+
iterator?: AsyncIterator<unknown>;
16571674
isCompletedIterator?: boolean;
1658-
constructor(opts: { label?: string; path?: Path }) {
1675+
constructor(opts: {
1676+
label?: string;
1677+
path?: Path;
1678+
iterator?: AsyncIterator<unknown>;
1679+
}) {
16591680
this.label = opts.label;
16601681
this.path = opts.path;
1682+
this.iterator = opts.iterator;
16611683
this.errors = [];
16621684
}
16631685

0 commit comments

Comments
 (0)