Skip to content

Commit

Permalink
align return types from execution and subscription
Browse files Browse the repository at this point in the history
with respect to possible promises
  • Loading branch information
yaacovCR committed Jun 7, 2022
1 parent 75d3061 commit 7f214ea
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 45 deletions.
58 changes: 39 additions & 19 deletions src/execution/__tests__/subscribe-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import { expectJSON } from '../../__testUtils__/expectJSON';
import { resolveOnNextTick } from '../../__testUtils__/resolveOnNextTick';

import { isAsyncIterable } from '../../jsutils/isAsyncIterable';
import { isPromise } from '../../jsutils/isPromise';
import type { PromiseOrValue } from '../../jsutils/PromiseOrValue';

import { parse } from '../../language/parser';

Expand Down Expand Up @@ -135,9 +137,6 @@ async function expectPromise(promise: Promise<unknown>) {
}

return {
toReject() {
expect(caughtError).to.be.an.instanceOf(Error);
},
toRejectWith(message: string) {
expect(caughtError).to.be.an.instanceOf(Error);
expect(caughtError).to.have.property('message', message);
Expand All @@ -152,9 +151,9 @@ const DummyQueryType = new GraphQLObjectType({
},
});

async function subscribeWithBadFn(
function subscribeWithBadFn(
subscribeFn: () => unknown,
): Promise<ExecutionResult> {
): PromiseOrValue<ExecutionResult> {
const schema = new GraphQLSchema({
query: DummyQueryType,
subscription: new GraphQLObjectType({
Expand All @@ -165,12 +164,29 @@ async function subscribeWithBadFn(
}),
});
const document = parse('subscription { foo }');
const result = await subscribe({ schema, document });

assert(!isAsyncIterable(result));
expectJSON(await createSourceEventStream(schema, document)).toDeepEqual(
result,
const result = subscribe({ schema, document });

if (isPromise(result)) {
return result.then(async (resolved) => {
const createSourceEventStreamResult = createSourceEventStream(
schema,
document,
);
assert(isPromise(createSourceEventStreamResult));
assert(!isAsyncIterable(resolved));
expectJSON(await createSourceEventStreamResult).toDeepEqual(resolved);
return resolved;
});
}

const createSourceEventStreamResult = createSourceEventStream(
schema,
document,
);
assert(!isPromise(createSourceEventStreamResult));
assert(!isAsyncIterable(result));
expectJSON(createSourceEventStreamResult).toDeepEqual(result);
return result;
}

Expand Down Expand Up @@ -379,24 +395,22 @@ describe('Subscription Initialization Phase', () => {
});

// @ts-expect-error (schema must not be null)
(await expectPromise(subscribe({ schema: null, document }))).toRejectWith(
expect(() => subscribe({ schema: null, document })).to.throw(
'Expected null to be a GraphQL schema.',
);

// @ts-expect-error
(await expectPromise(subscribe({ document }))).toRejectWith(
expect(() => subscribe({ document })).to.throw(
'Expected undefined to be a GraphQL schema.',
);

// @ts-expect-error (document must not be null)
(await expectPromise(subscribe({ schema, document: null }))).toRejectWith(
expect(() => subscribe({ schema, document: null })).to.throw(
'Must provide document.',
);

// @ts-expect-error
(await expectPromise(subscribe({ schema }))).toRejectWith(
'Must provide document.',
);
expect(() => subscribe({ schema })).to.throw('Must provide document.');
});

it('resolves to an error if schema does not support subscriptions', async () => {
Expand Down Expand Up @@ -450,11 +464,17 @@ describe('Subscription Initialization Phase', () => {
});

// @ts-expect-error
(await expectPromise(subscribe({ schema, document: {} }))).toReject();
expect(() => subscribe({ schema, document: {} })).to.throw();
});

it('throws an error if subscribe does not return an iterator', async () => {
(await expectPromise(subscribeWithBadFn(() => 'test'))).toRejectWith(
expect(() => subscribeWithBadFn(() => 'test')).to.throw(
'Subscription field must return Async Iterable. Received: "test".',
);

const result = subscribeWithBadFn(() => Promise.resolve('test'));
assert(isPromise(result));
(await expectPromise(result)).toRejectWith(
'Subscription field must return Async Iterable. Received: "test".',
);
});
Expand All @@ -472,12 +492,12 @@ describe('Subscription Initialization Phase', () => {

expectJSON(
// Returning an error
await subscribeWithBadFn(() => new Error('test error')),
subscribeWithBadFn(() => new Error('test error')),
).toDeepEqual(expectedResult);

expectJSON(
// Throwing an error
await subscribeWithBadFn(() => {
subscribeWithBadFn(() => {
throw new Error('test error');
}),
).toDeepEqual(expectedResult);
Expand Down
118 changes: 92 additions & 26 deletions src/execution/subscribe.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { inspect } from '../jsutils/inspect';
import { isAsyncIterable } from '../jsutils/isAsyncIterable';
import { isPromise } from '../jsutils/isPromise';
import type { Maybe } from '../jsutils/Maybe';
import { addPath, pathToArray } from '../jsutils/Path';
import type { PromiseOrValue } from '../jsutils/PromiseOrValue';

import { GraphQLError } from '../error/GraphQLError';
import { locatedError } from '../error/locatedError';
Expand Down Expand Up @@ -47,9 +49,11 @@ import { getArgumentValues } from './values';
*
* Accepts either an object with named arguments, or individual arguments.
*/
export async function subscribe(
export function subscribe(
args: ExecutionArgs,
): Promise<AsyncGenerator<ExecutionResult, void, void> | ExecutionResult> {
): PromiseOrValue<
AsyncGenerator<ExecutionResult, void, void> | ExecutionResult
> {
const {
schema,
document,
Expand All @@ -61,7 +65,7 @@ export async function subscribe(
subscribeFieldResolver,
} = args;

const resultOrStream = await createSourceEventStream(
const resultOrStream = createSourceEventStream(
schema,
document,
rootValue,
Expand All @@ -71,6 +75,42 @@ export async function subscribe(
subscribeFieldResolver,
);

if (isPromise(resultOrStream)) {
return resultOrStream.then((resolvedResultOrStream) =>
mapSourceToResponse(
schema,
document,
resolvedResultOrStream,
contextValue,
variableValues,
operationName,
fieldResolver,
),
);
}

return mapSourceToResponse(
schema,
document,
resultOrStream,
contextValue,
variableValues,
operationName,
fieldResolver,
);
}

function mapSourceToResponse(
schema: GraphQLSchema,
document: DocumentNode,
resultOrStream: ExecutionResult | AsyncIterable<unknown>,
contextValue?: unknown,
variableValues?: Maybe<{ readonly [variable: string]: unknown }>,
operationName?: Maybe<string>,
fieldResolver?: Maybe<GraphQLFieldResolver<any, any>>,
): PromiseOrValue<
AsyncGenerator<ExecutionResult, void, void> | ExecutionResult
> {
if (!isAsyncIterable(resultOrStream)) {
return resultOrStream;
}
Expand All @@ -81,7 +121,7 @@ export async function subscribe(
// the GraphQL specification. The `execute` function provides the
// "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the
// "ExecuteQuery" algorithm, for which `execute` is also used.
const mapSourceToResponse = (payload: unknown) =>
return mapAsyncIterator(resultOrStream, (payload: unknown) =>
execute({
schema,
document,
Expand All @@ -90,10 +130,8 @@ export async function subscribe(
variableValues,
operationName,
fieldResolver,
});

// Map every source value to a ExecutionResult value as described above.
return mapAsyncIterator(resultOrStream, mapSourceToResponse);
}),
);
}

/**
Expand Down Expand Up @@ -124,15 +162,15 @@ export async function subscribe(
* or otherwise separating these two steps. For more on this, see the
* "Supporting Subscriptions at Scale" information in the GraphQL specification.
*/
export async function createSourceEventStream(
export function createSourceEventStream(
schema: GraphQLSchema,
document: DocumentNode,
rootValue?: unknown,
contextValue?: unknown,
variableValues?: Maybe<{ readonly [variable: string]: unknown }>,
operationName?: Maybe<string>,
subscribeFieldResolver?: Maybe<GraphQLFieldResolver<any, any>>,
): Promise<AsyncIterable<unknown> | ExecutionResult> {
): PromiseOrValue<AsyncIterable<unknown> | ExecutionResult> {
// If arguments are missing or incorrectly typed, this is an internal
// developer mistake which should throw an early error.
assertValidExecutionArguments(schema, document, variableValues);
Expand All @@ -155,17 +193,22 @@ export async function createSourceEventStream(
}

try {
const eventStream = await executeSubscription(exeContext);

// Assert field returned an event stream, otherwise yield an error.
if (!isAsyncIterable(eventStream)) {
throw new Error(
'Subscription field must return Async Iterable. ' +
`Received: ${inspect(eventStream)}.`,
);
const eventStream = executeSubscription(exeContext);

if (isPromise(eventStream)) {
return eventStream
.then((resolvedEventStream) => ensureAsyncIterable(resolvedEventStream))
.then(undefined, (error) => {
// If it GraphQLError, report it as an ExecutionResult, containing only errors and no data.
// Otherwise treat the error as a system-class error and re-throw it.
if (error instanceof GraphQLError) {
return { errors: [error] };
}
throw error;
});
}

return eventStream;
return ensureAsyncIterable(eventStream);
} catch (error) {
// If it GraphQLError, report it as an ExecutionResult, containing only errors and no data.
// Otherwise treat the error as a system-class error and re-throw it.
Expand All @@ -176,9 +219,19 @@ export async function createSourceEventStream(
}
}

async function executeSubscription(
exeContext: ExecutionContext,
): Promise<unknown> {
function ensureAsyncIterable(eventStream: unknown): AsyncIterable<unknown> {
// Assert field returned an event stream, otherwise yield an error.
if (!isAsyncIterable(eventStream)) {
throw new Error(
'Subscription field must return Async Iterable. ' +
`Received: ${inspect(eventStream)}.`,
);
}

return eventStream;
}

function executeSubscription(exeContext: ExecutionContext): unknown {
const { schema, fragments, operation, variableValues, rootValue } =
exeContext;

Expand Down Expand Up @@ -233,13 +286,26 @@ async function executeSubscription(
// Call the `subscribe()` resolver or the default resolver to produce an
// AsyncIterable yielding raw payloads.
const resolveFn = fieldDef.subscribe ?? exeContext.subscribeFieldResolver;
const eventStream = await resolveFn(rootValue, args, contextValue, info);

if (eventStream instanceof Error) {
throw eventStream;
const eventStream = resolveFn(rootValue, args, contextValue, info);

if (isPromise(eventStream)) {
return eventStream
.then((resolvedEventStream) => throwReturnedError(resolvedEventStream))
.then(undefined, (error) => {
throw locatedError(error, fieldNodes, pathToArray(path));
});
}
return eventStream;

return throwReturnedError(eventStream);
} catch (error) {
throw locatedError(error, fieldNodes, pathToArray(path));
}
}

function throwReturnedError(eventStream: unknown): unknown {
if (eventStream instanceof Error) {
throw eventStream;
}
return eventStream;
}

0 comments on commit 7f214ea

Please sign in to comment.