Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion async/deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"./unstable-semaphore": "./unstable_semaphore.ts",
"./unstable-circuit-breaker": "./unstable_circuit_breaker.ts",
"./unstable-all-keyed": "./unstable_all_keyed.ts",
"./unstable-poll": "./unstable_poll.ts"
"./unstable-poll": "./unstable_poll.ts",
"./unstable-pool-settled": "./unstable_pool_settled.ts"
}
}
189 changes: 189 additions & 0 deletions async/unstable_pool_settled.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// Copyright 2018-2026 the Deno authors. MIT license.
// This module is browser compatible.

/** Options for {@linkcode pooledMapSettled}. */
export interface PooledMapSettledOptions {
/**
* The maximum count of items being processed concurrently. Must be a
* positive integer.
*/
poolLimit: number;
/**
* An AbortSignal to cancel the pooled mapping operation.
*
* If the signal is aborted, no new items will begin processing. All currently
* executing items are allowed to finish and their settled results are yielded.
* The iterator then rejects with the signal's reason.
*
* @default {undefined}
*/
signal?: AbortSignal;
}

/**
* Like {@linkcode pooledMap}, but does not fail fast. Every item is processed
* regardless of earlier failures. Results are yielded as
* {@linkcode PromiseSettledResult} objects in input order.
*
* The relationship to `pooledMap` mirrors `Promise.allSettled` vs `Promise.all`.
*
* If the input iterable itself throws, all currently executing items are
* allowed to finish and their settled results are yielded, then the iterator
* closes. The error from the input iterable is not propagated to the consumer.
*
* @experimental **UNSTABLE**: New API, yet to be vetted.
*
* @example Usage
* ```ts
* import { pooledMapSettled } from "@std/async/unstable-pool-settled";
* import { assertEquals } from "@std/assert";
*
* const results = pooledMapSettled(
* [1, 2, 3],
* (i) => {
* if (i === 2) throw new Error("bad");
* return Promise.resolve(i);
* },
* { poolLimit: 2 },
* );
*
* const settled = await Array.fromAsync(results);
* assertEquals(settled.length, 3);
* assertEquals(settled[0], { status: "fulfilled", value: 1 });
* assertEquals(settled[1]!.status, "rejected");
* assertEquals(settled[2], { status: "fulfilled", value: 3 });
* ```
*
* @example With AbortSignal
* ```ts no-assert ignore
* import { pooledMapSettled } from "@std/async/unstable-pool-settled";
*
* const results = pooledMapSettled([1, 2, 3], async (i) => {
* await new Promise((r) => setTimeout(r, 1000));
* return i;
* }, { poolLimit: 2, signal: AbortSignal.timeout(5_000) });
*
* for await (const result of results) {
* console.log(result);
* }
* ```
*
* @typeParam T the input type.
* @typeParam R the output type.
* @param array The input iterable.
* @param iteratorFn The transform function (sync or async).
* @param options Configuration for concurrency and cancellation.
* @returns An async iterator yielding `PromiseSettledResult<R>` for each item,
* in the order items were yielded from the input.
* @throws {RangeError} If `poolLimit` is not a positive integer.
*/
export function pooledMapSettled<T, R>(
array: Iterable<T> | AsyncIterable<T>,
iteratorFn: (data: T) => R | Promise<R>,
options: PooledMapSettledOptions,
): AsyncIterableIterator<PromiseSettledResult<R>> {
const { poolLimit, signal } = options;

if (!Number.isInteger(poolLimit) || poolLimit < 1) {
throw new RangeError(
`Cannot pool as 'poolLimit' must be a positive integer: received ${poolLimit}`,
);
}

type Settled = PromiseSettledResult<R>;

const ABORT_SENTINEL = Symbol();

const res = new TransformStream<
Promise<Settled | typeof ABORT_SENTINEL>,
Settled
>({
async transform(
p: Promise<Settled | typeof ABORT_SENTINEL>,
controller: TransformStreamDefaultController<Settled>,
) {
const result = await p;
if (result === ABORT_SENTINEL) {
controller.error(signal?.reason);
return;
}
controller.enqueue(result);
},
});

(async () => {
const writer = res.writable.getWriter();
const executing: Array<Promise<unknown>> = [];

function raceWithSignal(
promises: Array<Promise<unknown>>,
): Promise<unknown> {
if (!signal) return Promise.race(promises);
const { promise, resolve, reject } = Promise.withResolvers<never>();
const onAbort = () => reject(signal.reason);
signal.addEventListener("abort", onAbort, { once: true });
return Promise.race([...promises, promise]).finally(() => {
signal.removeEventListener("abort", onAbort);
resolve(undefined as never);
});
}

function settle(fn: () => R | Promise<R>): Promise<Settled> {
return Promise.resolve()
.then(fn)
.then(
(value): PromiseFulfilledResult<R> => ({
status: "fulfilled",
value,
}),
(reason): PromiseRejectedResult => ({ status: "rejected", reason }),
);
}

try {
signal?.throwIfAborted();

for await (const item of array) {
signal?.throwIfAborted();

const p = settle(() => iteratorFn(item));
writer.write(p);
const e: Promise<unknown> = p.then(() =>
executing.splice(executing.indexOf(e), 1)
);
executing.push(e);
if (executing.length >= poolLimit) {
await raceWithSignal(executing);
}
}
await Promise.all(executing);
writer.close();
} catch {
// Wait for in-flight work so their settled results are still yielded in
// order, then write a sentinel that causes the stream to error with the
// abort reason.
await Promise.all(executing).catch(() => {});
if (signal?.aborted) {
writer.write(Promise.resolve(ABORT_SENTINEL)).catch(() => {});
} else {
writer.close();
}
}
})();

// Feature test until browser coverage is adequate
return Symbol.asyncIterator in res.readable &&
typeof res.readable[Symbol.asyncIterator] === "function"
? (res.readable[Symbol.asyncIterator] as () => AsyncIterableIterator<
Settled
>)()
: (async function* () {
const reader = res.readable.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
yield value;
}
reader.releaseLock();
})();
}
186 changes: 186 additions & 0 deletions async/unstable_pool_settled_test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
// Copyright 2018-2026 the Deno authors. MIT license.
import { pooledMapSettled } from "./unstable_pool_settled.ts";
import {
assertEquals,
assertGreaterOrEqual,
assertLess,
assertRejects,
assertThrows,
} from "@std/assert";
import { delay } from "./delay.ts";

