
🏖️ Configurable async task queue, w/ throttling, retries, error handling - by EliteLibs.


elite-libs/promise-pool


Promise Pool 🏖️


A background task processor focused on performance, reliability, and durability.

TL;DR: An upgraded Promise queue that's essentially a stateful Promise.all() wrapper.


Smart Multi-Threaded Execution

Diagram: Promise Pool's 'Minimal Blocking' design

Key Promise Pool Features

Promise Pool strives to excel at 4 key goals:

  1. Durability - won't fail in unpredictable ways - even under extreme load.
  2. Extensible by Design - supports Promise-based libraries (examples include: retry handling, debounce/throttle)
  3. Reliability - control your pool with a total runtime limit (align to max HTTP/Lambda request limit), idle timeout (catch orphan or zombie situations), plus a concurrent worker limit.
  4. Performance - the work pool queue uses a speedy Ring Buffer design!*

* Since this is JavaScript, our Ring Buffer is more like three JS Arrays in a trenchcoat.
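
For readers unfamiliar with the term, here is a minimal sketch of a generic fixed-capacity ring buffer used as a FIFO queue. It is an illustration of the general technique only, not the library's internal data structure (which the footnote above describes as built from three arrays). The `RingBuffer` class and its method names are assumptions made for this example.

```typescript
// Generic fixed-capacity ring buffer (FIFO queue).
// Illustrative only: this is NOT the library's internal implementation.
class RingBuffer<T> {
  private readonly capacity: number;
  private readonly items: (T | undefined)[];
  private head = 0; // index of the oldest stored item
  private size = 0; // number of items currently stored

  constructor(capacity: number) {
    if (!Number.isInteger(capacity) || capacity < 1) {
      throw new RangeError('capacity must be a positive integer');
    }
    this.capacity = capacity;
    this.items = new Array<T | undefined>(capacity);
  }

  get length(): number {
    return this.size;
  }

  // Adds an item at the tail; returns false when the buffer is full.
  push(item: T): boolean {
    if (this.size === this.capacity) return false;
    const tail = (this.head + this.size) % this.capacity;
    this.items[tail] = item;
    this.size += 1;
    return true;
  }

  // Removes and returns the oldest item, or undefined when empty.
  shift(): T | undefined {
    if (this.size === 0) return undefined;
    const item = this.items[this.head];
    this.items[this.head] = undefined; // release the reference
    this.head = (this.head + 1) % this.capacity;
    this.size -= 1;
    return item;
  }
}
```

Both `push` and `shift` run in constant time because the indices wrap around instead of moving elements, which is the property that makes this layout attractive for a work queue.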

Who needs a Promise Pool?

  • Any Node service (Lambda function, etc.) that does background work, defined as:
    • Long-running async function(s),
      • where the return value isn't used (in the current request),
      • and failures are handled by logging.

Features

  • Configurable.
  • Concurrency limit.
  • Retries. (Use p-retry for this.)
  • Zero dependencies.
  • Error handling.
  • Singleton mode: Option to auto-reset when .done(). (Added .drain() method.)
  • Task scheduling & prioritization.
    • Support or return hints/stats? (Time in Event Loop? Event Loop wait time? Pending/Complete task counts?)
    • Support smart 'Rate Limit' logic.
      • Recommended solution: conditionally delay (using await delay(requestedWaitTime)) before (or after) each HTTP call.
      • Typically you'll detect rate limits via HTTP headers (or payload/body data). For example, check for headers like X-RateLimit-WaitTimeMS.
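
The rate-limit recommendation above can be sketched roughly as follows. This is a minimal example under stated assumptions: `delay`, `getRequestedWaitTime` and `withRateLimitDelay` are hypothetical helpers written for this illustration and are not part of the library, and the header name is simply the one mentioned above (real APIs vary).

```typescript
// Resolves after `ms` milliseconds.
const delay = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Minimal shape shared by fetch's Headers and simple test doubles.
type HeaderReader = { get(name: string): string | null };

// Reads the requested wait time (in ms) from a response header.
// Missing, non-numeric, or non-positive values are treated as "no wait".
function getRequestedWaitTime(headers: HeaderReader): number {
  const raw = headers.get('X-RateLimit-WaitTimeMS');
  const ms = raw === null ? 0 : Number(raw);
  return Number.isFinite(ms) && ms > 0 ? ms : 0;
}

// Runs one HTTP call, then pauses if the server asked us to slow down.
async function withRateLimitDelay<T extends { headers: HeaderReader }>(
  request: () => Promise<T>,
): Promise<T> {
  const response = await request();
  const requestedWaitTime = getRequestedWaitTime(response.headers);
  if (requestedWaitTime > 0) await delay(requestedWaitTime);
  return response;
}
```

A pool task could then look like `pool.add(() => withRateLimitDelay(() => fetch(url)))`. Because the delay happens inside the task, the worker slot stays occupied while waiting, which naturally slows the whole pool down.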

API

PromisePool exposes 3 methods:

  • .add(...tasks) - add one (or more) tasks for background processing. (A task is a function that wraps a Promise value. e.g. () => Promise.resolve(1)).
  • .drain() - Returns a promise that resolves when all tasks have been processed, or when another caller takes over waiting by calling .drain() again.
  • .done() - Drains AND 'finalizes' the pool. No more tasks can be added after this. Can be called by multiple callers; it only runs once.
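
The reason a task is a function rather than a bare Promise is that a Promise starts its work the moment it is created, whereas a function lets a queue decide when the work begins. The sketch below shows the difference using plain JavaScript semantics; `doWork` is a hypothetical stand-in for real work and no library code is involved.

```typescript
let started = 0;

// Hypothetical unit of work: counts how many times it has been started.
const doWork = async (): Promise<number> => {
  started += 1;
  return started;
};

// Eager: calling doWork() starts the work immediately,
// before any queue could schedule or limit it.
const eager: Promise<number> = doWork();
eager.then((n) => console.log('eager result', n));

// Lazy: wrapping the call in a function defers it.
// Nothing runs until something (such as a pool worker) calls task().
const task = (): Promise<number> => doWork();
```

At this point `started` is 1: only the eager call has run. Passing `task` (not `task()`) to `.add()` is what leaves the start time up to the pool.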

See either the Usage section below, or check out the /examples folder for more complete examples.

Install

# with npm
npm install @elite-libs/promise-pool
# or using yarn
yarn add @elite-libs/promise-pool

Usage

Minimal Example

import PromisePool from '@elite-libs/promise-pool';
// 1/3: Either use the default instance or create a new one.
const pool = new PromisePool();

(async () => {
  // 2/3: Add task(s) to the pool as needed.
  // PromisePool will execute them in parallel as soon as possible.
  pool.add(() => saveToS3(data));
  pool.add(() => expensiveBackgroundWork(data));
  
  // 3/3: REQUIRED: in order to ensure your tasks are executed, 
  // you must await either `pool.drain()` or `pool.done()` at some point in your code (`done` prevents additional tasks from being added).
  await pool.drain();
})();

Singleton Pattern

Recommended for virtually all projects. (API, CLI, Lambda, Frontend, etc.)

The singleton pattern creates exactly 1 Promise Pool - as soon as the script is imported (typically on the first run).

This ensures the maxWorkers value will act as a global limit on the number of tasks that can run at the same time.

File /services/taskPoolSingleton.ts

import PromisePool from '@elite-libs/promise-pool';

export const taskPool = new PromisePool({
  maxWorkers: 6, // Optional. Default is `4`.
});
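
To make the idea of a worker limit concrete, here is a generic sketch of running tasks with a cap on how many are in flight at once. It illustrates the concept behind maxWorkers only; it is not how PromisePool is implemented, and `runWithLimit` is a hypothetical helper written for this example.

```typescript
type Task<T> = () => Promise<T>;

// Runs tasks with at most `maxWorkers` in flight at once and
// returns results in the same order as the input tasks.
// Generic illustration only; not the library's implementation.
async function runWithLimit<T>(tasks: Task<T>[], maxWorkers: number): Promise<T[]> {
  const results: T[] = new Array<T>(tasks.length);
  let next = 0; // index of the next task to start

  // Each worker pulls the next unstarted task until none remain.
  const worker = async (): Promise<void> => {
    while (next < tasks.length) {
      const index = next++;
      const task = tasks[index];
      if (task) results[index] = await task();
    }
  };

  const workerCount = Math.max(1, Math.min(maxWorkers, tasks.length));
  // If any task rejects, this rejects with that error;
  // workers that are already running continue with their current task.
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}
```

With a single shared pool instance, the same kind of cap applies across every module that imports it, which is why the singleton makes maxWorkers a process-wide limit rather than a per-call one.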

Recipes

See examples below, or check out the /examples folder for more complete examples.

AWS Lambda & Middy

Using Promise Pool in a Middy middleware:

File 1/2 ./services/taskPoolMiddleware.ts

Note: The imported taskPool is a singleton instance defined in the taskPoolSingleton file.

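// Both files live in ./services, so the import is relative to this file.
import { taskPool } from './taskPoolSingleton';

export const taskPoolMiddleware = () => ({
  // Expose the shared pool on the Lambda context before the handler runs.
  before: (request) => {
    Object.assign(request.context, { taskPool });
  },
  // Wait for queued background tasks before the Lambda returns.
  after: async (request) => {
    await request.context.taskPool.drain();
  }
});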

Now you can use taskPool in your Lambda function via context.taskPool (Middy passes the Lambda context as the handler's second argument):

File 2/2 ./handlers/example.handler.ts

import middy from '@middy/core';
import { taskPoolMiddleware } from '../services/taskPoolMiddleware';

// getDataFromEvent, saveToS3 and expensiveBackgroundWork
// are your own application functions.
const handleEvent = async (event, context) => {
  // taskPool was attached to the context by the middleware's `before` hook.
  const { taskPool } = context;

  const data = getDataFromEvent(event);

  taskPool.add(() => saveToS3(data));
  taskPool.add(() => expensiveBackgroundWork(data));
  
  return {
    statusCode: 200,
    body: JSON.stringify({
      message: 'Success',
    }),
  }
}

export const handler = middy(handleEvent)
  .use(taskPoolMiddleware());

Express Middleware

File 1/3 /services/taskPoolSingleton.mjs

import PromisePool from '@elite-libs/promise-pool'

export const taskPool = new PromisePool({
  maxWorkers: 6 // Optional. Default is `4`.
})

File 2/3 /middleware/taskPool.middleware.mjs

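import { taskPool } from "../services/taskPoolSingleton.mjs";

const taskPoolMiddleware = {
  // Attach the shared pool to every incoming request.
  setup: (request, response, next) => {
    request.taskPool = taskPool
    next()
  },
  // Wait for queued tasks to finish, forwarding any error to Express.
  cleanup: async (request, response, next) => {
    try {
      if (request.taskPool && 'drain' in request.taskPool) {
        await request.taskPool.drain()
      }
    } catch (error) {
      return next(error)
    }
    next()
  }
}

export default taskPoolMiddleware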

To use the taskPoolMiddleware in your Express app, register taskPoolMiddleware.setup and taskPoolMiddleware.cleanup with app.use().

File 3/3 /app.mjs

import express from "express"
import taskPoolMiddleware from "./middleware/taskPool.middleware.mjs"

export const app = express()

// Step 1/2: Setup the taskPool
app.use(taskPoolMiddleware.setup)
// express.bodyParser() was removed in Express 4; express.json() parses JSON bodies.
app.use(express.json())

app.post('/users/', function post(request, response, next) {
  const { taskPool } = request

  // getDataFromBody, logMetrics, saveToS3 and expensiveBackgroundWork
  // are your own application functions.
  const data = getDataFromBody(request.body)

  // You can .add() tasks wherever needed,
  //   - they'll run in the background.
  taskPool.add(() => logMetrics(data))
  taskPool.add(() => saveToS3(request))
  taskPool.add(() => expensiveBackgroundWork(data))

  // Or, as an alternative to the three calls above,
  // 'schedule' multiple tasks at once.
  taskPool.add(
    () => logMetrics(data),
    () => saveToS3(request),
    () => expensiveBackgroundWork(data)
  )

  next()
})

// Step 2/2: Drain the taskPool
app.use(taskPoolMiddleware.cleanup)

Changelog

v1.3.1 - April 2023

  • Upgraded dev dependencies (dependabot).
  • Cleaned up README code & description.

v1.3.0 - September 2022

  • What? Adds Smart .drain() behavior.
  • Why?
    1. Speeds up concurrent server environments!
    2. Prevents several types of difficult (ghost) bugs!
      • stack overflows/mem access violations,
      • exceeding rarely hit OS limits, (e.g. max open handle/file/socket limits)
      • exceeding limits of Network hardware (e.g. connections/sec, total open socket limit, etc.)
      • uneven observed wait times.
  • How? Only awaits the latest .drain() caller.
  • Who? Server + Singleton use cases will see a major benefit to this design.
  • Huh? See diagram