Skip to content

incremental: add highWaterMark option to apply backpressure with async streams #4090

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 49 additions & 11 deletions src/execution/IncrementalPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ export function buildIncrementalResponse(
}

interface IncrementalPublisherContext {
streamHighWaterMark: number;
cancellableStreams: Set<CancellableStreamRecord> | undefined;
}

Expand All @@ -201,6 +202,7 @@ class IncrementalPublisher {
private _completedResultQueue: Array<IncrementalDataRecordResult>;
private _newPending: Set<SubsequentResultRecord>;
private _incremental: Array<IncrementalResult>;
private _asyncStreamCounts: Map<AsyncStreamRecord, number>;
private _completed: Array<CompletedResult>;
// these are assigned within the Promise executor called synchronously within the constructor
private _signalled!: Promise<unknown>;
Expand All @@ -213,6 +215,7 @@ class IncrementalPublisher {
this._completedResultQueue = [];
this._newPending = new Set();
this._incremental = [];
this._asyncStreamCounts = new Map();
this._completed = [];
this._reset();
}
Expand Down Expand Up @@ -427,7 +430,18 @@ class IncrementalPublisher {
subsequentIncrementalExecutionResult.completed = this._completed;
}

for (const [streamRecord, count] of this._asyncStreamCounts) {
streamRecord.waterMark -= count;
if (
streamRecord.resume !== undefined &&
streamRecord.waterMark < this._context.streamHighWaterMark
) {
streamRecord.resume();
}
}

this._incremental = [];
this._asyncStreamCounts.clear();
this._completed = [];

return { value: subsequentIncrementalExecutionResult, done: false };
Expand Down Expand Up @@ -593,20 +607,26 @@ class IncrementalPublisher {
errors: streamItemsResult.errors,
});
this._pending.delete(streamRecord);
if (isCancellableStreamRecord(streamRecord)) {
invariant(this._context.cancellableStreams !== undefined);
this._context.cancellableStreams.delete(streamRecord);
streamRecord.earlyReturn().catch(() => {
/* c8 ignore next 1 */
// ignore error
});
if (isAsyncStreamRecord(streamRecord)) {
this._asyncStreamCounts.delete(streamRecord);
if (isCancellableStreamRecord(streamRecord)) {
invariant(this._context.cancellableStreams !== undefined);
this._context.cancellableStreams.delete(streamRecord);
streamRecord.earlyReturn().catch(() => {
/* c8 ignore next 1 */
// ignore error
});
}
}
} else if (streamItemsResult.result === undefined) {
this._completed.push({ id });
this._pending.delete(streamRecord);
if (isCancellableStreamRecord(streamRecord)) {
invariant(this._context.cancellableStreams !== undefined);
this._context.cancellableStreams.delete(streamRecord);
if (isAsyncStreamRecord(streamRecord)) {
this._asyncStreamCounts.delete(streamRecord);
if (isCancellableStreamRecord(streamRecord)) {
invariant(this._context.cancellableStreams !== undefined);
this._context.cancellableStreams.delete(streamRecord);
}
}
} else {
const incrementalEntry: IncrementalStreamResult = {
Expand All @@ -615,6 +635,13 @@ class IncrementalPublisher {
};

this._incremental.push(incrementalEntry);
if (isAsyncStreamRecord(streamRecord)) {
const count = this._asyncStreamCounts.get(streamRecord);
this._asyncStreamCounts.set(
streamRecord,
count === undefined ? 1 : count + 1,
);
}

if (streamItemsResult.incrementalDataRecords !== undefined) {
this._addIncrementalDataRecords(
Expand Down Expand Up @@ -739,7 +766,18 @@ export class DeferredFragmentRecord implements SubsequentResultRecord {
}
}

export interface CancellableStreamRecord extends SubsequentResultRecord {
export interface AsyncStreamRecord extends SubsequentResultRecord {
waterMark: number;
resume: (() => void) | undefined;
}

function isAsyncStreamRecord(
subsequentResultRecord: SubsequentResultRecord,
): subsequentResultRecord is AsyncStreamRecord {
return 'waterMark' in subsequentResultRecord;
}

export interface CancellableStreamRecord extends AsyncStreamRecord {
earlyReturn: () => Promise<unknown>;
}

Expand Down
100 changes: 100 additions & 0 deletions src/execution/__tests__/stream-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,106 @@ describe('Execute: stream directive', () => {
},
});
});
it('Can stream a field that returns an async iterable with backpressure', async () => {
const document = parse(`
query {
friendList @stream {
name
id
}
}
`);
let count = 0;
const executeResult = await experimentalExecuteIncrementally({
schema,
document,
rootValue: {
async *friendList() {
for (const friend of friends) {
count++;
// eslint-disable-next-line no-await-in-loop
yield await Promise.resolve(friend);
}
},
},
streamHighWaterMark: 2,
});
assert('initialResult' in executeResult);
const iterator = executeResult.subsequentResults[Symbol.asyncIterator]();

const result1 = executeResult.initialResult;
expectJSON(result1).toDeepEqual({
data: {
friendList: [],
},
pending: [{ id: '0', path: ['friendList'] }],
hasNext: true,
});

expect(count).to.equal(2);

await resolveOnNextTick();
await resolveOnNextTick();
await resolveOnNextTick();
await resolveOnNextTick();
await resolveOnNextTick();

const result2 = await iterator.next();
expectJSON(result2).toDeepEqual({
done: false,
value: {
incremental: [
{
items: [{ name: 'Luke', id: '1' }],
id: '0',
},
],
hasNext: true,
},
});

expect(count).to.equal(3);

const result3 = await iterator.next();
expectJSON(result3).toDeepEqual({
done: false,
value: {
incremental: [
{
items: [{ name: 'Han', id: '2' }],
id: '0',
},
],
hasNext: true,
},
});

const result4 = await iterator.next();
expectJSON(result4).toDeepEqual({
done: false,
value: {
incremental: [
{
items: [{ name: 'Leia', id: '3' }],
id: '0',
},
],
hasNext: true,
},
});

const result5 = await iterator.next();
expectJSON(result5).toDeepEqual({
done: false,
value: {
completed: [{ id: '0' }],
hasNext: false,
},
});

const result6 = await iterator.next();
expectJSON(result6).toDeepEqual({ done: true, value: undefined });
});
it('Can handle concurrent calls to .next() without waiting', async () => {
const document = parse(`
query {
Expand Down
32 changes: 28 additions & 4 deletions src/execution/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { addPath, pathToArray } from '../jsutils/Path.js';
import { promiseForObject } from '../jsutils/promiseForObject.js';
import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js';
import { promiseReduce } from '../jsutils/promiseReduce.js';
import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js';

import { GraphQLError } from '../error/GraphQLError.js';
import { locatedError } from '../error/locatedError.js';
Expand Down Expand Up @@ -59,6 +60,7 @@ import {
collectSubfields as _collectSubfields,
} from './collectFields.js';
import type {
AsyncStreamRecord,
CancellableStreamRecord,
DeferredGroupedFieldSetRecord,
DeferredGroupedFieldSetResult,
Expand Down Expand Up @@ -143,6 +145,7 @@ export interface ExecutionContext {
typeResolver: GraphQLTypeResolver<any, any>;
subscribeFieldResolver: GraphQLFieldResolver<any, any>;
errors: Array<GraphQLError> | undefined;
streamHighWaterMark: number;
cancellableStreams: Set<CancellableStreamRecord> | undefined;
}

Expand All @@ -161,6 +164,7 @@ export interface ExecutionArgs {
fieldResolver?: Maybe<GraphQLFieldResolver<any, any>>;
typeResolver?: Maybe<GraphQLTypeResolver<any, any>>;
subscribeFieldResolver?: Maybe<GraphQLFieldResolver<any, any>>;
streamHighWaterMark?: Maybe<number>;
}

export interface StreamUsage {
Expand Down Expand Up @@ -439,6 +443,7 @@ export function buildExecutionContext(
fieldResolver,
typeResolver,
subscribeFieldResolver,
streamHighWaterMark,
} = args;

// If the schema used for execution is invalid, throw an error.
Expand Down Expand Up @@ -504,6 +509,7 @@ export function buildExecutionContext(
subscribeFieldResolver: subscribeFieldResolver ?? defaultFieldResolver,
errors: undefined,
cancellableStreams: undefined,
streamHighWaterMark: streamHighWaterMark ?? 100,
};
}

Expand Down Expand Up @@ -1096,16 +1102,20 @@ async function completeAsyncIteratorValue(
while (true) {
if (streamUsage && index >= streamUsage.initialCount) {
const returnFn = asyncIterator.return;
let streamRecord: SubsequentResultRecord | CancellableStreamRecord;
let streamRecord: AsyncStreamRecord | CancellableStreamRecord;
if (returnFn === undefined) {
streamRecord = {
label: streamUsage.label,
path,
} as SubsequentResultRecord;
waterMark: 0,
resume: undefined,
};
} else {
streamRecord = {
label: streamUsage.label,
path,
waterMark: 0,
resume: undefined,
earlyReturn: returnFn.bind(asyncIterator),
};
if (exeContext.cancellableStreams === undefined) {
Expand Down Expand Up @@ -2317,7 +2327,7 @@ function prependNextResolvedStreamItems(
}

function firstAsyncStreamItems(
streamRecord: SubsequentResultRecord,
streamRecord: AsyncStreamRecord,
path: Path,
initialIndex: number,
asyncIterator: AsyncIterator<unknown>,
Expand All @@ -2343,7 +2353,7 @@ function firstAsyncStreamItems(
}

async function getNextAsyncStreamItemsResult(
streamRecord: SubsequentResultRecord,
streamRecord: AsyncStreamRecord,
path: Path,
index: number,
asyncIterator: AsyncIterator<unknown>,
Expand All @@ -2353,6 +2363,18 @@ async function getNextAsyncStreamItemsResult(
itemType: GraphQLOutputType,
): Promise<StreamItemsResult> {
let iteration;

const waterMark = streamRecord.waterMark;

if (waterMark === exeContext.streamHighWaterMark) {
// promiseWithResolvers uses void only as a generic type parameter
// see: https://typescript-eslint.io/rules/no-invalid-void-type/
// eslint-disable-next-line @typescript-eslint/no-invalid-void-type
const { promise: resumed, resolve: resume } = promiseWithResolvers<void>();
streamRecord.resume = resume;
await resumed;
}

try {
iteration = await asyncIterator.next();
} catch (error) {
Expand All @@ -2366,6 +2388,8 @@ async function getNextAsyncStreamItemsResult(
return { streamRecord };
}

streamRecord.waterMark++;

const itemPath = addPath(path, index, undefined);

const result = completeStreamItems(
Expand Down
Loading