Skip to content

Commit e70b79f

Browse files
committed
the IncrementalPublisher should handle response building
1 parent fae5da5 commit e70b79f

File tree

10 files changed

+196
-215
lines changed

10 files changed

+196
-215
lines changed

src/execution/IncrementalPublisher.ts

Lines changed: 164 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,63 @@ import type {
88
GraphQLFormattedError,
99
} from '../error/GraphQLError.js';
1010

11+
/**
12+
* The result of GraphQL execution.
13+
*
14+
* - `errors` is included when any errors occurred as a non-empty array.
15+
* - `data` is the result of a successful execution of the query.
16+
* - `hasNext` is true if a future payload is expected.
17+
* - `extensions` is reserved for adding non-standard properties.
18+
* - `incremental` is a list of the results from defer/stream directives.
19+
*/
20+
export interface ExecutionResult<
21+
TData = ObjMap<unknown>,
22+
TExtensions = ObjMap<unknown>,
23+
> {
24+
errors?: ReadonlyArray<GraphQLError>;
25+
data?: TData | null;
26+
extensions?: TExtensions;
27+
}
28+
29+
export interface FormattedExecutionResult<
30+
TData = ObjMap<unknown>,
31+
TExtensions = ObjMap<unknown>,
32+
> {
33+
errors?: ReadonlyArray<GraphQLFormattedError>;
34+
data?: TData | null;
35+
extensions?: TExtensions;
36+
}
37+
38+
export interface ExperimentalIncrementalExecutionResults<
39+
TData = ObjMap<unknown>,
40+
TExtensions = ObjMap<unknown>,
41+
> {
42+
initialResult: InitialIncrementalExecutionResult<TData, TExtensions>;
43+
subsequentResults: AsyncGenerator<
44+
SubsequentIncrementalExecutionResult<TData, TExtensions>,
45+
void,
46+
void
47+
>;
48+
}
49+
50+
export interface InitialIncrementalExecutionResult<
51+
TData = ObjMap<unknown>,
52+
TExtensions = ObjMap<unknown>,
53+
> extends ExecutionResult<TData, TExtensions> {
54+
hasNext: boolean;
55+
incremental?: ReadonlyArray<IncrementalResult<TData, TExtensions>>;
56+
extensions?: TExtensions;
57+
}
58+
59+
export interface FormattedInitialIncrementalExecutionResult<
60+
TData = ObjMap<unknown>,
61+
TExtensions = ObjMap<unknown>,
62+
> extends FormattedExecutionResult<TData, TExtensions> {
63+
hasNext: boolean;
64+
incremental?: ReadonlyArray<FormattedIncrementalResult<TData, TExtensions>>;
65+
extensions?: TExtensions;
66+
}
67+
1168
export interface SubsequentIncrementalExecutionResult<
1269
TData = ObjMap<unknown>,
1370
TExtensions = ObjMap<unknown>,
@@ -113,86 +170,6 @@ export class IncrementalPublisher {
113170
this._reset();
114171
}
115172

