Skip to content

Commit 8314636

Browse files
committed
Implement support for @stream directive
1 parent 63ca2ca commit 8314636

File tree

10 files changed

+1138
-3
lines changed

10 files changed

+1138
-3
lines changed

src/execution/__tests__/stream-test.js

Lines changed: 629 additions & 0 deletions
Large diffs are not rendered by default.

src/execution/execute.js

Lines changed: 215 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ import {
5353
GraphQLIncludeDirective,
5454
GraphQLSkipDirective,
5555
GraphQLDeferDirective,
56+
GraphQLStreamDirective,
5657
} from '../type/directives';
5758
import {
5859
isNamedType,
@@ -748,6 +749,42 @@ function getDeferValues(
748749
};
749750
}
750751

752+
/**
753+
* Returns an object containing the @stream arguments if a field should be
754+
* streamed based on the experimental flag, stream directive present and
755+
* not disabled by the "if" argument.
756+
*/
757+
function getStreamValues(
758+
exeContext: ExecutionContext,
759+
fieldNodes: $ReadOnlyArray<FieldNode>,
760+
): void | {|
761+
initialCount?: number,
762+
label?: string,
763+
|} {
764+
// validation only allows equivalent streams on multiple fields, so it is
765+
// safe to only check the first fieldNode for the stream directive
766+
const stream = getDirectiveValues(
767+
GraphQLStreamDirective,
768+
fieldNodes[0],
769+
exeContext.variableValues,
770+
);
771+
772+
if (!stream) {
773+
return;
774+
}
775+
776+
if (stream.if === false) {
777+
return;
778+
}
779+
780+
return {
781+
initialCount:
782+
// istanbul ignore next (initialCount is required number argument)
783+
typeof stream.initialCount === 'number' ? stream.initialCount : undefined,
784+
label: typeof stream.label === 'string' ? stream.label : undefined,
785+
};
786+
}
787+
751788
/**
752789
* Determines if a fragment is applicable to the given type.
753790
*/
@@ -1040,6 +1077,7 @@ function completeAsyncIteratorValue(
10401077
errors: Array<GraphQLError>,
10411078
): Promise<$ReadOnlyArray<mixed>> {
10421079
let containsPromise = false;
1080+
const stream = getStreamValues(exeContext, fieldNodes);
10431081
return new Promise((resolve) => {
10441082
function next(index, completedResults) {
10451083
const fieldPath = addPath(path, index, undefined);
@@ -1076,7 +1114,26 @@ function completeAsyncIteratorValue(
10761114
return;
10771115
}
10781116

1079-
next(index + 1, completedResults);
1117+
const newIndex = index + 1;
1118+
if (
1119+
stream &&
1120+
typeof stream.initialCount === 'number' &&
1121+
newIndex >= stream.initialCount
1122+
) {
1123+
exeContext.dispatcher.addAsyncIteratorValue(
1124+
stream.label,
1125+
newIndex,
1126+
path,
1127+
iterator,
1128+
exeContext,
1129+
fieldNodes,
1130+
info,
1131+
itemType,
1132+
);
1133+
resolve(completedResults);
1134+
return;
1135+
}
1136+
next(newIndex, completedResults);
10801137
},
10811138
(rawError) => {
10821139
completedResults.push(null);
@@ -1131,6 +1188,8 @@ function completeListValue(
11311188
);
11321189
}
11331190

1191+
const stream = getStreamValues(exeContext, fieldNodes);
1192+
11341193
// This is specified as a simple map, however we're optimizing the path
11351194
// where the list contains no Promises by avoiding creating another Promise.
11361195
let containsPromise = false;
@@ -1140,6 +1199,23 @@ function completeListValue(
11401199
const itemPath = addPath(path, index, undefined);
11411200
try {
11421201
let completedItem;
1202+
1203+
if (
1204+
stream &&
1205+
typeof stream.initialCount === 'number' &&
1206+
index >= stream.initialCount
1207+
) {
1208+
exeContext.dispatcher.addValue(
1209+
stream.label,
1210+
itemPath,
1211+
item,
1212+
exeContext,
1213+
fieldNodes,
1214+
info,
1215+
itemType,
1216+
);
1217+
return;
1218+
}
11431219
if (isPromise(item)) {
11441220
completedItem = item.then((resolved) =>
11451221
completeValue(
@@ -1182,7 +1258,7 @@ function completeListValue(
11821258
const error = locatedError(rawError, fieldNodes, pathToArray(itemPath));
11831259
return handleFieldError(error, itemType, errors);
11841260
}
1185-
});
1261+
}).filter((val) => val !== undefined);
11861262

11871263
return containsPromise ? Promise.all(completedResults) : completedResults;
11881264
}
@@ -1595,6 +1671,129 @@ export class Dispatcher {
15951671
);
15961672
}
15971673

1674+
addValue(
1675+
label?: string,
1676+
path: Path,
1677+
promiseOrData: PromiseOrValue<ObjMap<mixed> | mixed>,
1678+
exeContext: ExecutionContext,
1679+
fieldNodes: $ReadOnlyArray<FieldNode>,
1680+
info: GraphQLResolveInfo,
1681+
itemType: GraphQLOutputType,
1682+
): void {
1683+
const errors = [];
1684+
this._subsequentPayloads.push(
1685+
Promise.resolve(promiseOrData)
1686+
.then((resolved) =>
1687+
completeValue(
1688+
exeContext,
1689+
itemType,
1690+
fieldNodes,
1691+
info,
1692+
path,
1693+
resolved,
1694+
errors,
1695+
),
1696+
)
1697+
// Note: we don't rely on a `catch` method, but we do expect "thenable"
1698+
// to take a second callback for the error case.
1699+
.then(undefined, (rawError) => {
1700+
const error = locatedError(rawError, fieldNodes, pathToArray(path));
1701+
return handleFieldError(error, itemType, errors);
1702+
})
1703+
.then((data) => ({
1704+
value: createPatchResult(data, label, path, errors),
1705+
done: false,
1706+
})),
1707+
);
1708+
}
1709+
1710+
addAsyncIteratorValue(
1711+
label?: string,
1712+
initialIndex: number,
1713+
path?: Path,
1714+
iterator: AsyncIterator<mixed>,
1715+
exeContext: ExecutionContext,
1716+
fieldNodes: $ReadOnlyArray<FieldNode>,
1717+
info: GraphQLResolveInfo,
1718+
itemType: GraphQLOutputType,
1719+
): void {
1720+
const subsequentPayloads = this._subsequentPayloads;
1721+
function next(index) {
1722+
const fieldPath = addPath(path, index);
1723+
const patchErrors = [];
1724+
subsequentPayloads.push(
1725+
iterator.next().then(
1726+
({ value: data, done }) => {
1727+
if (done) {
1728+
return { value: undefined, done: true };
1729+
}
1730+
1731+
// eslint-disable-next-line node/callback-return
1732+
next(index + 1);
1733+
1734+
try {
1735+
const completedItem = completeValue(
1736+
exeContext,
1737+
itemType,
1738+
fieldNodes,
1739+
info,
1740+
fieldPath,
1741+
data,
1742+
patchErrors,
1743+
);
1744+
1745+
if (isPromise(completedItem)) {
1746+
return completedItem.then((resolveItem) => ({
1747+
value: createPatchResult(
1748+
resolveItem,
1749+
label,
1750+
fieldPath,
1751+
patchErrors,
1752+
),
1753+
done: false,
1754+
}));
1755+
}
1756+
1757+
return {
1758+
value: createPatchResult(
1759+
completedItem,
1760+
label,
1761+
fieldPath,
1762+
patchErrors,
1763+
),
1764+
done: false,
1765+
};
1766+
} catch (rawError) {
1767+
const error = locatedError(
1768+
rawError,
1769+
fieldNodes,
1770+
pathToArray(fieldPath),
1771+
);
1772+
handleFieldError(error, itemType, patchErrors);
1773+
return {
1774+
value: createPatchResult(null, label, fieldPath, patchErrors),
1775+
done: false,
1776+
};
1777+
}
1778+
},
1779+
(rawError) => {
1780+
const error = locatedError(
1781+
rawError,
1782+
fieldNodes,
1783+
pathToArray(fieldPath),
1784+
);
1785+
handleFieldError(error, itemType, patchErrors);
1786+
return {
1787+
value: createPatchResult(null, label, fieldPath, patchErrors),
1788+
done: false,
1789+
};
1790+
},
1791+
),
1792+
);
1793+
}
1794+
next(initialIndex);
1795+
}
1796+
15981797
_race(): Promise<IteratorResult<ExecutionPatchResult, void>> {
15991798
return new Promise((resolve) => {
16001799
this._subsequentPayloads.forEach((promise) => {
@@ -1611,7 +1810,20 @@ export class Dispatcher {
16111810
);
16121811
return promise;
16131812
})
1614-
.then(({ value }) => {
1813+
.then(({ value, done }) => {
1814+
if (done && this._subsequentPayloads.length === 0) {
1815+
// async iterable resolver just finished and no more pending payloads
1816+
return {
1817+
value: {
1818+
hasNext: false,
1819+
},
1820+
done: false,
1821+
};
1822+
} else if (done) {
1823+
// async iterable resolver just finished but there are pending payloads
1824+
// return the next one
1825+
return this._race();
1826+
}
16151827
const returnValue: ExecutionPatchResult = {
16161828
...value,
16171829
hasNext: this._subsequentPayloads.length > 0,

0 commit comments

Comments
 (0)