aherve/async-pool

Async Pool

A concurrent pool for Node.js and browsers that supports consuming results via promises and async generators.

Features

  • Concurrency Control: Limit the number of promises running at the same time.
  • Memory Efficiency: Can process a large number of tasks without building a large array of results.
  • Flexible API: Supports both async iteration and promise-based consumption.

Installation

npm install @aherve/async-pool
// with TypeScript or ES Modules
import { AsyncPool } from "@aherve/async-pool";

// or with CommonJS
const AsyncPool = require("@aherve/async-pool").AsyncPool;

Usage

Consume results as a stream

const pool = new AsyncPool<boolean>() // generic can optionally be used
  .withConcurrency(10)
  .withRetries(3); // default number of retries for each task unless specified at task level


// Processing will start as soon as the first task is added
for await (const document of largeDBCollectionScanner) {
  pool.add({
    task: () => updateDBDoc(document), // assuming this returns a Promise<boolean>
  })
}

// results can be consumed as a stream, without building a large array of results
let total = 0;
for await (const update of pool.results()) {
  if (update) { // each result is the boolean returned by updateDBDoc
    total++
  }
}

console.log("Successfully updated", total, "documents");
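To make the streaming idea concrete, here is a minimal, self-contained sketch of how results can be yielded as they complete while capping the number of in-flight tasks. It is not the library's implementation: streamWithLimit and its parameters are hypothetical names chosen for illustration, and it takes a plain array of task functions rather than an AsyncPool.

```typescript
// Hypothetical helper, NOT part of @aherve/async-pool.
// Yields each result as soon as its task settles, keeping at most
// `limit` tasks in flight and never collecting results into an array.
async function* streamWithLimit<T>(
  tasks: Array<() => Promise<T>>,
  limit: number,
): AsyncGenerator<T> {
  const inFlight = new Map<number, Promise<{ id: number; value: T }>>();

  // Wait for whichever in-flight task settles first and free its slot.
  const takeNext = async (): Promise<T> => {
    const settled = await Promise.race(Array.from(inFlight.values()));
    inFlight.delete(settled.id);
    return settled.value;
  };

  for (let id = 0; id < tasks.length; id++) {
    // Start the task and tag its result with the slot id.
    inFlight.set(id, tasks[id]().then((value) => ({ id, value })));
    if (inFlight.size >= limit) {
      yield await takeNext();
    }
  }

  // Drain the tasks that are still running.
  while (inFlight.size > 0) {
    yield await takeNext();
  }
}
```

In this sketch a rejected task makes the generator throw at the consumer, and results arrive in completion order rather than submission order.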

Safe mode

Alternatively, safe mode avoids throwing and instead returns errors in the results stream. In this mode, the pool processes all tasks, even if some fail, so you can handle errors gracefully.

for await (const res of pool.safeResults()) {
    if (res.success) {
        console.log("Task succeeded:", res.data);
    } else {
        console.error("Task failed:", res.error);
    }
}
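The success/error shape used above can be modelled in TypeScript as a discriminated union. The sketch below is an assumption based only on the fields shown in this README (success, data, error): the type name SafeResult and the helper partitionResults are hypothetical and are not confirmed exports of the library.

```typescript
// Hypothetical type name; the fields mirror the safe-mode examples in this README.
type SafeResult<T> =
  | { success: true; data: T }
  | { success: false; error: unknown };

// Hypothetical helper: split safe-mode results into values and errors.
function partitionResults<T>(
  results: SafeResult<T>[],
): { data: T[]; errors: unknown[] } {
  const data: T[] = [];
  const errors: unknown[] = [];
  for (const res of results) {
    // Comparing against the literal narrows the union in both branches.
    if (res.success === true) {
      data.push(res.data);
    } else {
      errors.push(res.error);
    }
  }
  return { data, errors };
}
```

Checking the success flag first is what lets the compiler know whether data or error is available on a given result.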

Headless processing

Fire-and-forget tasks, with controlled concurrency and retries.

const pool = new AsyncPool()
  .withConcurrency(10)
  .withRetries(3)
  .add({ task: () => doSomethingAsync() })
  .add({ task: () => doSomethingElse() })

// wait until all tasks are processed
await pool.waitForTermination();

// At this point, all promises have been resolved successfully
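For readers unfamiliar with per-task retries, the following sketch shows the general technique in isolation. It assumes that "retries" means extra attempts after the first failure; the library's exact semantics (attempt counting, delays between attempts) are not stated in this README and may differ. retryTask is a hypothetical name, not a library function.

```typescript
// Hypothetical helper, NOT the library's implementation.
// Runs `task` once, then up to `retries` more times if it keeps failing.
async function retryTask<T>(
  task: () => Promise<T>,
  retries: number,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      return await task();
    } catch (err) {
      // Remember the failure; the loop retries while attempts remain.
      lastError = err;
    }
  }
  // Every attempt failed: surface the last error to the caller.
  throw lastError;
}
```

With retries set to 3, a task that fails twice and then succeeds resolves on its third attempt; a task that always fails rejects after four attempts in total.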

Promise-based consumption

Similar to Promise.all, but with controlled concurrency and built-in retries.

const pool = new AsyncPool();

pool.add({ task: async () => 1 });
pool.add({ task: async () => true });
pool.add({ task: async () => "hello" });

const results = await pool.all();
console.log(results); // [1, true, "hello"], order not guaranteed (especially if retries happened)
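Because result order is not guaranteed, one possible workaround when order matters is to have each task return its own position along with its value and sort afterwards. The sketch below is only an illustration of that idea: the Indexed type and restoreOrder helper are hypothetical and not part of the library.

```typescript
// Hypothetical wrapper: each task returns its submission index with its value.
type Indexed<T> = { index: number; value: T };

// Hypothetical helper: put results back into submission order.
function restoreOrder<T>(results: Indexed<T>[]): T[] {
  return results
    .slice() // copy so the caller's array is left untouched
    .sort((a, b) => a.index - b.index)
    .map((r) => r.value);
}

// Results as they might come back from a pool, in completion order.
const unordered: Indexed<string>[] = [
  { index: 2, value: "c" },
  { index: 0, value: "a" },
  { index: 1, value: "b" },
];
const ordered = restoreOrder(unordered); // ["a", "b", "c"]
```

Under this approach a task would be added as something like task: async () => ({ index: i, value: await work(i) }), where work is a placeholder for your own function.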

Safe mode

all also has a safe-mode counterpart, safeAll:

const pool = new AsyncPool();
const error = new Error('nope');

pool.add({ task: async () => 1 });
pool.add({ task: async () => { throw error } });
pool.add({ task: async () => "hello" });
const results = await pool.safeAll();
/**
* [
*   { success: true, data: 1 },
*   { success: false, error },
*   { success: true, data: "hello" }
* ]
*/

Using generic typings

You can specify a generic type for the AsyncPool to enforce type safety on the results of the tasks. If you don't specify a type, it will default to unknown, allowing any type of result.

const typedPool = new AsyncPool<string>();

typedPool.add({ task: async () => "hello" }); // OK
typedPool.add({ task: async () => 1 }); // ❌ Error: Type 'Promise<number>' is not assignable to type 'Promise<string>'.

// typedPool.all() will return a Promise<string[]>, typedPool.results() is an AsyncGenerator<string>

const relaxedPool = new AsyncPool();

relaxedPool.add({ task: async () => "hello" }); // OK
relaxedPool.add({ task: async () => 1 }); // OK

// relaxedPool.all() will return a Promise<unknown[]>, relaxedPool.results() is an AsyncGenerator<unknown>

API Documentation

API docs
