Skip to content

Commit f5ebcbe

Browse files
committed
add defer/stream support for subscriptions
1 parent 6c5b85c commit f5ebcbe

File tree

4 files changed

+355
-19
lines changed

4 files changed

+355
-19
lines changed
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
import { expect } from 'chai';
2+
import { describe, it } from 'mocha';
3+
4+
import { flattenAsyncIterator } from '../flattenAsyncIterator';
5+
6+
describe('flattenAsyncIterator', () => {
7+
it('does not modify an already flat async generator', async () => {
8+
async function* source() {
9+
yield await Promise.resolve(1);
10+
yield await Promise.resolve(2);
11+
yield await Promise.resolve(3);
12+
}
13+
14+
const result = flattenAsyncIterator(source());
15+
16+
expect(await result.next()).to.deep.equal({ value: 1, done: false });
17+
expect(await result.next()).to.deep.equal({ value: 2, done: false });
18+
expect(await result.next()).to.deep.equal({ value: 3, done: false });
19+
expect(await result.next()).to.deep.equal({
20+
value: undefined,
21+
done: true,
22+
});
23+
});
24+
25+
it('does not modify an already flat async iterator', async () => {
26+
const items = [1, 2, 3];
27+
28+
const iterator: any = {
29+
[Symbol.asyncIterator]() {
30+
return this;
31+
},
32+
next() {
33+
return Promise.resolve({
34+
done: items.length === 0,
35+
value: items.shift(),
36+
});
37+
},
38+
};
39+
40+
const result = flattenAsyncIterator(iterator);
41+
42+
expect(await result.next()).to.deep.equal({ value: 1, done: false });
43+
expect(await result.next()).to.deep.equal({ value: 2, done: false });
44+
expect(await result.next()).to.deep.equal({ value: 3, done: false });
45+
expect(await result.next()).to.deep.equal({
46+
value: undefined,
47+
done: true,
48+
});
49+
});
50+
51+
it('flatten nested async generators', async () => {
52+
async function* source() {
53+
yield await Promise.resolve(1);
54+
yield await Promise.resolve(2);
55+
yield await Promise.resolve(
56+
(async function* nested(): AsyncGenerator<number, void, void> {
57+
yield await Promise.resolve(2.1);
58+
yield await Promise.resolve(2.2);
59+
})(),
60+
);
61+
yield await Promise.resolve(3);
62+
}
63+
64+
const doubles = flattenAsyncIterator(source());
65+
66+
const result = [];
67+
for await (const x of doubles) {
68+
result.push(x);
69+
}
70+
expect(result).to.deep.equal([1, 2, 2.1, 2.2, 3]);
71+
});
72+
73+
it('allows returning early from a nested async generator', async () => {
74+
async function* source() {
75+
yield await Promise.resolve(1);
76+
yield await Promise.resolve(2);
77+
yield await Promise.resolve(
78+
(async function* nested(): AsyncGenerator<number, void, void> {
79+
yield await Promise.resolve(2.1); /* c8 ignore start */
80+
// Not reachable, early return
81+
yield await Promise.resolve(2.2);
82+
})(),
83+
);
84+
// Not reachable, early return
85+
yield await Promise.resolve(3);
86+
}
87+
/* c8 ignore stop */
88+
89+
const doubles = flattenAsyncIterator(source());
90+
91+
expect(await doubles.next()).to.deep.equal({ value: 1, done: false });
92+
expect(await doubles.next()).to.deep.equal({ value: 2, done: false });
93+
expect(await doubles.next()).to.deep.equal({ value: 2.1, done: false });
94+
95+
// Early return
96+
expect(await doubles.return()).to.deep.equal({
97+
value: undefined,
98+
done: true,
99+
});
100+
101+
// Subsequent next calls
102+
expect(await doubles.next()).to.deep.equal({
103+
value: undefined,
104+
done: true,
105+
});
106+
expect(await doubles.next()).to.deep.equal({
107+
value: undefined,
108+
done: true,
109+
});
110+
});
111+
112+
it('allows throwing errors from a nested async generator', async () => {
113+
async function* source() {
114+
yield await Promise.resolve(1);
115+
yield await Promise.resolve(2);
116+
yield await Promise.resolve(
117+
(async function* nested(): AsyncGenerator<number, void, void> {
118+
yield await Promise.resolve(2.1); /* c8 ignore start */
119+
// Not reachable, early return
120+
yield await Promise.resolve(2.2);
121+
})(),
122+
);
123+
// Not reachable, early return
124+
yield await Promise.resolve(3);
125+
}
126+
/* c8 ignore stop */
127+
128+
const doubles = flattenAsyncIterator(source());
129+
130+
expect(await doubles.next()).to.deep.equal({ value: 1, done: false });
131+
expect(await doubles.next()).to.deep.equal({ value: 2, done: false });
132+
expect(await doubles.next()).to.deep.equal({ value: 2.1, done: false });
133+
134+
// Throw error
135+
let caughtError;
136+
try {
137+
await doubles.throw('ouch'); /* c8 ignore start */
138+
// Not reachable, always throws
139+
/* c8 ignore stop */
140+
} catch (e) {
141+
caughtError = e;
142+
}
143+
expect(caughtError).to.equal('ouch');
144+
});
145+
});

