Skip to content

Commit

Permalink
Merge pull request #7818 from apollographql/catch-asyncMap-next-excep…
Browse files Browse the repository at this point in the history
…tions

Improvements to asyncMap Observable utility function.
  • Loading branch information
benjamn authored Mar 10, 2021
2 parents 848f0e3 + c860c99 commit f498900
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 4 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## Apollo Client 3.3.12 (not yet released)

### Bug fixes

- Maintain serial ordering of `asyncMap` mapping function calls, and prevent potential unhandled `Promise` rejection errors. <br/>
[@benjamn](https://github.com/benjamn) in [#7818](https://github.com/apollographql/apollo-client/pull/7818)

## Apollo Client 3.3.11

### Bug fixes
Expand Down
136 changes: 136 additions & 0 deletions src/utilities/observables/__tests__/asyncMap.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import { itAsync } from "../../testing";
import { Observable } from "../Observable";
import { asyncMap } from "../asyncMap";

const wait = (delayMs: number) =>
new Promise<void>(resolve => setTimeout(resolve, delayMs));

function make1234Observable() {
return new Observable<number>(observer => {
observer.next(1);
observer.next(2);
setTimeout(() => {
observer.next(3);
setTimeout(() => {
observer.next(4);
observer.complete();
}, 10);
}, 10);
});
}

function rejectExceptions<Args extends any[], Ret>(
reject: (reason: any) => any,
fn: (...args: Args) => Ret,
) {
return function () {
try {
return fn.apply(this, arguments);
} catch (error) {
reject(error);
}
} as typeof fn;
}

describe("asyncMap", () => {
itAsync("keeps normal results in order", (resolve, reject) => {
const values: number[] = [];
const mapped: number[] = [];

asyncMap(make1234Observable(), value => {
values.push(value);
// Make earlier results take longer than later results.
const delay = 100 - value * 10;
return wait(delay).then(() => value * 2);
}).subscribe({
next(mappedValue) {
mapped.push(mappedValue);
},
error: reject,
complete: rejectExceptions(reject, () => {
expect(values).toEqual([1, 2, 3, 4]);
expect(mapped).toEqual([2, 4, 6, 8]);
resolve();
}),
});
});

itAsync("handles exceptions from mapping functions", (resolve, reject) => {
const triples: number[] = [];
asyncMap(make1234Observable(), num => {
if (num === 3) throw new Error("expected");
return num * 3;
}).subscribe({
next: rejectExceptions(reject, triple => {
expect(triple).toBeLessThan(9);
triples.push(triple);
}),
error: rejectExceptions(reject, error => {
expect(error.message).toBe("expected");
expect(triples).toEqual([3, 6]);
resolve();
}),
});
});

itAsync("handles rejected promises from mapping functions", (resolve, reject) => {
const triples: number[] = [];
asyncMap(make1234Observable(), num => {
if (num === 3) return Promise.reject(new Error("expected"));
return num * 3;
}).subscribe({
next: rejectExceptions(reject, triple => {
expect(triple).toBeLessThan(9);
triples.push(triple);
}),
error: rejectExceptions(reject, error => {
expect(error.message).toBe("expected");
expect(triples).toEqual([3, 6]);
resolve();
}),
});
});

itAsync("handles async exceptions from mapping functions", (resolve, reject) => {
const triples: number[] = [];
asyncMap(make1234Observable(), num => wait(10).then(() => {
if (num === 3) throw new Error("expected");
return num * 3;
})).subscribe({
next: rejectExceptions(reject, triple => {
expect(triple).toBeLessThan(9);
triples.push(triple);
}),
error: rejectExceptions(reject, error => {
expect(error.message).toBe("expected");
expect(triples).toEqual([3, 6]);
resolve();
}),
});
});

itAsync("handles exceptions from next functions", (resolve, reject) => {
const triples: number[] = [];
asyncMap(make1234Observable(), num => {
return num * 3;
}).subscribe({
next(triple) {
triples.push(triple);
// Unfortunately this exception won't be caught by asyncMap, because
// the Observable implementation wraps this next function with its own
// try-catch. Uncomment the remaining lines to make this test more
// meaningful, in the event that this behavior ever changes.
// if (triple === 9) throw new Error("expected");
},
// error: rejectExceptions(reject, error => {
// expect(error.message).toBe("expected");
// expect(triples).toEqual([3, 6, 9]);
// resolve();
// }),
complete: rejectExceptions(reject, () => {
expect(triples).toEqual([3, 6, 9, 12]);
resolve();
}),
});
});
});
19 changes: 15 additions & 4 deletions src/utilities/observables/asyncMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ export function asyncMap<V, R>(
const { next, error, complete } = observer;
let activeCallbackCount = 0;
let completed = false;
let promiseQueue = {
// Normally we would initialize promiseQueue to Promise.resolve(), but
// in this case, for backwards compatibility, we need to be careful to
// invoke the first callback synchronously.
then(callback: () => any) {
return new Promise(resolve => resolve(callback()));
},
} as Promise<void>;

function makeCallback(
examiner: typeof mapFn | typeof catchFn,
Expand All @@ -19,19 +27,22 @@ export function asyncMap<V, R>(
if (examiner) {
return arg => {
++activeCallbackCount;
new Promise(resolve => resolve(examiner(arg))).then(
const both = () => examiner(arg);
promiseQueue = promiseQueue.then(both, both).then(
result => {
--activeCallbackCount;
next && next.call(observer, result);
if (completed) {
handler.complete!();
}
},
e => {
error => {
--activeCallbackCount;
error && error.call(observer, e);
throw error;
},
);
).catch(caught => {
error && error.call(observer, caught);
});
};
} else {
return arg => delegate && delegate.call(observer, arg);
Expand Down

0 comments on commit f498900

Please sign in to comment.