Skip to content

feat: support hooking into execute stream (defer/stream) #429

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/six-crabs-flash.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@envelop/execute-subscription-event': patch
---

initial release
71 changes: 38 additions & 33 deletions packages/core/src/orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,18 @@ import {
SubscribeFunction,
OnSubscribeResultResultOnNextHook,
OnSubscribeResultResultOnEndHook,
OnExecuteDoneHookResultOnNextHook,
OnExecuteDoneHookResultOnEndHook,
ExecuteFunction,
AsyncIterableIteratorOrValue,
} from '@envelop/types';
import isAsyncIterable from 'graphql/jsutils/isAsyncIterable';
import {
DocumentNode,
execute,
ExecutionArgs,
ExecutionResult,
GraphQLError,
GraphQLFieldResolver,
GraphQLSchema,
GraphQLTypeResolver,
parse,
specifiedRules,
subscribe,
Expand All @@ -38,7 +39,7 @@ import {
} from 'graphql';
import { Maybe } from 'graphql/jsutils/Maybe';
import { prepareTracedSchema, resolversHooksSymbol } from './traced-schema';
import { finalAsyncIterator, makeSubscribe, mapAsyncIterator } from './utils';
import { finalAsyncIterator, makeExecute, makeSubscribe, mapAsyncIterator } from './utils';

export type EnvelopOrchestrator<
InitialContext extends ArbitraryObject = ArbitraryObject,
Expand Down Expand Up @@ -353,33 +354,10 @@ export function createEnvelopOrchestrator<PluginsContext = any>(plugins: Plugin[
});

const customExecute = beforeCallbacks.execute.length
? async (
argsOrSchema: ExecutionArgs | GraphQLSchema,
document?: DocumentNode,
rootValue?: any,
contextValue?: any,
variableValues?: Maybe<{ [key: string]: any }>,
operationName?: Maybe<string>,
fieldResolver?: Maybe<GraphQLFieldResolver<any, any>>,
typeResolver?: Maybe<GraphQLTypeResolver<any, any>>
) => {
const args: ExecutionArgs =
argsOrSchema instanceof GraphQLSchema
? {
schema: argsOrSchema,
document: document!,
rootValue,
contextValue,
variableValues,
operationName,
fieldResolver,
typeResolver,
}
: argsOrSchema;

? makeExecute(async args => {
const onResolversHandlers: OnResolverCalledHook[] = [];
let executeFn: typeof execute = execute;
let result: ExecutionResult;
let executeFn = execute as ExecuteFunction;
let result: AsyncIterableIteratorOrValue<ExecutionResult>;

const afterCalls: OnExecuteDoneHook[] = [];
let context = args.contextValue || {};
Expand Down Expand Up @@ -436,17 +414,44 @@ export function createEnvelopOrchestrator<PluginsContext = any>(plugins: Plugin[
contextValue: context,
});

const onNextHandler: OnExecuteDoneHookResultOnNextHook[] = [];
const onEndHandler: OnExecuteDoneHookResultOnEndHook[] = [];

for (const afterCb of afterCalls) {
afterCb({
const hookResult = afterCb({
result,
setResult: newResult => {
result = newResult;
},
});
if (hookResult) {
if (hookResult.onNext) {
onNextHandler.push(hookResult.onNext);
}
if (hookResult.onEnd) {
onEndHandler.push(hookResult.onEnd);
}
}
}

if (onNextHandler.length && isAsyncIterable(result)) {
result = mapAsyncIterator(result, async result => {
for (const onNext of onNextHandler) {
await onNext({ result, setResult: newResult => (result = newResult) });
}
return result;
});
}
if (onEndHandler.length && isAsyncIterable(result)) {
result = finalAsyncIterator(result, () => {
for (const onEnd of onEndHandler) {
onEnd();
}
});
}

return result;
}
})
: execute;

initDone = true;
Expand All @@ -470,7 +475,7 @@ export function createEnvelopOrchestrator<PluginsContext = any>(plugins: Plugin[
init,
parse: customParse,
validate: customValidate,
execute: customExecute,
execute: customExecute as ExecuteFunction,
subscribe: customSubscribe,
contextFactory: customContextFactory,
};
Expand Down
26 changes: 25 additions & 1 deletion packages/core/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ import {
Source,
ExecutionResult,
SubscriptionArgs,
ExecutionArgs,
} from 'graphql';
import { PolymorphicSubscribeArguments } from '@envelop/types';
import { AsyncIterableIteratorOrValue, PolymorphicExecuteArguments, PolymorphicSubscribeArguments } from '@envelop/types';
import { PromiseOrValue } from 'graphql/jsutils/PromiseOrValue';

export const envelopIsIntrospectionSymbol = Symbol('ENVELOP_IS_INTROSPECTION');
Expand Down Expand Up @@ -79,6 +80,29 @@ export async function* mapAsyncIterator<TInput, TOutput = TInput>(
}
}

export function getExecuteArgs(args: PolymorphicExecuteArguments): ExecutionArgs {
return args.length === 1
? args[0]
: {
schema: args[0],
document: args[1],
rootValue: args[2],
contextValue: args[3],
variableValues: args[4],
operationName: args[5],
fieldResolver: args[6],
typeResolver: args[7],
};
}

/**
* Utility function for making a execute function that handles polymorphic arguments.
*/
export const makeExecute =
(executeFn: (args: ExecutionArgs) => PromiseOrValue<AsyncIterableIteratorOrValue<ExecutionResult>>) =>
(...polyArgs: PolymorphicExecuteArguments): PromiseOrValue<AsyncIterableIteratorOrValue<ExecutionResult>> =>
executeFn(getExecuteArgs(polyArgs));

export async function* finalAsyncIterator<TInput>(
asyncIterable: AsyncIterableIterator<TInput>,
onFinal: () => void
Expand Down
20 changes: 20 additions & 0 deletions packages/core/test/common.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { makeExecutableSchema } from '@graphql-tools/schema';
import { mapAsyncIterator } from '../src';

export const schema = makeExecutableSchema({
typeDefs: /* GraphQL */ `
type Query {
me: User!
alphabet: [String]!
}
type User {
id: ID!
Expand All @@ -12,6 +14,7 @@ export const schema = makeExecutableSchema({

type Subscription {
alphabet: String!
message: String!
}
`,
resolvers: {
Expand All @@ -20,6 +23,17 @@ export const schema = makeExecutableSchema({
return { _id: 1, firstName: 'Dotan', lastName: 'Simha' };
},
},
Subscription: {
message: {
subscribe: (_, __, context) => {
if (!context || 'subscribeSource' in context === false) {
throw new Error('No subscribeSource provided for context :(');
}
return context.subscribeSource;
},
resolve: (_, __, context) => context.message,
},
},
User: {
id: u => u._id,
name: u => `${u.firstName} ${u.lastName}`,
Expand All @@ -35,3 +49,9 @@ export const query = /* GraphQL */ `
}
}
`;

export const subscriptionOperationString = /* GraphQL */ `
subscription {
message
}
`;
134 changes: 132 additions & 2 deletions packages/core/test/execute.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { createSpiedPlugin, createTestkit } from '@envelop/testing';
import { execute, GraphQLSchema } from 'graphql';
import { assertStreamExecutionValue, collectAsyncIteratorValues, createSpiedPlugin, createTestkit } from '@envelop/testing';
import { execute, ExecutionResult, GraphQLSchema } from 'graphql';
import { schema, query } from './common';

describe('execute', () => {
Expand Down Expand Up @@ -138,4 +138,134 @@ describe('execute', () => {
setResult: expect.any(Function),
});
});

it('Should be able to manipulate streams', async () => {
const streamExecuteFn = async function* () {
for (const value of ['a', 'b', 'c', 'd']) {
yield { data: { alphabet: value } };
}
};

const teskit = createTestkit(
[
{
onExecute({ setExecuteFn }) {
setExecuteFn(streamExecuteFn);

return {
onExecuteDone: () => {
return {
onNext: ({ setResult }) => {
setResult({ data: { alphabet: 'x' } });
},
};
},
};
},
},
],
schema
);

const result = await teskit.execute(/* GraphQL */ `
query {
alphabet
}
`);
assertStreamExecutionValue(result);
const values = await collectAsyncIteratorValues(result);
expect(values).toEqual([
{ data: { alphabet: 'x' } },
{ data: { alphabet: 'x' } },
{ data: { alphabet: 'x' } },
{ data: { alphabet: 'x' } },
]);
});

it('Should be able to invoke something after the stream has ended.', async () => {
expect.assertions(1);
const streamExecuteFn = async function* () {
for (const value of ['a', 'b', 'c', 'd']) {
yield { data: { alphabet: value } };
}
};

const teskit = createTestkit(
[
{
onExecute({ setExecuteFn }) {
setExecuteFn(streamExecuteFn);

return {
onExecuteDone: () => {
let latestResult: ExecutionResult;
return {
onNext: ({ result }) => {
latestResult = result;
},
onEnd: () => {
expect(latestResult).toEqual({ data: { alphabet: 'd' } });
},
};
},
};
},
},
],
schema
);

const result = await teskit.execute(/* GraphQL */ `
query {
alphabet
}
`);
assertStreamExecutionValue(result);
// run AsyncGenerator
await collectAsyncIteratorValues(result);
});

it('Should be able to invoke something after the stream has ended (manual return).', async () => {
expect.assertions(1);
const streamExecuteFn = async function* () {
for (const value of ['a', 'b', 'c', 'd']) {
yield { data: { alphabet: value } };
}
};

const teskit = createTestkit(
[
{
onExecute({ setExecuteFn }) {
setExecuteFn(streamExecuteFn);

return {
onExecuteDone: () => {
let latestResult: ExecutionResult;
return {
onNext: ({ result }) => {
latestResult = result;
},
onEnd: () => {
expect(latestResult).toEqual({ data: { alphabet: 'a' } });
},
};
},
};
},
},
],
schema
);

const result = await teskit.execute(/* GraphQL */ `
query {
alphabet
}
`);
assertStreamExecutionValue(result);
const instance = result[Symbol.asyncIterator]();
await instance.next();
await instance.return!();
});
});
Loading