src/execution/__tests__/subscribe-test.ts

Lines changed: 154 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,17 +82,22 @@ const emailSchema = new GraphQLSchema({
8282
}),
8383
});
8484

85-
function createSubscription(pubsub: SimplePubSub<Email>) {
85+
function createSubscription(
86+
pubsub: SimplePubSub<Email>,
87+
variableValues?: { readonly [variable: string]: unknown },
88+
) {
8689
const document = parse(`
87-
subscription ($priority: Int = 0) {
90+
subscription ($priority: Int = 0, $shouldDefer: Boolean = false) {
8891
importantEmail(priority: $priority) {
8992
email {
9093
from
9194
subject
9295
}
93-
inbox {
94-
unread
95-
total
96+
... @defer(if: $shouldDefer) {
97+
inbox {
98+
unread
99+
total
100+
}
96101
}
97102
}
98103
}
@@ -122,7 +127,12 @@ function createSubscription(pubsub: SimplePubSub<Email>) {
122127
}),
123128
};
124129

125-
return subscribe({ schema: emailSchema, document, rootValue: data });
130+
return subscribe({
131+
schema: emailSchema,
132+
document,
133+
rootValue: data,
134+
variableValues,
135+
});
126136
}
127137

128138
// TODO: consider adding this method to testUtils (with tests)
@@ -679,6 +689,144 @@ describe('Subscription Publish Phase', () => {
679689
});
680690
});
681691

692+
it('produces additional payloads for subscriptions with @defer', async () => {
693+
const pubsub = new SimplePubSub<Email>();
694+
const subscription = await createSubscription(pubsub, {
695+
shouldDefer: true,
696+
});
697+
assert(isAsyncIterable(subscription));
698+
// Wait for the next subscription payload.
699+
const payload = subscription.next();
700+
701+
// A new email arrives!
702+
expect(
703+
pubsub.emit({
704+
from: 'yuzhi@graphql.org',
705+
subject: 'Alright',
706+
message: 'Tests are good',
707+
unread: true,
708+
}),
709+
).to.equal(true);
710+
711+
// The previously waited on payload now has a value.
712+
expect(await payload).to.deep.equal({
713+
done: false,
714+
value: {
715+
data: {
716+
importantEmail: {
717+
email: {
718+
from: 'yuzhi@graphql.org',
719+
subject: 'Alright',
720+
},
721+
},
722+
},
723+
hasNext: true,
724+
},
725+
});
726+
727+
// Wait for the next payload from @defer
728+
expect(await subscription.next()).to.deep.equal({
729+
done: false,
730+
value: {
731+
incremental: [
732+
{
733+
data: {
734+
inbox: {
735+
unread: 1,
736+
total: 2,
737+
},
738+
},
739+
path: ['importantEmail'],
740+
},
741+
],
742+
hasNext: false,
743+
},
744+
});
745+
746+
// Another new email arrives, after all incrementally delivered payloads are received.
747+
expect(
748+
pubsub.emit({
749+
from: 'hyo@graphql.org',
750+
subject: 'Tools',
751+
message: 'I <3 making things',
752+
unread: true,
753+
}),
754+
).to.equal(true);
755+
756+
// The next waited on payload will have a value.
757+
expect(await subscription.next()).to.deep.equal({
758+
done: false,
759+
value: {
760+
data: {
761+
importantEmail: {
762+
email: {
763+
from: 'hyo@graphql.org',
764+
subject: 'Tools',
765+
},
766+
},
767+
},
768+
hasNext: true,
769+
},
770+
});
771+
772+
// Another new email arrives, before the incrementally delivered payloads from the last email was received.
773+
expect(
774+
pubsub.emit({
775+
from: 'adam@graphql.org',
776+
subject: 'Important',
777+
message: 'Read me please',
778+
unread: true,
779+
}),
780+
).to.equal(true);
781+
782+
// Deferred payload from previous event is received.
783+
expect(await subscription.next()).to.deep.equal({
784+
done: false,
785+
value: {
786+
incremental: [
787+
{
788+
data: {
789+
inbox: {
790+
unread: 2,
791+
total: 3,
792+
},
793+
},
794+
path: ['importantEmail'],
795+
},
796+
],
797+
hasNext: false,
798+
},
799+
});
800+
801+
// Next payload from last event
802+
expect(await subscription.next()).to.deep.equal({
803+
done: false,
804+
value: {
805+
data: {
806+
importantEmail: {
807+
email: {
808+
from: 'adam@graphql.org',
809+
subject: 'Important',
810+
},
811+
},
812+
},
813+
hasNext: true,
814+
},
815+
});
816+
817+
// The client disconnects before the deferred payload is consumed.
818+
expect(await subscription.return()).to.deep.equal({
819+
done: true,
820+
value: undefined,
821+
});
822+
823+
// Awaiting a subscription after closing it results in completed results.
824+
expect(await subscription.next()).to.deep.equal({
825+
done: true,
826+
value: undefined,
827+
});
828+
});
829+
682830
it('produces a payload when there are multiple events', async () => {
683831
const pubsub = new SimplePubSub<Email>();
684832
const subscription = createSubscription(pubsub);

src/execution/execute.ts

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ import {
5252
collectFields,
5353
collectSubfields as _collectSubfields,
5454
} from './collectFields';
55+
import { flattenAsyncIterator } from './flattenAsyncIterator';
5556
import { mapAsyncIterator } from './mapAsyncIterator';
5657
import {
5758
getArgumentValues,
@@ -1399,19 +1400,11 @@ function mapSourceToResponse(
13991400
// the GraphQL specification. The `execute` function provides the
14001401
// "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the
14011402
// "ExecuteQuery" algorithm, for which `execute` is also used.
1402-
return mapAsyncIterator(resultOrStream, (payload: unknown) => {
1403-
const executionResult = executeImpl(
1404-
buildPerEventExecutionContext(exeContext, payload),
1405-
);
1406-
/* c8 ignore next 6 */
1407-
// TODO: implement support for defer/stream in subscriptions
1408-
if (isAsyncIterable(executionResult)) {
1409-
throw new Error(
1410-
'TODO: implement support for defer/stream in subscriptions',
1411-
);
1412-
}
1413-
return executionResult as PromiseOrValue<ExecutionResult>;
1414-
});
1403+
return flattenAsyncIterator<ExecutionResult, AsyncExecutionResult>(
1404+
mapAsyncIterator(resultOrStream, (payload: unknown) =>
1405+
executeImpl(buildPerEventExecutionContext(exeContext, payload)),
1406+
),
1407+
);
14151408
}
14161409

14171410
/**

0 commit comments

Comments
 (0)