Skip to content

feat: execute/subscribe AsyncIterable API #211

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 75 additions & 12 deletions packages/core/src/create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@ import {
AfterCallback,
AfterResolverPayload,
Envelop,
ExecuteDoneOptions,
ExecuteFunction,
OnExecuteHookResult,
OnExecutionDoneHookResult,
OnResolverCalledHooks,
OnSubscribeHookResult,
Plugin,
SubscribeFunction,
} from '@envelop/types';
import { makeSubscribe, makeExecute } from './util';
import { makeSubscribe, makeExecute, mapAsyncIterator, finalAsyncIterator } from './util';
import isAsyncIterable from 'graphql/jsutils/isAsyncIterable';

const trackedSchemaSymbol = Symbol('TRACKED_SCHEMA');
export const resolversHooksSymbol = Symbol('RESOLVERS_HOOKS');
Expand Down Expand Up @@ -215,10 +220,7 @@ export function envelop({ plugins }: { plugins: Plugin[] }): Envelop {
const onResolversHandlers: OnResolverCalledHooks[] = [];
let subscribeFn = subscribe as SubscribeFunction;

const afterCalls: ((options: {
result: AsyncIterableIterator<ExecutionResult> | ExecutionResult;
setResult: (newResult: AsyncIterableIterator<ExecutionResult> | ExecutionResult) => void;
}) => void)[] = [];
const afterCalls: Exclude<OnSubscribeHookResult['onSubscribeResult'], void>[] = [];
let context = args.contextValue;

for (const onSubscribe of onSubscribeCbs) {
Expand Down Expand Up @@ -261,13 +263,44 @@ export function envelop({ plugins }: { plugins: Plugin[] }): Envelop {
contextValue: context,
});

const onNextHandler: Exclude<OnExecutionDoneHookResult['onNext'], void>[] = [];
const onEndHandler: Exclude<OnExecutionDoneHookResult['onEnd'], void>[] = [];

for (const afterCb of afterCalls) {
afterCb({
const streamHandler = afterCb({
result,
setResult: newResult => {
result = newResult;
},
});
isStream: isAsyncIterable(result),
} as ExecuteDoneOptions);

if (streamHandler) {
if (streamHandler.onNext) {
onNextHandler.push(streamHandler.onNext);
}
if (streamHandler.onEnd) {
onEndHandler.push(streamHandler.onEnd);
}
}
}

if (isAsyncIterable(result)) {
if (onNextHandler.length) {
result = mapAsyncIterator(result, result => {
for (const onNext of onNextHandler) {
onNext({ result, setResult: newResult => (result = newResult) });
}
return result;
});
}
if (onEndHandler.length) {
result = finalAsyncIterator(result, () => {
for (const onEnd of onEndHandler) {
onEnd();
}
});
}
}

return result;
Expand All @@ -278,10 +311,9 @@ export function envelop({ plugins }: { plugins: Plugin[] }): Envelop {
? makeExecute(async args => {
const onResolversHandlers: OnResolverCalledHooks[] = [];
let executeFn: ExecuteFunction = execute as ExecuteFunction;
let result: ExecutionResult;
let result: ExecutionResult | AsyncIterableIterator<ExecutionResult>;

const afterCalls: ((options: { result: ExecutionResult; setResult: (newResult: ExecutionResult) => void }) => void)[] =
[];
const afterCalls: Exclude<OnExecuteHookResult['onExecuteDone'], void>[] = [];
let context = args.contextValue;

for (const onExecute of onExecuteCbs) {
Expand Down Expand Up @@ -336,13 +368,44 @@ export function envelop({ plugins }: { plugins: Plugin[] }): Envelop {
contextValue: context,
});

const onNextHandler: Exclude<OnExecutionDoneHookResult['onNext'], void>[] = [];
const onEndHandler: Exclude<OnExecutionDoneHookResult['onEnd'], void>[] = [];

for (const afterCb of afterCalls) {
afterCb({
const streamHandler = afterCb({
result,
setResult: newResult => {
result = newResult;
},
});
isStream: isAsyncIterable(result),
} as ExecuteDoneOptions);

if (streamHandler) {
if (streamHandler.onNext) {
onNextHandler.push(streamHandler.onNext);
}
if (streamHandler.onEnd) {
onEndHandler.push(streamHandler.onEnd);
}
}
}

if (isAsyncIterable(result)) {
if (onNextHandler.length) {
result = mapAsyncIterator(result, result => {
for (const onNext of onNextHandler) {
onNext({ result, setResult: newResult => (result = newResult) });
}
return result;
});
}
if (onEndHandler.length) {
result = finalAsyncIterator(result, () => {
for (const onEnd of onEndHandler) {
onEnd();
}
});
}
}

return result;
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/graphql-typings.d.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
declare module 'graphql/jsutils/isAsyncIterable' {
function isAsyncIterable(input: unknown): input is AsyncIterable<any>;
function isAsyncIterable(input: unknown): input is AsyncIterableIterator<unknown>;
export default isAsyncIterable;
}
24 changes: 22 additions & 2 deletions packages/core/src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ export function getExecuteArgs(args: PolymorphicExecuteArguments): ExecutionArgs
* Utility function for making a execute function that handles polymorphic arguments.
*/
export const makeExecute =
(executeFn: (args: ExecutionArgs) => PromiseOrValue<ExecutionResult>) =>
(...polyArgs: PolymorphicExecuteArguments): PromiseOrValue<ExecutionResult> =>
(executeFn: (args: ExecutionArgs) => PromiseOrValue<ExecutionResult | AsyncIterableIterator<ExecutionResult>>) =>
(...polyArgs: PolymorphicExecuteArguments): PromiseOrValue<ExecutionResult | AsyncIterableIterator<ExecutionResult>> =>
executeFn(getExecuteArgs(polyArgs));

export function getSubscribeArgs(args: PolymorphicSubscribeArguments): SubscriptionArgs {
Expand All @@ -47,3 +47,23 @@ export const makeSubscribe =
(subscribeFn: (args: SubscriptionArgs) => PromiseOrValue<AsyncIterableIterator<ExecutionResult> | ExecutionResult>) =>
(...polyArgs: PolymorphicSubscribeArguments): PromiseOrValue<AsyncIterableIterator<ExecutionResult> | ExecutionResult> =>
subscribeFn(getSubscribeArgs(polyArgs));

export async function* mapAsyncIterator<TInput, TOutput = TInput>(
asyncIterable: AsyncIterableIterator<TInput>,
map: (input: TInput) => Promise<TOutput> | TOutput
): AsyncIterableIterator<TOutput> {
for await (const value of asyncIterable) {
yield map(value);
}
}

export async function* finalAsyncIterator<TInput>(
asyncIterable: AsyncIterableIterator<TInput>,
onFinal: () => void
): AsyncIterableIterator<TInput> {
try {
yield* asyncIterable;
} finally {
onFinal();
}
}
19 changes: 17 additions & 2 deletions packages/core/test/common.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import { EventEmitter, on } from 'events';
import { GraphQLID, GraphQLNonNull, GraphQLObjectType, GraphQLSchema, GraphQLString } from 'graphql';
import isAsyncIterable from 'graphql/jsutils/isAsyncIterable';

const createPubSub = <TTopicPayload extends { [key: string]: unknown }>(emitter: EventEmitter) => {
return {
publish: <TTopic extends Extract<keyof TTopicPayload, string>>(topic: TTopic, payload: TTopicPayload[TTopic]) => {
emitter.emit(topic as string, payload);
},
subscribe: async function*<TTopic extends Extract<keyof TTopicPayload, string>>(
subscribe: async function* <TTopic extends Extract<keyof TTopicPayload, string>>(
topic: TTopic
): AsyncIterableIterator<TTopicPayload[TTopic]> {
const asyncIterator = on(emitter, topic);
Expand Down Expand Up @@ -52,7 +53,7 @@ const GraphQLSubscription = new GraphQLObjectType({
fields: {
ping: {
type: GraphQLString,
subscribe: async function*() {
subscribe: async function* () {
const stream = pubSub.subscribe('ping');
return yield* stream;
},
Expand All @@ -79,3 +80,17 @@ export const subscription = /* GraphQL */ `
ping
}
`;

export const collectAsyncIteratorValues = async <TType>(asyncIterable: AsyncIterableIterator<TType>): Promise<Array<TType>> => {
const values: Array<TType> = [];
for await (const value of asyncIterable) {
values.push(value);
}
return values;
};

export function assertAsyncIterator(input: unknown): asserts input is AsyncIterableIterator<unknown> {
if (!isAsyncIterable(input)) {
throw new Error('Expected AsyncIterable iterator.');
}
}
124 changes: 122 additions & 2 deletions packages/core/test/execute.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { createSpiedPlugin, createTestkit } from '@envelop/testing';
import { execute, GraphQLSchema } from 'graphql';
import { schema, query } from './common';
import { execute, ExecutionResult } from 'graphql';
import { ExecuteFunction } from 'packages/types/src';
import { schema, query, assertAsyncIterator, collectAsyncIteratorValues } from './common';

describe('execute', () => {
it('Should wrap and trigger events correctly', async () => {
Expand Down Expand Up @@ -31,6 +32,7 @@ describe('execute', () => {
expect(spiedPlugin.spies.afterResolver).toHaveBeenCalledTimes(3);
expect(spiedPlugin.spies.afterExecute).toHaveBeenCalledTimes(1);
expect(spiedPlugin.spies.afterExecute).toHaveBeenCalledWith({
isStream: false,
setResult: expect.any(Function),
result: {
data: {
Expand Down Expand Up @@ -132,4 +134,122 @@ describe('execute', () => {
setResult: expect.any(Function),
});
});

it('Should be able to manipulate streams', async () => {
const streamExecuteFn = async function* () {
for (const value of ['a', 'b', 'c', 'd']) {
yield { data: { alphabet: value } };
}
};

const teskit = createTestkit(
[
{
onExecute({ setExecuteFn }) {
setExecuteFn(streamExecuteFn);

return {
onExecuteDone: () => {
return {
onNext: ({ setResult }) => {
setResult({ data: { alphabet: 'x' } });
},
};
},
};
},
},
],
schema
);

const result: ReturnType<ExecuteFunction> = await teskit.executeRaw({} as any);
assertAsyncIterator(result);
const values = await collectAsyncIteratorValues(result);
expect(values).toEqual([
{ data: { alphabet: 'x' } },
{ data: { alphabet: 'x' } },
{ data: { alphabet: 'x' } },
{ data: { alphabet: 'x' } },
]);
});

it('Should be able to invoke something after the stream has ended.', async () => {
expect.assertions(1);
const streamExecuteFn = async function* () {
for (const value of ['a', 'b', 'c', 'd']) {
yield { data: { alphabet: value } };
}
};

const teskit = createTestkit(
[
{
onExecute({ setExecuteFn }) {
setExecuteFn(streamExecuteFn);

return {
onExecuteDone: () => {
let latestResult: ExecutionResult;
return {
onNext: ({ result }) => {
latestResult = result;
},
onEnd: () => {
expect(latestResult).toEqual({ data: { alphabet: 'd' } });
},
};
},
};
},
},
],
schema
);

const result: ReturnType<ExecuteFunction> = await teskit.executeRaw({} as any);
assertAsyncIterator(result);
// run AsyncGenerator
await collectAsyncIteratorValues(result);
});

it('Should be able to invoke something after the stream has ended (manual return).', async () => {
expect.assertions(1);
const streamExecuteFn = async function* () {
for (const value of ['a', 'b', 'c', 'd']) {
yield { data: { alphabet: value } };
}
};

const teskit = createTestkit(
[
{
onExecute({ setExecuteFn }) {
setExecuteFn(streamExecuteFn);

return {
onExecuteDone: () => {
let latestResult: ExecutionResult;
return {
onNext: ({ result }) => {
latestResult = result;
},
onEnd: () => {
expect(latestResult).toEqual({ data: { alphabet: 'a' } });
},
};
},
};
},
},
],
schema
);

const result: ReturnType<ExecuteFunction> = await teskit.executeRaw({} as any);
assertAsyncIterator(result);
const instance = result[Symbol.asyncIterator]();
await instance.next();
await instance.return!();
});
});
Loading