Skip to content

Commit

Permalink
subscribe: stay synchronous when possible (#3620)
Browse files Browse the repository at this point in the history
This change aligns the return types of `execute` and `subscribe` (as well as `createSourceEventStream`) with respect to returning values or promises.

Just as GraphQL execution for queries and mutations stays synchronous when possible, creation of the source event stream and returning the mapped `AsyncGenerator` will now occur synchronously when possible, i.e. when the `subscribe` method for the given subscription root field directly returns an `AsyncIterable`, rather than a Promise<AsyncIterable>.

Specifically, the return types of `subscribe` and `createSourceEventStream` become:
`subscribe(...): PromiseOrValue<AsyncGenerator<ExecutionResult> | ExecutionResult>`
`createSourceEventStream(...): PromiseOrValue<AsyncIterable<unknown> | ExecutionResult>`.

Previously, they were:
`subscribe(...): Promise<AsyncGenerator<ExecutionResult> | ExecutionResult>`).
`createSourceEventStream(...): Promise<AsyncIterable<unknown> | ExecutionResult>`.

Co-authored-by: Ivan Goncharov <ivan.goncharov.ua@gmail.com>
  • Loading branch information
yaacovCR and IvanGoncharov authored Jun 9, 2022
1 parent ea1894a commit 6d42ced
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 76 deletions.
136 changes: 85 additions & 51 deletions src/execution/__tests__/subscribe-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import { expectJSON } from '../../__testUtils__/expectJSON';
import { resolveOnNextTick } from '../../__testUtils__/resolveOnNextTick';

import { isAsyncIterable } from '../../jsutils/isAsyncIterable';
import { isPromise } from '../../jsutils/isPromise';
import type { PromiseOrValue } from '../../jsutils/PromiseOrValue';

import { parse } from '../../language/parser';

Expand Down Expand Up @@ -123,38 +125,59 @@ function createSubscription(pubsub: SimplePubSub<Email>) {
return subscribe({ schema: emailSchema, document, rootValue: data });
}

