Skip to content

Commit 110ea79

Browse files
n1ru4ldotansimha
authored andcommitted
feat: support hooking into execute stream (defer/stream) (#429)
1 parent b27604e commit 110ea79

File tree

13 files changed

+537
-41
lines changed

13 files changed

+537
-41
lines changed

.changeset/six-crabs-flash.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@envelop/execute-subscription-event': patch
3+
---
4+
5+
initial release

packages/core/src/orchestrator.ts

Lines changed: 38 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,18 @@ import {
1919
SubscribeFunction,
2020
OnSubscribeResultResultOnNextHook,
2121
OnSubscribeResultResultOnEndHook,
22+
OnExecuteDoneHookResultOnNextHook,
23+
OnExecuteDoneHookResultOnEndHook,
24+
ExecuteFunction,
25+
AsyncIterableIteratorOrValue,
2226
} from '@envelop/types';
2327
import isAsyncIterable from 'graphql/jsutils/isAsyncIterable';
2428
import {
2529
DocumentNode,
2630
execute,
27-
ExecutionArgs,
2831
ExecutionResult,
2932
GraphQLError,
30-
GraphQLFieldResolver,
3133
GraphQLSchema,
32-
GraphQLTypeResolver,
3334
parse,
3435
specifiedRules,
3536
subscribe,
@@ -38,7 +39,7 @@ import {
3839
} from 'graphql';
3940
import { Maybe } from 'graphql/jsutils/Maybe';
4041
import { prepareTracedSchema, resolversHooksSymbol } from './traced-schema';
41-
import { finalAsyncIterator, makeSubscribe, mapAsyncIterator } from './utils';
42+
import { finalAsyncIterator, makeExecute, makeSubscribe, mapAsyncIterator } from './utils';
4243

4344
export type EnvelopOrchestrator<
4445
InitialContext extends ArbitraryObject = ArbitraryObject,
@@ -353,33 +354,10 @@ export function createEnvelopOrchestrator<PluginsContext = any>(plugins: Plugin[
353354
});
354355

355356
const customExecute = beforeCallbacks.execute.length
356-
? async (
357-
argsOrSchema: ExecutionArgs | GraphQLSchema,
358-
document?: DocumentNode,
359-
rootValue?: any,
360-
contextValue?: any,
361-
variableValues?: Maybe<{ [key: string]: any }>,
362-
operationName?: Maybe<string>,
363-
fieldResolver?: Maybe<GraphQLFieldResolver<any, any>>,
364-
typeResolver?: Maybe<GraphQLTypeResolver<any, any>>
365-
) => {
366-
const args: ExecutionArgs =
367-
argsOrSchema instanceof GraphQLSchema
368-
? {
369-
schema: argsOrSchema,
370-
document: document!,
371-
rootValue,
372-
contextValue,
373-
variableValues,
374-
operationName,
375-
fieldResolver,
376-
typeResolver,
377-
}
378-
: argsOrSchema;
379-
357+
? makeExecute(async args => {
380358
const onResolversHandlers: OnResolverCalledHook[] = [];
381-
let executeFn: typeof execute = execute;
382-
let result: ExecutionResult;
359+
let executeFn = execute as ExecuteFunction;
360+
let result: AsyncIterableIteratorOrValue<ExecutionResult>;
383361

384362
const afterCalls: OnExecuteDoneHook[] = [];
385363
let context = args.contextValue || {};
@@ -436,17 +414,44 @@ export function createEnvelopOrchestrator<PluginsContext = any>(plugins: Plugin[
436414
contextValue: context,
437415
});
438416

417+
const onNextHandler: OnExecuteDoneHookResultOnNextHook[] = [];
418+
const onEndHandler: OnExecuteDoneHookResultOnEndHook[] = [];
419+
439420
for (const afterCb of afterCalls) {
440-
afterCb({
421+
const hookResult = afterCb({
441422
result,
442423
setResult: newResult => {
443424
result = newResult;
444425
},
445426
});
427+
if (hookResult) {
428+
if (hookResult.onNext) {
429+
onNextHandler.push(hookResult.onNext);
430+
}
431+
if (hookResult.onEnd) {
432+
onEndHandler.push(hookResult.onEnd);
433+
}
434+
}
435+
}
436+
437+
if (onNextHandler.length && isAsyncIterable(result)) {
438+
result = mapAsyncIterator(result, async result => {
439+
for (const onNext of onNextHandler) {
440+
await onNext({ result, setResult: newResult => (result = newResult) });
441+
}
442+
return result;
443+
});
444+
}
445+
if (onEndHandler.length && isAsyncIterable(result)) {
446+
result = finalAsyncIterator(result, () => {
447+
for (const onEnd of onEndHandler) {
448+
onEnd();
449+
}
450+
});
446451
}
447452

448453
return result;
449-
}
454+
})
450455
: execute;
451456

452457
initDone = true;
@@ -470,7 +475,7 @@ export function createEnvelopOrchestrator<PluginsContext = any>(plugins: Plugin[
470475
init,
471476
parse: customParse,
472477
validate: customValidate,
473-
execute: customExecute,
478+
execute: customExecute as ExecuteFunction,
474479
subscribe: customSubscribe,
475480
contextFactory: customContextFactory,
476481
};

packages/core/src/utils.ts

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@ import {
88
Source,
99
ExecutionResult,
1010
SubscriptionArgs,
11+
ExecutionArgs,
1112
} from 'graphql';
12-
import { PolymorphicSubscribeArguments } from '@envelop/types';
13+
import { AsyncIterableIteratorOrValue, PolymorphicExecuteArguments, PolymorphicSubscribeArguments } from '@envelop/types';
1314
import { PromiseOrValue } from 'graphql/jsutils/PromiseOrValue';
1415

1516
export const envelopIsIntrospectionSymbol = Symbol('ENVELOP_IS_INTROSPECTION');
@@ -79,6 +80,29 @@ export async function* mapAsyncIterator<TInput, TOutput = TInput>(
7980
}
8081
}
8182

83+
export function getExecuteArgs(args: PolymorphicExecuteArguments): ExecutionArgs {
84+
return args.length === 1
85+
? args[0]
86+
: {
87+
schema: args[0],
88+
document: args[1],
89+
rootValue: args[2],
90+
contextValue: args[3],
91+
variableValues: args[4],
92+
operationName: args[5],
93+
fieldResolver: args[6],
94+
typeResolver: args[7],
95+
};
96+
}
97+
98+
/**
99+
* Utility function for making a execute function that handles polymorphic arguments.
100+
*/
101+
export const makeExecute =
102+
(executeFn: (args: ExecutionArgs) => PromiseOrValue<AsyncIterableIteratorOrValue<ExecutionResult>>) =>
103+
(...polyArgs: PolymorphicExecuteArguments): PromiseOrValue<AsyncIterableIteratorOrValue<ExecutionResult>> =>
104+
executeFn(getExecuteArgs(polyArgs));
105+
82106
export async function* finalAsyncIterator<TInput>(
83107
asyncIterable: AsyncIterableIterator<TInput>,
84108
onFinal: () => void

packages/core/test/common.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import { makeExecutableSchema } from '@graphql-tools/schema';
2+
import { mapAsyncIterator } from '../src';
23

34
export const schema = makeExecutableSchema({
45
typeDefs: /* GraphQL */ `
56
type Query {
67
me: User!
8+
alphabet: [String]!
79
}
810
type User {
911
id: ID!
@@ -12,6 +14,7 @@ export const schema = makeExecutableSchema({
1214
1315
type Subscription {
1416
alphabet: String!
17+
message: String!
1518
}
1619
`,
1720
resolvers: {
@@ -20,6 +23,17 @@ export const schema = makeExecutableSchema({
2023
return { _id: 1, firstName: 'Dotan', lastName: 'Simha' };
2124
},
2225
},
26+
Subscription: {
27+
message: {
28+
subscribe: (_, __, context) => {
29+
if (!context || 'subscribeSource' in context === false) {
30+
throw new Error('No subscribeSource provided for context :(');
31+
}
32+
return context.subscribeSource;
33+
},
34+
resolve: (_, __, context) => context.message,
35+
},
36+
},
2337
User: {
2438
id: u => u._id,
2539
name: u => `${u.firstName} ${u.lastName}`,
@@ -35,3 +49,9 @@ export const query = /* GraphQL */ `
3549
}
3650
}
3751
`;
52+
53+
export const subscriptionOperationString = /* GraphQL */ `
54+
subscription {
55+
message
56+
}
57+
`;

packages/core/test/execute.spec.ts

Lines changed: 132 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
import { createSpiedPlugin, createTestkit } from '@envelop/testing';
2-
import { execute, GraphQLSchema } from 'graphql';
1+
import { assertStreamExecutionValue, collectAsyncIteratorValues, createSpiedPlugin, createTestkit } from '@envelop/testing';
2+
import { execute, ExecutionResult, GraphQLSchema } from 'graphql';
33
import { schema, query } from './common';
44

55
describe('execute', () => {
@@ -138,4 +138,134 @@ describe('execute', () => {
138138
setResult: expect.any(Function),
139139
});
140140
});
141+
142+
it('Should be able to manipulate streams', async () => {
143+
const streamExecuteFn = async function* () {
144+
for (const value of ['a', 'b', 'c', 'd']) {
145+
yield { data: { alphabet: value } };
146+
}
147+
};
148+
149+
const teskit = createTestkit(
150+
[
151+
{
152+
onExecute({ setExecuteFn }) {
153+
setExecuteFn(streamExecuteFn);
154+
155+
return {
156+
onExecuteDone: () => {
157+
return {
158+
onNext: ({ setResult }) => {
159+
setResult({ data: { alphabet: 'x' } });
160+
},
161+
};
162+
},
163+
};
164+
},
165+
},
166+
],
167+
schema
168+
);
169+
170+
const result = await teskit.execute(/* GraphQL */ `
171+
query {
172+
alphabet
173+
}
174+
`);
175+
assertStreamExecutionValue(result);
176+
const values = await collectAsyncIteratorValues(result);
177+
expect(values).toEqual([
178+
{ data: { alphabet: 'x' } },
179+
{ data: { alphabet: 'x' } },
180+
{ data: { alphabet: 'x' } },
181+
{ data: { alphabet: 'x' } },
182+
]);
183+
});
184+
185+
it('Should be able to invoke something after the stream has ended.', async () => {
186+
expect.assertions(1);
187+
const streamExecuteFn = async function* () {
188+
for (const value of ['a', 'b', 'c', 'd']) {
189+
yield { data: { alphabet: value } };
190+
}
191+
};
192+
193+
const teskit = createTestkit(
194+
[
195+
{
196+
onExecute({ setExecuteFn }) {
197+
setExecuteFn(streamExecuteFn);
198+
199+
return {
200+
onExecuteDone: () => {
201+
let latestResult: ExecutionResult;
202+
return {
203+
onNext: ({ result }) => {
204+
latestResult = result;
205+
},
206+
onEnd: () => {
207+
expect(latestResult).toEqual({ data: { alphabet: 'd' } });
208+
},
209+
};
210+
},
211+
};
212+
},
213+
},
214+
],
215+
schema
216+
);
217+
218+
const result = await teskit.execute(/* GraphQL */ `
219+
query {
220+
alphabet
221+
}
222+
`);
223+
assertStreamExecutionValue(result);
224+
// run AsyncGenerator
225+
await collectAsyncIteratorValues(result);
226+
});
227+
228+
it('Should be able to invoke something after the stream has ended (manual return).', async () => {
229+
expect.assertions(1);
230+
const streamExecuteFn = async function* () {
231+
for (const value of ['a', 'b', 'c', 'd']) {
232+
yield { data: { alphabet: value } };
233+
}
234+
};
235+
236+
const teskit = createTestkit(
237+
[
238+
{
239+
onExecute({ setExecuteFn }) {
240+
setExecuteFn(streamExecuteFn);
241+
242+
return {
243+
onExecuteDone: () => {
244+
let latestResult: ExecutionResult;
245+
return {
246+
onNext: ({ result }) => {
247+
latestResult = result;
248+
},
249+
onEnd: () => {
250+
expect(latestResult).toEqual({ data: { alphabet: 'a' } });
251+
},
252+
};
253+
},
254+
};
255+
},
256+
},
257+
],
258+
schema
259+
);
260+
261+
const result = await teskit.execute(/* GraphQL */ `
262+
query {
263+
alphabet
264+
}
265+
`);
266+
assertStreamExecutionValue(result);
267+
const instance = result[Symbol.asyncIterator]();
268+
await instance.next();
269+
await instance.return!();
270+
});
141271
});

0 commit comments

Comments
 (0)