Deno.test("pooledMapSettled() throws for invalid poolLimit", () => {
const noop = (i: number) => i;
for (const poolLimit of [0, -1, 1.5, NaN, Infinity]) {
assertThrows(
() => pooledMapSettled([1], noop, { poolLimit }),
RangeError,
"'poolLimit' must be a positive integer",
);
}
});

Deno.test("pooledMapSettled() yields rejected results without stopping", async () => {
const results = await Array.fromAsync(
pooledMapSettled(
[1, 2, 3, 4],
(i) => {
if (i % 2 === 0) throw new Error(`fail ${i}`);
return Promise.resolve(i);
},
{ poolLimit: 2 },
),
);

assertEquals(results.length, 4);
assertEquals(results[0], { status: "fulfilled", value: 1 });
assertEquals(results[1]!.status, "rejected");
assertEquals(results[2], { status: "fulfilled", value: 3 });
assertEquals(results[3]!.status, "rejected");
});

Deno.test("pooledMapSettled() preserves input order", async () => {
const results = await Array.fromAsync(
pooledMapSettled(
[1, 2, 3, 4, 5],
async (i) => {
await delay(50 / i);
return i;
},
{ poolLimit: 3 },
),
);

assertEquals(
results.map((r) => {
if (r.status === "fulfilled") return r.value;
throw new Error("unexpected rejection");
}),
[1, 2, 3, 4, 5],
);
});

Deno.test("pooledMapSettled() supports sync iteratorFn", async () => {
const results = await Array.fromAsync(
pooledMapSettled(
["a", "b", "c"],
(s) => s.toUpperCase(),
{ poolLimit: 2 },
),
);

assertEquals(results, [
{ status: "fulfilled", value: "A" },
{ status: "fulfilled", value: "B" },
{ status: "fulfilled", value: "C" },
]);
});

Deno.test("pooledMapSettled() respects pool limit", async () => {
let concurrent = 0;
let maxConcurrent = 0;

await Array.fromAsync(
pooledMapSettled(
[1, 2, 3, 4, 5, 6],
async (i) => {
concurrent++;
maxConcurrent = Math.max(maxConcurrent, concurrent);
await delay(50);
concurrent--;
return i;
},
{ poolLimit: 2 },
),
);

assertGreaterOrEqual(maxConcurrent, 1);
assertLess(maxConcurrent, 3);
});

Deno.test("pooledMapSettled() rejects with already-aborted signal", async () => {
const controller = new AbortController();
controller.abort(new Error("already aborted"));

const results = pooledMapSettled(
[1, 2, 3],
(i) => Promise.resolve(i),
{ poolLimit: 2, signal: controller.signal },
);

await assertRejects(
() => Array.fromAsync(results),
Error,
"already aborted",
);
});

Deno.test("pooledMapSettled() yields in-flight results then rejects on abort", async () => {
const controller = new AbortController();
const collected: PromiseSettledResult<number>[] = [];

const results = pooledMapSettled(
[1, 2, 3, 4, 5, 6, 7, 8],
async (i) => {
await delay(50);
if (i === 2) controller.abort(new Error("stop"));
return i;
},
{ poolLimit: 1, signal: controller.signal },
);

await assertRejects(
async () => {
for await (const result of results) {
collected.push(result);
}
},
Error,
"stop",
);

assertGreaterOrEqual(collected.length, 1);
assertLess(collected.length, 8);
for (const r of collected) {
assertEquals(r.status, "fulfilled");
}
});

Deno.test("pooledMapSettled() closes cleanly when input iterable throws", async () => {
async function* failing() {
yield 1;
yield 2;
throw new Error("source failed");
}

const results = await Array.fromAsync(
pooledMapSettled(
failing(),
(i) => Promise.resolve(i * 10),
{ poolLimit: 2 },
),
);

assertEquals(results, [
{ status: "fulfilled", value: 10 },
{ status: "fulfilled", value: 20 },
]);
});

Deno.test("pooledMapSettled() checks browser compat", async () => {
const asyncIterFunc = ReadableStream.prototype[Symbol.asyncIterator];
// deno-lint-ignore no-explicit-any
delete (ReadableStream.prototype as any)[Symbol.asyncIterator];
try {
const results = await Array.fromAsync(
pooledMapSettled(
[1, 2, 3],
(i) => new Promise<number>((r) => setTimeout(() => r(i), 50)),
{ poolLimit: 2 },
),
);
assertEquals(results.length, 3);
} finally {
ReadableStream.prototype[Symbol.asyncIterator] = asyncIterFunc;
}
});
Loading