Skip to content

Commit

Permalink
Use a queue of promises to keep asyncMap results in order.
Browse files Browse the repository at this point in the history
  • Loading branch information
benjamn committed Mar 10, 2021
1 parent 5adb05e commit e59cbcb
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 5 deletions.
57 changes: 57 additions & 0 deletions src/utilities/observables/__tests__/asyncMap.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { itAsync } from "../../testing";
import { Observable } from "../Observable";
import { asyncMap } from "../asyncMap";

const wait = (delayMs: number) =>
new Promise<void>(resolve => setTimeout(resolve, delayMs));

function make1234Observable() {
return new Observable<number>(observer => {
observer.next(1);
observer.next(2);
setTimeout(() => {
observer.next(3);
setTimeout(() => {
observer.next(4);
observer.complete();
}, 10);
}, 10);
});
}

function rejectExceptions<Args extends any[], Ret>(
reject: (reason: any) => any,
fn: (...args: Args) => Ret,
) {
return function () {
try {
return fn.apply(this, arguments);
} catch (error) {
reject(error);
}
} as typeof fn;
}

describe("asyncMap", () => {
itAsync("keeps normal results in order", (resolve, reject) => {
const values: number[] = [];
const mapped: number[] = [];

asyncMap(make1234Observable(), value => {
values.push(value);
// Make earlier results take longer than later results.
const delay = 100 - value * 10;
return wait(delay).then(() => value * 2);
}).subscribe({
next(mappedValue) {
mapped.push(mappedValue);
},
error: reject,
complete: rejectExceptions(reject, () => {
expect(values).toEqual([1, 2, 3, 4]);
expect(mapped).toEqual([2, 4, 6, 8]);
resolve();
}),
});
});
});
19 changes: 14 additions & 5 deletions src/utilities/observables/asyncMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ export function asyncMap<V, R>(
const { next, error, complete } = observer;
let activeCallbackCount = 0;
let completed = false;
let promiseQueue = {
// Normally we would initialize promiseQueue to Promise.resolve(), but
// in this case, for backwards compatibility, we need to be careful to
// invoke the first callback synchronously.
then(callback: () => any) {
return new Promise(resolve => resolve(callback()));
},
} as Promise<void>;

function makeCallback(
examiner: typeof mapFn | typeof catchFn,
Expand All @@ -19,20 +27,21 @@ export function asyncMap<V, R>(
if (examiner) {
return arg => {
++activeCallbackCount;
new Promise(resolve => resolve(examiner(arg))).then(
const both = () => examiner(arg);
promiseQueue = promiseQueue.then(both, both).then(
result => {
--activeCallbackCount;
next && next.call(observer, result);
if (completed) {
handler.complete!();
}
},
e => {
error => {
--activeCallbackCount;
throw e;
throw error;
},
).catch(e => {
error && error.call(observer, e);
).catch(caught => {
error && error.call(observer, caught);
});
};
} else {
Expand Down

0 comments on commit e59cbcb

Please sign in to comment.