116-
hasNext(): boolean {
117-
return this._pending.size > 0;
118-
}
119-
120-
subscribe(): AsyncGenerator<
121-
SubsequentIncrementalExecutionResult,
122-
void,
123-
void
124-
> {
125-
let isDone = false;
126-
127-
const _next = async (): Promise<
128-
IteratorResult<SubsequentIncrementalExecutionResult, void>
129-
> => {
130-
// eslint-disable-next-line no-constant-condition
131-
while (true) {
132-
if (isDone) {
133-
return { value: undefined, done: true };
134-
}
135-
136-
for (const item of this._released) {
137-
this._pending.delete(item);
138-
}
139-
const released = this._released;
140-
this._released = new Set();
141-
142-
const result = this._getIncrementalResult(released);
143-
144-
if (!this.hasNext()) {
145-
isDone = true;
146-
}
147-
148-
if (result !== undefined) {
149-
return { value: result, done: false };
150-
}
151-
152-
// eslint-disable-next-line no-await-in-loop
153-
await this._signalled;
154-
}
155-
};
156-
157-
const returnStreamIterators = async (): Promise<void> => {
158-
const promises: Array<Promise<IteratorResult<unknown>>> = [];
159-
this._pending.forEach((incrementalDataRecord) => {
160-
if (
161-
isStreamItemsRecord(incrementalDataRecord) &&
162-
incrementalDataRecord.asyncIterator?.return
163-
) {
164-
promises.push(incrementalDataRecord.asyncIterator.return());
165-
}
166-
});
167-
await Promise.all(promises);
168-
};
169-
170-
const _return = async (): Promise<
171-
IteratorResult<SubsequentIncrementalExecutionResult, void>
172-
> => {
173-
isDone = true;
174-
await returnStreamIterators();
175-
return { value: undefined, done: true };
176-
};
177-
178-
const _throw = async (
179-
error?: unknown,
180-
): Promise<IteratorResult<SubsequentIncrementalExecutionResult, void>> => {
181-
isDone = true;
182-
await returnStreamIterators();
183-
return Promise.reject(error);
184-
};
185-
186-
return {
187-
[Symbol.asyncIterator]() {
188-
return this;
189-
},
190-
next: _next,
191-
return: _return,
192-
throw: _throw,
193-
};
194-
}
195-
196173
prepareInitialResultRecord(): InitialResultRecord {
197174
return {
198175
errors: [],
@@ -256,19 +233,38 @@ export class IncrementalPublisher {
256233
incrementalDataRecord.errors.push(error);
257234
}
258235

259-
publishInitial(initialResult: InitialResultRecord) {
260-
for (const child of initialResult.children) {
236+
buildDataResponse(
237+
initialResultRecord: InitialResultRecord,
238+
data: ObjMap<unknown> | null,
239+
): ExecutionResult | ExperimentalIncrementalExecutionResults {
240+
for (const child of initialResultRecord.children) {
261241
if (child.filtered) {
262242
continue;
263243
}
264244
this._publish(child);
265245
}
246+
247+
const errors = initialResultRecord.errors;
248+
const initialResult = errors.length === 0 ? { data } : { errors, data };
249+
if (this._hasNext()) {
250+
return {
251+
initialResult: {
252+
...initialResult,
253+
hasNext: true,
254+
},
255+
subsequentResults: this._subscribe(),
256+
};
257+
}
258+
return initialResult;
266259
}
267260

268-
getInitialErrors(
269-
initialResult: InitialResultRecord,
270-
): ReadonlyArray<GraphQLError> {
271-
return initialResult.errors;
261+
buildErrorResponse(
262+
initialResultRecord: InitialResultRecord,
263+
error: GraphQLError,
264+
): ExecutionResult {
265+
const errors = initialResultRecord.errors;
266+
errors.push(error);
267+
return { data: null, errors };
272268
}
273269

274270
filter(nullPath: Path, erroringIncrementalDataRecord: IncrementalDataRecord) {
@@ -301,6 +297,86 @@ export class IncrementalPublisher {
301297
});
302298
}
303299

300+
private _hasNext(): boolean {
301+
return this._pending.size > 0;
302+
}
303+
304+
private _subscribe(): AsyncGenerator<
305+
SubsequentIncrementalExecutionResult,
306+
void,
307+
void
308+
> {
309+
let isDone = false;
310+
311+
const _next = async (): Promise<
312+
IteratorResult<SubsequentIncrementalExecutionResult, void>
313+
> => {
314+
// eslint-disable-next-line no-constant-condition
315+
while (true) {
316+
if (isDone) {
317+
return { value: undefined, done: true };
318+
}
319+
320+
for (const item of this._released) {
321+
this._pending.delete(item);
322+
}
323+
const released = this._released;
324+
this._released = new Set();
325+
326+
const result = this._getIncrementalResult(released);
327+
328+
if (!this._hasNext()) {
329+
isDone = true;
330+
}
331+
332+
if (result !== undefined) {
333+
return { value: result, done: false };
334+
}
335+
336+
// eslint-disable-next-line no-await-in-loop
337+
await this._signalled;
338+
}
339+
};
340+
341+
const returnStreamIterators = async (): Promise<void> => {
342+
const promises: Array<Promise<IteratorResult<unknown>>> = [];
343+
this._pending.forEach((incrementalDataRecord) => {
344+
if (
345+
isStreamItemsRecord(incrementalDataRecord) &&
346+
incrementalDataRecord.asyncIterator?.return
347+
) {
348+
promises.push(incrementalDataRecord.asyncIterator.return());
349+
}
350+
});
351+
await Promise.all(promises);
352+
};
353+
354+
const _return = async (): Promise<
355+
IteratorResult<SubsequentIncrementalExecutionResult, void>
356+
> => {
357+
isDone = true;
358+
await returnStreamIterators();
359+
return { value: undefined, done: true };
360+
};
361+
362+
const _throw = async (
363+
error?: unknown,
364+
): Promise<IteratorResult<SubsequentIncrementalExecutionResult, void>> => {
365+
isDone = true;
366+
await returnStreamIterators();
367+
return Promise.reject(error);
368+
};
369+
370+
return {
371+
[Symbol.asyncIterator]() {
372+
return this;
373+
},
374+
next: _next,
375+
return: _return,
376+
throw: _throw,
377+
};
378+
}
379+
304380
private _trigger() {
305381
this._resolve();
306382
this._reset();
@@ -369,8 +445,8 @@ export class IncrementalPublisher {
369445
}
370446

371447
return incrementalResults.length
372-
? { incremental: incrementalResults, hasNext: this.hasNext() }
373-
: encounteredCompletedAsyncIterator && !this.hasNext()
448+
? { incremental: incrementalResults, hasNext: this._hasNext() }
449+
: encounteredCompletedAsyncIterator && !this._hasNext()
374450
? { hasNext: false }
375451
: undefined;
376452
}

src/execution/__tests__/defer-test.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@ import {
1616
import { GraphQLID, GraphQLString } from '../../type/scalars.js';
1717
import { GraphQLSchema } from '../../type/schema.js';
1818

19-
import type { InitialIncrementalExecutionResult } from '../execute.js';
2019
import { execute, experimentalExecuteIncrementally } from '../execute.js';
21-
import type { SubsequentIncrementalExecutionResult } from '../IncrementalPublisher.js';
20+
import type {
21+
InitialIncrementalExecutionResult,
22+
SubsequentIncrementalExecutionResult,
23+
} from '../IncrementalPublisher.js';
2224

2325
const friendType = new GraphQLObjectType({
2426
fields: {

src/execution/__tests__/lists-test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ import { GraphQLSchema } from '../../type/schema.js';
1818

1919
import { buildSchema } from '../../utilities/buildASTSchema.js';
2020

21-
import type { ExecutionResult } from '../execute.js';
2221
import { execute, executeSync } from '../execute.js';
22+
import type { ExecutionResult } from '../IncrementalPublisher.js';
2323

2424
describe('Execute: Accepts any iterable as list value', () => {
2525
function complete(rootValue: unknown) {

src/execution/__tests__/nonnull-test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ import { GraphQLSchema } from '../../type/schema.js';
1313

1414
import { buildSchema } from '../../utilities/buildASTSchema.js';
1515

16-
import type { ExecutionResult } from '../execute.js';
1716
import { execute, executeSync } from '../execute.js';
17+
import type { ExecutionResult } from '../IncrementalPublisher.js';
1818

1919
const syncError = new Error('sync');
2020
const syncNonNullError = new Error('syncNonNull');

src/execution/__tests__/oneof-test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ import { parse } from '../../language/parser.js';
66

77
import { buildSchema } from '../../utilities/buildASTSchema.js';
88

9-
import type { ExecutionResult } from '../execute.js';
109
import { execute } from '../execute.js';
10+
import type { ExecutionResult } from '../IncrementalPublisher.js';
1111

1212
const schema = buildSchema(`
1313
type Query {

src/execution/__tests__/stream-test.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@ import {
1717
import { GraphQLID, GraphQLString } from '../../type/scalars.js';
1818
import { GraphQLSchema } from '../../type/schema.js';
1919

20-
import type { InitialIncrementalExecutionResult } from '../execute.js';
2120
import { experimentalExecuteIncrementally } from '../execute.js';
22-
import type { SubsequentIncrementalExecutionResult } from '../IncrementalPublisher.js';
21+
import type {
22+
InitialIncrementalExecutionResult,
23+
SubsequentIncrementalExecutionResult,
24+
} from '../IncrementalPublisher.js';
2325

2426
const friendType = new GraphQLObjectType({
2527
fields: {

src/execution/__tests__/subscribe-test.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@ import {
2020
} from '../../type/scalars.js';
2121
import { GraphQLSchema } from '../../type/schema.js';
2222

23-
import type { ExecutionArgs, ExecutionResult } from '../execute.js';
23+
import type { ExecutionArgs } from '../execute.js';
2424
import { createSourceEventStream, subscribe } from '../execute.js';
25+
import type { ExecutionResult } from '../IncrementalPublisher.js';
2526

2627
import { SimplePubSub } from './simplePubSub.js';
2728

0 commit comments

Comments
 (0)