Skip to content

Commit f7e151d

Browse files
committed
transmit source event stream errors as an error payload
1. akin to a request error, without any data field 2. and close the response event stream ensuring no further events are sent
1 parent 079167d commit f7e151d

File tree

5 files changed

+74
-24
lines changed

5 files changed

+74
-24
lines changed

src/execution/__tests__/cancellation-test.ts

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ import { assert, expect } from 'chai';
22
import { describe, it } from 'mocha';
33

44
import { expectJSON } from '../../__testUtils__/expectJSON.js';
5-
import { expectPromise } from '../../__testUtils__/expectPromise.js';
65
import { resolveOnNextTick } from '../../__testUtils__/resolveOnNextTick.js';
76

87
import { isAsyncIterable } from '../../jsutils/isAsyncIterable.js';
@@ -902,9 +901,15 @@ describe('Execute: Cancellation', () => {
902901

903902
abortController.abort();
904903

905-
await expectPromise(subscription.next()).toRejectWith(
906-
'This operation was aborted',
907-
);
904+
expectJSON(await subscription.next()).toDeepEqual({
905+
value: { errors: [{ message: 'This operation was aborted' }] },
906+
done: false,
907+
});
908+
909+
expectJSON(await subscription.next()).toDeepEqual({
910+
value: undefined,
911+
done: true,
912+
});
908913
});
909914

910915
it('should stop the execution when aborted during subscription returned asynchronously', async () => {
@@ -941,8 +946,14 @@ describe('Execute: Cancellation', () => {
941946

942947
abortController.abort();
943948

944-
await expectPromise(subscription.next()).toRejectWith(
945-
'This operation was aborted',
946-
);
949+
expectJSON(await subscription.next()).toDeepEqual({
950+
value: { errors: [{ message: 'This operation was aborted' }] },
951+
done: false,
952+
});
953+
954+
expectJSON(await subscription.next()).toDeepEqual({
955+
value: undefined,
956+
done: true,
957+
});
947958
});
948959
});

src/execution/__tests__/mapAsyncIterable-test.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,26 @@ describe('mapAsyncIterable', () => {
8989
});
9090
});
9191

92+
it('calls onError with iterator errors', async () => {
93+
async function* source() {
94+
yield 1;
95+
throw new Error('Oops');
96+
}
97+
98+
const doubles = mapAsyncIterable(
99+
source(),
100+
(x) => Promise.resolve(x + x),
101+
() => Promise.resolve(0),
102+
);
103+
104+
expect(await doubles.next()).to.deep.equal({ value: 2, done: false });
105+
expect(await doubles.next()).to.deep.equal({ value: 0, done: false });
106+
expect(await doubles.next()).to.deep.equal({
107+
value: undefined,
108+
done: true,
109+
});
110+
});
111+
92112
it('calls done when completes', async () => {
93113
async function* source() {
94114
yield 1;
@@ -100,6 +120,7 @@ describe('mapAsyncIterable', () => {
100120
const doubles = mapAsyncIterable(
101121
source(),
102122
(x) => Promise.resolve(x + x),
123+
undefined,
103124
() => {
104125
done = true;
105126
},
@@ -126,6 +147,7 @@ describe('mapAsyncIterable', () => {
126147
const doubles = mapAsyncIterable(
127148
source(),
128149
(x) => Promise.resolve(x + x),
150+
undefined,
129151
() => {
130152
done = true;
131153
},

src/execution/__tests__/subscribe-test.ts

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1013,7 +1013,7 @@ describe('Subscription Publish Phase', () => {
10131013
});
10141014
});
10151015

1016-
it('should not trigger when subscription is thrown', async () => {
1016+
it('should terminate when subscription is thrown', async () => {
10171017
const pubsub = new SimplePubSub<Email>();
10181018
const subscription = createSubscription(pubsub);
10191019
assert(isAsyncIterable(subscription));
@@ -1050,15 +1050,14 @@ describe('Subscription Publish Phase', () => {
10501050

10511051
payload = subscription.next();
10521052

1053-
// Throw error
1054-
let caughtError;
1055-
try {
1056-
/* c8 ignore next 2 */
1057-
await subscription.throw('ouch');
1058-
} catch (e) {
1059-
caughtError = e;
1060-
}
1061-
expect(caughtError).to.equal('ouch');
1053+
const thrown = subscription.throw('ouch');
1054+
1055+
expectJSON(await thrown).toDeepEqual({
1056+
done: false,
1057+
value: {
1058+
errors: [{ message: 'Unexpected error value: "ouch"' }],
1059+
},
1060+
});
10621061

10631062
expect(await payload).to.deep.equal({
10641063
done: true,
@@ -1230,7 +1229,16 @@ describe('Subscription Publish Phase', () => {
12301229
},
12311230
});
12321231

1233-
await expectPromise(subscription.next()).toRejectWith('test error');
1232+
expectJSON(await subscription.next()).toDeepEqual({
1233+
done: false,
1234+
value: {
1235+
errors: [
1236+
{
1237+
message: 'test error',
1238+
},
1239+
],
1240+
},
1241+
});
12341242

12351243
expect(await subscription.next()).to.deep.equal({
12361244
done: true,

src/execution/execute.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2137,6 +2137,7 @@ function mapSourceToResponse(
21372137
};
21382138
return validatedExecutionArgs.perEventExecutor(perEventExecutionArgs);
21392139
},
2140+
(error) => ({ errors: [locatedError(error, undefined)] }),
21402141
() => abortSignalListener?.disconnect(),
21412142
);
21422143
}

src/execution/mapAsyncIterable.ts

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,21 @@
11
import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js';
22

33
/**
4-
* Given an AsyncIterable and a callback function, return an AsyncIterator
5-
* which produces values mapped via calling the callback function.
4+
* Given an AsyncIterable and a onValue function, return an AsyncIterator
5+
* which produces values mapped via calling the onValue function.
66
*/
77
export function mapAsyncIterable<T, U, R = undefined>(
88
iterable: AsyncGenerator<T, R, void> | AsyncIterable<T>,
9-
callback: (value: T) => PromiseOrValue<U>,
9+
onValue: (value: T) => PromiseOrValue<U>,
10+
onError: (error: any) => PromiseOrValue<U> = (error: any) => {
11+
throw error;
12+
},
1013
onDone?: (() => void) | undefined,
1114
): AsyncGenerator<U, R, void> {
1215
const iterator = iterable[Symbol.asyncIterator]();
1316

17+
let errored = false;
18+
1419
async function mapResult(
1520
promise: Promise<IteratorResult<T, R>>,
1621
): Promise<IteratorResult<U, R>> {
@@ -23,12 +28,13 @@ export function mapAsyncIterable<T, U, R = undefined>(
2328
}
2429
value = result.value;
2530
} catch (error) {
31+
errored = true;
2632
onDone?.();
27-
throw error;
33+
return { value: await onError(error), done: false };
2834
}
2935

3036
try {
31-
return { value: await callback(value), done: false };
37+
return { value: await onValue(value), done: false };
3238
} catch (error) {
3339
/* c8 ignore start */
3440
// FIXME: add test case
@@ -46,7 +52,9 @@ export function mapAsyncIterable<T, U, R = undefined>(
4652

4753
return {
4854
async next() {
49-
return mapResult(iterator.next());
55+
return errored
56+
? Promise.resolve({ value: undefined as any, done: true })
57+
: mapResult(iterator.next());
5058
},
5159
async return(): Promise<IteratorResult<U, R>> {
5260
// If iterator.return() does not exist, then type R must be undefined.

0 commit comments

Comments
 (0)