async function expectPromise(promise: Promise<unknown>) {
let caughtError: Error;

try {
/* c8 ignore next 2 */
await promise;
expect.fail('promise should have thrown but did not');
} catch (error) {
caughtError = error;
}
// TODO: consider adding this method to testUtils (with tests)
function expectPromise(maybePromise: unknown) {
assert(isPromise(maybePromise));

return {
toReject() {
expect(caughtError).to.be.an.instanceOf(Error);
toResolve() {
return maybePromise;
},
toRejectWith(message: string) {
async toRejectWith(message: string) {
let caughtError: Error;

try {
/* c8 ignore next 2 */
await maybePromise;
expect.fail('promise should have thrown but did not');
} catch (error) {
caughtError = error;
}

expect(caughtError).to.be.an.instanceOf(Error);
expect(caughtError).to.have.property('message', message);
},
};
}

// TODO: consider adding this method to testUtils (with tests)
function expectEqualPromisesOrValues<T>(
value1: PromiseOrValue<T>,
value2: PromiseOrValue<T>,
): PromiseOrValue<T> {
if (isPromise(value1)) {
assert(isPromise(value2));
return Promise.all([value1, value2]).then((resolved) => {
expectJSON(resolved[1]).toDeepEqual(resolved[0]);
return resolved[0];
});
}

assert(!isPromise(value2));
expectJSON(value2).toDeepEqual(value1);
return value1;
}

const DummyQueryType = new GraphQLObjectType({
name: 'Query',
fields: {
dummy: { type: GraphQLString },
},
});

async function subscribeWithBadFn(
function subscribeWithBadFn(
subscribeFn: () => unknown,
): Promise<ExecutionResult> {
): PromiseOrValue<ExecutionResult | AsyncIterable<unknown>> {
const schema = new GraphQLSchema({
query: DummyQueryType,
subscription: new GraphQLObjectType({
Expand All @@ -165,13 +188,11 @@ async function subscribeWithBadFn(
}),
});
const document = parse('subscription { foo }');
const result = await subscribe({ schema, document });

assert(!isAsyncIterable(result));
expectJSON(await createSourceEventStream(schema, document)).toDeepEqual(
result,
return expectEqualPromisesOrValues(
subscribe({ schema, document }),
createSourceEventStream(schema, document),
);
return result;
}

/* eslint-disable @typescript-eslint/require-await */
Expand All @@ -193,7 +214,7 @@ describe('Subscription Initialization Phase', () => {
yield { foo: 'FooValue' };
}

const subscription = await subscribe({
const subscription = subscribe({
schema,
document: parse('subscription { foo }'),
rootValue: { foo: fooGenerator },
Expand Down Expand Up @@ -229,7 +250,7 @@ describe('Subscription Initialization Phase', () => {
}),
});

const subscription = await subscribe({
const subscription = subscribe({
schema,
document: parse('subscription { foo }'),
});
Expand Down Expand Up @@ -267,10 +288,13 @@ describe('Subscription Initialization Phase', () => {
}),
});

const subscription = await subscribe({
const promise = subscribe({
schema,
document: parse('subscription { foo }'),
});
assert(isPromise(promise));

const subscription = await promise;
assert(isAsyncIterable(subscription));

expect(await subscription.next()).to.deep.equal({
Expand Down Expand Up @@ -299,7 +323,7 @@ describe('Subscription Initialization Phase', () => {
yield { foo: 'FooValue' };
}

const subscription = await subscribe({
const subscription = subscribe({
schema,
document: parse('subscription { foo }'),
rootValue: { customFoo: fooGenerator },
Expand Down Expand Up @@ -349,7 +373,7 @@ describe('Subscription Initialization Phase', () => {
}),
});

const subscription = await subscribe({
const subscription = subscribe({
schema,
document: parse('subscription { foo bar }'),
});
Expand Down Expand Up @@ -379,31 +403,29 @@ describe('Subscription Initialization Phase', () => {
});

// @ts-expect-error (schema must not be null)
(await expectPromise(subscribe({ schema: null, document }))).toRejectWith(
expect(() => subscribe({ schema: null, document })).to.throw(
'Expected null to be a GraphQL schema.',
);

// @ts-expect-error
(await expectPromise(subscribe({ document }))).toRejectWith(
expect(() => subscribe({ document })).to.throw(
'Expected undefined to be a GraphQL schema.',
);

// @ts-expect-error (document must not be null)
(await expectPromise(subscribe({ schema, document: null }))).toRejectWith(
expect(() => subscribe({ schema, document: null })).to.throw(
'Must provide document.',
);

// @ts-expect-error
(await expectPromise(subscribe({ schema }))).toRejectWith(
'Must provide document.',
);
expect(() => subscribe({ schema })).to.throw('Must provide document.');
});

it('resolves to an error if schema does not support subscriptions', async () => {
const schema = new GraphQLSchema({ query: DummyQueryType });
const document = parse('subscription { unknownField }');

const result = await subscribe({ schema, document });
const result = subscribe({ schema, document });
expectJSON(result).toDeepEqual({
errors: [
{
Expand All @@ -427,7 +449,7 @@ describe('Subscription Initialization Phase', () => {
});
const document = parse('subscription { unknownField }');

const result = await subscribe({ schema, document });
const result = subscribe({ schema, document });
expectJSON(result).toDeepEqual({
errors: [
{
Expand All @@ -450,11 +472,11 @@ describe('Subscription Initialization Phase', () => {
});

// @ts-expect-error
(await expectPromise(subscribe({ schema, document: {} }))).toReject();
expect(() => subscribe({ schema, document: {} })).to.throw();
});

it('throws an error if subscribe does not return an iterator', async () => {
expectJSON(await subscribeWithBadFn(() => 'test')).toDeepEqual({
const expectedResult = {
errors: [
{
message:
Expand All @@ -463,7 +485,15 @@ describe('Subscription Initialization Phase', () => {
path: ['foo'],
},
],
});
};

expectJSON(subscribeWithBadFn(() => 'test')).toDeepEqual(expectedResult);

expectJSON(
await expectPromise(
subscribeWithBadFn(() => Promise.resolve('test')),
).toResolve(),
).toDeepEqual(expectedResult);
});

it('resolves to an error for subscription resolver errors', async () => {
Expand All @@ -479,24 +509,28 @@ describe('Subscription Initialization Phase', () => {

expectJSON(
// Returning an error
await subscribeWithBadFn(() => new Error('test error')),
subscribeWithBadFn(() => new Error('test error')),
).toDeepEqual(expectedResult);

expectJSON(
// Throwing an error
await subscribeWithBadFn(() => {
subscribeWithBadFn(() => {
throw new Error('test error');
}),
).toDeepEqual(expectedResult);

expectJSON(
// Resolving to an error
await subscribeWithBadFn(() => Promise.resolve(new Error('test error'))),
await expectPromise(
subscribeWithBadFn(() => Promise.resolve(new Error('test error'))),
).toResolve(),
).toDeepEqual(expectedResult);

expectJSON(
// Rejecting with an error
await subscribeWithBadFn(() => Promise.reject(new Error('test error'))),
await expectPromise(
subscribeWithBadFn(() => Promise.reject(new Error('test error'))),
).toResolve(),
).toDeepEqual(expectedResult);
});

Expand All @@ -523,7 +557,7 @@ describe('Subscription Initialization Phase', () => {

// If we receive variables that cannot be coerced correctly, subscribe() will
// resolve to an ExecutionResult that contains an informative error description.
const result = await subscribe({ schema, document, variableValues });
const result = subscribe({ schema, document, variableValues });
expectJSON(result).toDeepEqual({
errors: [
{
Expand All @@ -541,10 +575,10 @@ describe('Subscription Publish Phase', () => {
it('produces a payload for multiple subscribe in same subscription', async () => {
const pubsub = new SimplePubSub<Email>();

const subscription = await createSubscription(pubsub);
const subscription = createSubscription(pubsub);
assert(isAsyncIterable(subscription));

const secondSubscription = await createSubscription(pubsub);
const secondSubscription = createSubscription(pubsub);
assert(isAsyncIterable(secondSubscription));

const payload1 = subscription.next();
Expand Down Expand Up @@ -583,7 +617,7 @@ describe('Subscription Publish Phase', () => {

it('produces a payload per subscription event', async () => {
const pubsub = new SimplePubSub<Email>();
const subscription = await createSubscription(pubsub);
const subscription = createSubscription(pubsub);
assert(isAsyncIterable(subscription));

// Wait for the next subscription payload.
Expand Down Expand Up @@ -672,7 +706,7 @@ describe('Subscription Publish Phase', () => {

it('produces a payload when there are multiple events', async () => {
const pubsub = new SimplePubSub<Email>();
const subscription = await createSubscription(pubsub);
const subscription = createSubscription(pubsub);
assert(isAsyncIterable(subscription));

let payload = subscription.next();
Expand Down Expand Up @@ -738,7 +772,7 @@ describe('Subscription Publish Phase', () => {

it('should not trigger when subscription is already done', async () => {
const pubsub = new SimplePubSub<Email>();
const subscription = await createSubscription(pubsub);
const subscription = createSubscription(pubsub);
assert(isAsyncIterable(subscription));

let payload = subscription.next();
Expand Down Expand Up @@ -792,7 +826,7 @@ describe('Subscription Publish Phase', () => {

it('should not trigger when subscription is thrown', async () => {
const pubsub = new SimplePubSub<Email>();
const subscription = await createSubscription(pubsub);
const subscription = createSubscription(pubsub);
assert(isAsyncIterable(subscription));

let payload = subscription.next();
Expand Down Expand Up @@ -845,7 +879,7 @@ describe('Subscription Publish Phase', () => {

it('event order is correct for multiple publishes', async () => {
const pubsub = new SimplePubSub<Email>();
const subscription = await createSubscription(pubsub);
const subscription = createSubscription(pubsub);
assert(isAsyncIterable(subscription));

let payload = subscription.next();
Expand Down Expand Up @@ -936,7 +970,7 @@ describe('Subscription Publish Phase', () => {
});

const document = parse('subscription { newMessage }');
const subscription = await subscribe({ schema, document });
const subscription = subscribe({ schema, document });
assert(isAsyncIterable(subscription));

expect(await subscription.next()).to.deep.equal({
Expand Down Expand Up @@ -997,7 +1031,7 @@ describe('Subscription Publish Phase', () => {
});

const document = parse('subscription { newMessage }');
const subscription = await subscribe({ schema, document });
const subscription = subscribe({ schema, document });
assert(isAsyncIterable(subscription));

expect(await subscription.next()).to.deep.equal({
Expand All @@ -1007,7 +1041,7 @@ describe('Subscription Publish Phase', () => {
},
});

(await expectPromise(subscription.next())).toRejectWith('test error');
await expectPromise(subscription.next()).toRejectWith('test error');

expect(await subscription.next()).to.deep.equal({
done: true,
Expand Down
Loading

0 comments on commit 6d42ced

Please sign in to comment.