Skip to content

replace raw mapAsyncIterable with Repeater based approach #3694

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/execution/__tests__/mapAsyncIterable-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,8 @@ describe('mapAsyncIterable', () => {
yield 3; // Shouldn't be reached.
} finally {
didVisitFinally = true;
yield 1000;
}
yield 1000; /* c8 ignore start */
} /* c8 ignore stop */
}

const throwOver1 = mapAsyncIterable(source(), mapper);
Expand Down
2 changes: 1 addition & 1 deletion src/execution/__tests__/simplePubSub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ export class SimplePubSub<T> {
return this._subscribers.size > 0;
}

getSubscriber<R>(transform: (value: T) => R): AsyncGenerator<R, void, void> {
getSubscriber<R>(transform: (value: T) => R) {
const pullQueue: Array<(result: IteratorResult<R, void>) => void> = [];
const pushQueue: Array<R> = [];
let listening = true;
Expand Down
29 changes: 18 additions & 11 deletions src/execution/__tests__/subscribe-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -739,9 +739,8 @@ describe('Subscription Publish Phase', () => {
},
});

const returned = await subscription.return();
payload = subscription.next();
await subscription.return();

// A new email arrives!
expect(
pubsub.emit({
Expand All @@ -752,6 +751,10 @@ describe('Subscription Publish Phase', () => {
}),
).to.equal(false);

expect(returned).to.deep.equal({
done: true,
value: undefined,
});
expect(await payload).to.deep.equal({
done: true,
value: undefined,
Expand Down Expand Up @@ -793,17 +796,21 @@ describe('Subscription Publish Phase', () => {
},
});

const error = new Error('should not trigger when subscription is thrown');
const caughtError = subscription.throw(error).catch((e) => e);
payload = subscription.next();

// Throw error
let caughtError;
try {
/* c8 ignore next 2 */
await subscription.throw('ouch');
} catch (e) {
caughtError = e;
}
expect(caughtError).to.equal('ouch');
// A new email arrives!
expect(
pubsub.emit({
from: 'yuzhi@graphql.org',
subject: 'Alright 2',
message: 'Tests are good 2',
unread: true,
}),
).to.equal(true);

expect(await caughtError).to.equal(error);

expect(await payload).to.deep.equal({
done: true,
Expand Down
107 changes: 65 additions & 42 deletions src/execution/mapAsyncIterable.ts
Original file line number Diff line number Diff line change
@@ -1,57 +1,80 @@
/* eslint-disable no-await-in-loop */

import type { PromiseOrValue } from '../jsutils/PromiseOrValue';
import { Repeater, RepeaterClosureSignal } from '../jsutils/Repeater';

/**
* Given an AsyncIterable and a callback function, return an AsyncIterator
* Given an AsyncIterable and a callback function, return an AsyncGenerator
* which produces values mapped via calling the callback function.
*/
export function mapAsyncIterable<T, U, R = undefined>(
iterable: AsyncGenerator<T, R, void> | AsyncIterable<T>,
callback: (value: T) => PromiseOrValue<U>,
fn: (value: T) => PromiseOrValue<U>,
): AsyncGenerator<U, R, void> {
const iterator = iterable[Symbol.asyncIterator]();
return new Repeater<U, R, void>(async ({ push, stop }) => {
const iterator: AsyncIterator<T, R, void> =
iterable[Symbol.asyncIterator]();

async function mapResult(
result: IteratorResult<T, R>,
): Promise<IteratorResult<U, R>> {
if (result.done) {
return result;
}
let next = iterator.next();
// eslint-disable-next-line no-constant-condition
while (true) {
let iteration: IteratorResult<T, R>;
try {
iteration = await next;
} catch (err) {
await abruptClose(iterator);
throw err;
}

try {
return { value: await callback(result.value), done: false };
} catch (error) {
/* c8 ignore start */
// FIXME: add test case
if (typeof iterator.return === 'function') {
try {
await iterator.return();
} catch (_e) {
/* ignore error */
const { done, value } = iteration;

if (done) {
stop();
return value;
}

let mapped: U;
try {
mapped = await fn(value);
} catch (err) {
await abruptClose(iterator);
throw err;
}

try {
await push(mapped);
} catch (err) {
if (err instanceof RepeaterClosureSignal) {
if (typeof iterator.return !== 'function') {
stop();
return undefined as unknown as R; // void :(
}

next = iterator.return(err.returnValue);
continue;
}

if (typeof iterator.throw !== 'function') {
await abruptClose(iterator);
throw err;
}

next = iterator.throw(err);
continue;
}
throw error;
/* c8 ignore stop */

next = iterator.next();
}
}
});
}

return {
async next() {
return mapResult(await iterator.next());
},
async return(): Promise<IteratorResult<U, R>> {
// If iterator.return() does not exist, then type R must be undefined.
return typeof iterator.return === 'function'
? mapResult(await iterator.return())
: { value: undefined as any, done: true };
},
async throw(error?: unknown) {
if (typeof iterator.throw === 'function') {
return mapResult(await iterator.throw(error));
}
throw error;
},
[Symbol.asyncIterator]() {
return this;
},
};
async function abruptClose(iterator: AsyncIterator<unknown>): Promise<void> {
if (typeof iterator.return === 'function') {
try {
await iterator.return(); /* c8 ignore start */
} catch (_err) {
// FIXME: add test case
/* ignore error */
} /* c8 ignore stop */
}
}
Loading