Skip to content

Commit 2ebfec2

Browse files
committed
Implement support for @stream directive
1 parent cfe927d commit 2ebfec2

File tree

10 files changed

+1180
-14
lines changed

10 files changed

+1180
-14
lines changed

src/execution/__tests__/stream-test.js

Lines changed: 652 additions & 0 deletions
Large diffs are not rendered by default.

src/execution/execute.js

Lines changed: 233 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import {
5050
GraphQLIncludeDirective,
5151
GraphQLSkipDirective,
5252
GraphQLDeferDirective,
53+
GraphQLStreamDirective,
5354
} from '../type/directives';
5455
import {
5556
isObjectType,
@@ -701,6 +702,42 @@ function getDeferValues(
701702
};
702703
}
703704

705+
/**
706+
* Returns an object containing the @stream arguments if a field should be
707+
* streamed based on the experimental flag, stream directive present and
708+
* not disabled by the "if" argument.
709+
*/
710+
function getStreamValues(
711+
exeContext: ExecutionContext,
712+
fieldNodes: $ReadOnlyArray<FieldNode>,
713+
): void | {|
714+
initialCount?: number,
715+
label?: string,
716+
|} {
717+
// validation only allows equivalent streams on multiple fields, so it is
718+
// safe to only check the first fieldNode for the stream directive
719+
const stream = getDirectiveValues(
720+
GraphQLStreamDirective,
721+
fieldNodes[0],
722+
exeContext.variableValues,
723+
);
724+
725+
if (!stream) {
726+
return;
727+
}
728+
729+
if (stream.if === false) {
730+
return;
731+
}
732+
733+
return {
734+
initialCount:
735+
// istanbul ignore next (initialCount is required number argument)
736+
typeof stream.initialCount === 'number' ? stream.initialCount : undefined,
737+
label: typeof stream.label === 'string' ? stream.label : undefined,
738+
};
739+
}
740+
704741
/**
705742
* Determines if a fragment is applicable to the given type.
706743
*/
@@ -993,6 +1030,7 @@ function completeAsyncIteratorValue(
9931030
errors: Array<GraphQLError>,
9941031
): Promise<$ReadOnlyArray<mixed>> {
9951032
let containsPromise = false;
1033+
const stream = getStreamValues(exeContext, fieldNodes);
9961034
return new Promise((resolve) => {
9971035
function next(index, completedResults) {
9981036
const fieldPath = addPath(path, index, undefined);
@@ -1029,7 +1067,26 @@ function completeAsyncIteratorValue(
10291067
return;
10301068
}
10311069

1032-
next(index + 1, completedResults);
1070+
const newIndex = index + 1;
1071+
if (
1072+
stream &&
1073+
typeof stream.initialCount === 'number' &&
1074+
newIndex >= stream.initialCount
1075+
) {
1076+
exeContext.dispatcher.addAsyncIteratorValue(
1077+
stream.label,
1078+
newIndex,
1079+
path,
1080+
iterator,
1081+
exeContext,
1082+
fieldNodes,
1083+
info,
1084+
itemType,
1085+
);
1086+
resolve(completedResults);
1087+
return;
1088+
}
1089+
next(newIndex, completedResults);
10331090
},
10341091
(rawError) => {
10351092
completedResults.push(null);
@@ -1084,15 +1141,37 @@ function completeListValue(
10841141
);
10851142
}
10861143

1144+
const stream = getStreamValues(exeContext, fieldNodes);
1145+
10871146
// This is specified as a simple map, however we're optimizing the path
10881147
// where the list contains no Promises by avoiding creating another Promise.
10891148
let containsPromise = false;
1090-
const completedResults = Array.from(result, (item, index) => {
1149+
const completedResults = [];
1150+
let index = 0;
1151+
for (const item of result) {
10911152
// No need to modify the info object containing the path,
10921153
// since from here on it is not ever accessed by resolver functions.
10931154
const itemPath = addPath(path, index, undefined);
10941155
try {
10951156
let completedItem;
1157+
1158+
if (
1159+
stream &&
1160+
typeof stream.initialCount === 'number' &&
1161+
index >= stream.initialCount
1162+
) {
1163+
exeContext.dispatcher.addValue(
1164+
stream.label,
1165+
itemPath,
1166+
item,
1167+
exeContext,
1168+
fieldNodes,
1169+
info,
1170+
itemType,
1171+
);
1172+
index++;
1173+
continue;
1174+
}
10961175
if (isPromise(item)) {
10971176
completedItem = item.then((resolved) =>
10981177
completeValue(
@@ -1121,21 +1200,25 @@ function completeListValue(
11211200
containsPromise = true;
11221201
// Note: we don't rely on a `catch` method, but we do expect "thenable"
11231202
// to take a second callback for the error case.
1124-
return completedItem.then(undefined, (rawError) => {
1125-
const error = locatedError(
1126-
rawError,
1127-
fieldNodes,
1128-
pathToArray(itemPath),
1129-
);
1130-
return handleFieldError(error, itemType, errors);
1131-
});
1203+
completedResults.push(
1204+
completedItem.then(undefined, (rawError) => {
1205+
const error = locatedError(
1206+
rawError,
1207+
fieldNodes,
1208+
pathToArray(itemPath),
1209+
);
1210+
return handleFieldError(error, itemType, errors);
1211+
}),
1212+
);
1213+
} else {
1214+
completedResults.push(completedItem);
11321215
}
1133-
return completedItem;
11341216
} catch (rawError) {
11351217
const error = locatedError(rawError, fieldNodes, pathToArray(itemPath));
1136-
return handleFieldError(error, itemType, errors);
1218+
completedResults.push(handleFieldError(error, itemType, errors));
11371219
}
1138-
});
1220+
index++;
1221+
}
11391222

11401223
return containsPromise ? Promise.all(completedResults) : completedResults;
11411224
}
@@ -1551,6 +1634,129 @@ export class Dispatcher {
15511634
);
15521635
}
15531636

1637+
addValue(
1638+
label?: string,
1639+
path: Path,
1640+
promiseOrData: PromiseOrValue<ObjMap<mixed> | mixed>,
1641+
exeContext: ExecutionContext,
1642+
fieldNodes: $ReadOnlyArray<FieldNode>,
1643+
info: GraphQLResolveInfo,
1644+
itemType: GraphQLOutputType,
1645+
): void {
1646+
const errors = [];
1647+
this._subsequentPayloads.push(
1648+
Promise.resolve(promiseOrData)
1649+
.then((resolved) =>
1650+
completeValue(
1651+
exeContext,
1652+
itemType,
1653+
fieldNodes,
1654+
info,
1655+
path,
1656+
resolved,
1657+
errors,
1658+
),
1659+
)
1660+
// Note: we don't rely on a `catch` method, but we do expect "thenable"
1661+
// to take a second callback for the error case.
1662+
.then(undefined, (rawError) => {
1663+
const error = locatedError(rawError, fieldNodes, pathToArray(path));
1664+
return handleFieldError(error, itemType, errors);
1665+
})
1666+
.then((data) => ({
1667+
value: createPatchResult(data, label, path, errors),
1668+
done: false,
1669+
})),
1670+
);
1671+
}
1672+
1673+
addAsyncIteratorValue(
1674+
label?: string,
1675+
initialIndex: number,
1676+
path?: Path,
1677+
iterator: AsyncIterator<mixed>,
1678+
exeContext: ExecutionContext,
1679+
fieldNodes: $ReadOnlyArray<FieldNode>,
1680+
info: GraphQLResolveInfo,
1681+
itemType: GraphQLOutputType,
1682+
): void {
1683+
const subsequentPayloads = this._subsequentPayloads;
1684+
function next(index) {
1685+
const fieldPath = addPath(path, index);
1686+
const patchErrors = [];
1687+
subsequentPayloads.push(
1688+
iterator.next().then(
1689+
({ value: data, done }) => {
1690+
if (done) {
1691+
return { value: undefined, done: true };
1692+
}
1693+
1694+
// eslint-disable-next-line node/callback-return
1695+
next(index + 1);
1696+
1697+
try {
1698+
const completedItem = completeValue(
1699+
exeContext,
1700+
itemType,
1701+
fieldNodes,
1702+
info,
1703+
fieldPath,
1704+
data,
1705+
patchErrors,
1706+
);
1707+
1708+
if (isPromise(completedItem)) {
1709+
return completedItem.then((resolveItem) => ({
1710+
value: createPatchResult(
1711+
resolveItem,
1712+
label,
1713+
fieldPath,
1714+
patchErrors,
1715+
),
1716+
done: false,
1717+
}));
1718+
}
1719+
1720+
return {
1721+
value: createPatchResult(
1722+
completedItem,
1723+
label,
1724+
fieldPath,
1725+
patchErrors,
1726+
),
1727+
done: false,
1728+
};
1729+
} catch (rawError) {
1730+
const error = locatedError(
1731+
rawError,
1732+
fieldNodes,
1733+
pathToArray(fieldPath),
1734+
);
1735+
handleFieldError(error, itemType, patchErrors);
1736+
return {
1737+
value: createPatchResult(null, label, fieldPath, patchErrors),
1738+
done: false,
1739+
};
1740+
}
1741+
},
1742+
(rawError) => {
1743+
const error = locatedError(
1744+
rawError,
1745+
fieldNodes,
1746+
pathToArray(fieldPath),
1747+
);
1748+
handleFieldError(error, itemType, patchErrors);
1749+
return {
1750+
value: createPatchResult(null, label, fieldPath, patchErrors),
1751+
done: false,
1752+
};
1753+
},
1754+
),
1755+
);
1756+
}
1757+
next(initialIndex);
1758+
}
1759+
15541760
_race(): Promise<IteratorResult<ExecutionPatchResult, void>> {
15551761
return new Promise((resolve) => {
15561762
this._subsequentPayloads.forEach((promise) => {
@@ -1567,7 +1773,20 @@ export class Dispatcher {
15671773
);
15681774
return promise;
15691775
})
1570-
.then(({ value }) => {
1776+
.then(({ value, done }) => {
1777+
if (done && this._subsequentPayloads.length === 0) {
1778+
// async iterable resolver just finished and no more pending payloads
1779+
return {
1780+
value: {
1781+
hasNext: false,
1782+
},
1783+
done: false,
1784+
};
1785+
} else if (done) {
1786+
// async iterable resolver just finished but there are pending payloads
1787+
// return the next one
1788+
return this._race();
1789+
}
15711790
const returnValue: ExecutionPatchResult = {
15721791
...value,
15731792
hasNext: this._subsequentPayloads.length > 0,

0 commit comments

Comments
 (0)