A background task processor focused on performance, reliability, and durability.
TL;DR: An upgraded Promise queue that's essentially a stateful `Promise.all()` wrapper.
Diagram of Promise Pool's 'Minimal Blocking' design
Promise Pool strives to excel at 4 key goals:
- Durability - won't fail in unpredictable ways - even under extreme load.
- Extensible by Design - supports Promise-based libraries (examples include: retry handling, debounce/throttle)
- Reliability - control your pool with a total runtime limit (align to max HTTP/Lambda request limit), idle timeout (catch orphan or zombie situations), plus a concurrent worker limit.
- Performance - the work pool queue uses a speedy Ring Buffer design!*
* Since this is JavaScript, our Ring Buffer is more like three JS Arrays in a trenchcoat.
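Because a task is just a function that returns a Promise, retry handling can be layered on before the task ever reaches the pool. The sketch below is a hand-rolled stand-in for a library such as `p-retry`; `withRetry` and its `retries`/`waitMs` options are hypothetical names, not part of Promise Pool.

```javascript
// Hypothetical wrapper: returns a new task that re-runs `task` on failure.
// `retries` counts the extra attempts made after the first one.
const withRetry = (task, { retries = 2, waitMs = 0 } = {}) => async () => {
  let lastError;
  for (let attempt = 0; attempt <= retries; attempt += 1) {
    try {
      return await task();
    } catch (error) {
      lastError = error;
      // Optionally pause before the next attempt.
      if (attempt < retries && waitMs > 0) {
        await new Promise((resolve) => setTimeout(resolve, waitMs));
      }
    }
  }
  // Every attempt failed: surface the last error.
  throw lastError;
};
```

The wrapped function has the same shape as any other task, so it could be passed along as `pool.add(withRetry(() => saveToS3(data)))`.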
- Any Node service (Lambda functions, etc.) that does background work, defined as:
  - Long-running async function(s),
    - where the `return` value isn't used (in the current request),
    - and failures are handled by logging.
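For background work of this kind, "failures are handled by logging" can be made explicit by wrapping each task. This is a minimal sketch under that assumption; `logFailures`, `label`, and the injectable `logger` are hypothetical names, not part of Promise Pool.

```javascript
// Hypothetical wrapper: returns a task that logs a rejection instead of
// propagating it. The wrapped task resolves to `undefined` on failure.
const logFailures = (task, label, logger = console.error) => async () => {
  try {
    return await task();
  } catch (error) {
    logger(`[background task failed] ${label}`, error);
    return undefined;
  }
};
```

A task wrapped this way never rejects, so one failing background job should not surface as an error in the request that scheduled it.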
- Configurable.
- Concurrency limit.
- Retries. (Use `p-retry` for this.)
- Zero dependencies.
- Error handling.
- Singleton mode: Option to auto-reset when `.done()`. (Added `.drain()` method.)
- Task scheduling & prioritization.
  - Support or `return` hints/stats? (Time in Event Loop? Event Loop wait time? Pending/Complete task counts?)
  - Support smart 'Rate Limit' logic.
    - Recommended solution: conditionally delay (using `await delay(requestedWaitTime)`) before (or after) each HTTP call.
    - Typically you'll detect Rate Limits via HTTP headers (or Payload/body data). For example, check for any headers like `X-RateLimit-WaitTimeMS`.
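The 'Rate Limit' recommendation above can be sketched as a task wrapper that pauses after each HTTP call when the response asks for it. This is an illustration only: `delay`, `getWaitTimeMs`, and `rateLimited` are hypothetical helpers, and the response is assumed to expose a fetch-style `headers.get()`.

```javascript
// Hypothetical helper: resolves after `ms` milliseconds.
const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Reads the requested wait time (in ms) from a fetch-style response.
// Returns 0 when the header is missing or not a positive integer.
const getWaitTimeMs = (response) => {
  const raw = response.headers.get('X-RateLimit-WaitTimeMS');
  const ms = Number.parseInt(raw ?? '', 10);
  return Number.isFinite(ms) && ms > 0 ? ms : 0;
};

// Wraps an HTTP call in a task that delays after the call when asked to.
const rateLimited = (httpCall) => async () => {
  const response = await httpCall();
  const waitMs = getWaitTimeMs(response);
  if (waitMs > 0) await delay(waitMs);
  return response;
};
```

Since the task does not settle until the delay has elapsed, a pool running it would keep that worker slot occupied, which slows the overall request rate without any extra scheduling logic.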
`PromisePool` exposes 3 methods:
- `.add(...tasks)` - Adds one (or more) tasks for background processing. (A task is a function that wraps a `Promise` value, e.g. `() => Promise.resolve(1)`.)
- `.drain()` - Returns a promise that resolves when all tasks have been processed, or when another caller takes over waiting by calling `.drain()` again.
- `.done()` - Drains AND 'finalizes' the pool. No more tasks can be added after this. Can be called from multiple callers; only runs once.
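The difference between a task and a bare Promise matters here: a Promise has already started its work, while a task starts only when it is invoked. A small standalone sketch (`expensiveWork` and the `started` counter are hypothetical, for illustration only):

```javascript
// Hypothetical stand-in for real background work.
let started = 0;
const expensiveWork = async (input) => {
  started += 1;
  return input * 2;
};

// A Promise: the work starts immediately, before anything could schedule it.
const eagerPromise = expensiveWork(1);

// A task: a function wrapping the Promise. Nothing runs until it is invoked,
// which leaves the decision of when to start it to whoever holds the task.
const lazyTask = () => expensiveWork(2);

console.log(started); // 1 (only the eager call has started)
```

This is why `.add()` is given functions such as `() => saveToS3(data)` rather than the result of calling `saveToS3(data)`.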
See the Usage section below, or check out the `/examples` folder for more complete examples.
# with npm
npm install @elite-libs/promise-pool
# or using yarn
yarn add @elite-libs/promise-pool
import PromisePool from '@elite-libs/promise-pool';
// 1/3: Either use the default instance or create a new one.
const pool = new PromisePool();
(async () => {
  // 2/3: Add task(s) to the pool as needed.
  // PromisePool will execute them in parallel as soon as possible.
  pool.add(() => saveToS3(data));
  pool.add(() => expensiveBackgroundWork(data));
  // 3/3: REQUIRED: to ensure your tasks are executed, you must await either
  // `pool.drain()` or `pool.done()` at some point in your code.
  // (`done` prevents additional tasks from being added.)
  await pool.drain();
})();
Recommended for virtually all projects. (API, CLI, Lambda, Frontend, etc.)
The singleton pattern creates exactly 1 Promise Pool as soon as the script is imported (typically on the first run).
This ensures the `maxWorkers` value will act as a global limit on the number of tasks that can run at the same time.
import PromisePool from '@elite-libs/promise-pool';
export const taskPool = new PromisePool({
  maxWorkers: 6, // Optional. Default is `4`.
});
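To make "global limit" concrete, here is a minimal stand-in limiter that caps how many tasks run at once and records the peak. It is NOT the library's implementation (which uses the ring buffer design described earlier); `createLimiter`, `run`, and `getPeak` are hypothetical names used only to illustrate what a `maxWorkers` cap means.

```javascript
// Minimal stand-in limiter (illustration only, not Promise Pool's code).
function createLimiter(maxWorkers) {
  const queue = [];
  let active = 0;
  let peak = 0;

  // Starts queued tasks while a worker slot is free.
  const next = () => {
    while (active < maxWorkers && queue.length > 0) {
      const { task, resolve, reject } = queue.shift();
      active += 1;
      peak = Math.max(peak, active);
      Promise.resolve()
        .then(task)
        .then(resolve, reject)
        .finally(() => {
          // Free the slot, then try to start the next queued task.
          active -= 1;
          next();
        });
    }
  };

  return {
    // Queues a task and resolves (or rejects) with its outcome.
    run: (task) =>
      new Promise((resolve, reject) => {
        queue.push({ task, resolve, reject });
        next();
      }),
    // Highest number of tasks observed running at the same time.
    getPeak: () => peak,
  };
}
```

Sharing one such object across every module, as the singleton does with `taskPool`, is what turns a per-instance cap into a process-wide one.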
See examples below, or check out the `/examples` folder for more complete examples.
Promise Pool in some `middy` middleware:
Note: The imported `taskPool` is a singleton instance defined in the `taskPoolSingleton` file.
import { taskPool } from './services/taskPoolSingleton';
export const taskPoolMiddleware = () => ({
  before: (request) => {
    Object.assign(request.context, { taskPool });
  },
  after: async (request) => {
    await request.context.taskPool.drain();
  }
});
Now you can use `taskPool` in your Lambda function via `context.taskPool`:
import middy from '@middy/core';
import { taskPoolMiddleware } from './services/taskPoolMiddleware';
// middy passes the Lambda `context` as the handler's second argument;
// the middleware above attached `taskPool` to it.
const handleEvent = (event, context) => {
  const { taskPool } = context;
  const data = getDataFromEvent(event);
  taskPool.add(() => saveToS3(data));
  taskPool.add(() => expensiveBackgroundWork(data));
  return {
    statusCode: 200,
    body: JSON.stringify({
      message: 'Success',
    }),
  };
};
export const handler = middy(handleEvent)
  .use(taskPoolMiddleware());
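One gap worth considering: middy runs `onError` hooks rather than `after` hooks when the handler throws, so tasks added before the failure would not be drained by an `after` hook alone. The sketch below is a factory variant that drains in both cases. `createTaskPoolMiddleware` is a hypothetical name, and the pool is passed in as an argument only to keep the example self-contained.

```javascript
// Hypothetical factory: builds a middy-style middleware around any object
// that exposes an async `drain()` method.
const createTaskPoolMiddleware = (taskPool) => {
  const drainPool = async () => {
    await taskPool.drain();
  };
  return {
    // Expose the pool to the handler via the Lambda context.
    before: (request) => {
      Object.assign(request.context, { taskPool });
    },
    // Drain after a successful handler run...
    after: drainPool,
    // ...and also when the handler (or another middleware) throws.
    onError: drainPool,
  };
};
```

Because `onError` returns nothing here, the original error should still propagate after the pool has drained.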
import PromisePool from '@elite-libs/promise-pool'
export const taskPool = new PromisePool({
  maxWorkers: 6 // Optional. Default is `4`.
})
import { taskPool } from "../services/taskPoolSingleton.mjs";
const taskPoolMiddleware = {
  // Attach the shared pool to each incoming request.
  setup: (request, response, next) => {
    request.taskPool = taskPool
    next()
  },
  // Wait for pending tasks before handing off to the next middleware.
  cleanup: (request, response, next) => {
    if (request.taskPool && 'drain' in request.taskPool) {
      request.taskPool.drain().then(() => next(), next)
      return
    }
    next()
  }
}
export default taskPoolMiddleware
To use the `taskPoolMiddleware` in your Express app, register `taskPoolMiddleware.setup` before your routes and `taskPoolMiddleware.cleanup` after them.
import express from "express"
import taskPoolMiddleware from "../middleware/taskPool.middleware.mjs"
export const app = express()
// Step 1/2: Setup the taskPool
app.use(taskPoolMiddleware.setup)
// `express.bodyParser()` was removed in Express 4; `express.json()` parses JSON bodies.
app.use(express.json())
app.post('/users/', function post(request, response, next) {
  const { taskPool } = request
  const data = getDataFromBody(request.body)
  // You can .add() tasks wherever needed,
  // - they'll run in the background.
  taskPool.add(() => logMetrics(data))
  taskPool.add(() => saveToS3(request))
  taskPool.add(() => expensiveBackgroundWork(data))
  // Or, 'schedule' multiple tasks at once.
  taskPool.add(
    () => logMetrics(data),
    () => saveToS3(request),
    () => expensiveBackgroundWork(data)
  )
  next()
})
// Step 2/2: Drain the taskPool
app.use(taskPoolMiddleware.cleanup)
- Upgraded dev dependencies (dependabot).
- Cleaned up README code & description.
- What? Adds Smart `.drain()` behavior.
- Why?
  - Speeds up concurrent server environments!
  - Prevents several types of difficult (ghost) bugs!
    - stack overflows/mem access violations,
    - exceeding rarely hit OS limits (e.g. max open handle/file/socket limits),
    - exceeding limits of network hardware (e.g. connections/sec, total open socket limit, etc.),
    - uneven observed wait times.
- How? Only awaits the latest `.drain()` caller.
- Who? `Server` + `Singleton` use cases will see a major benefit to this design.
- Huh? See diagram
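The "only awaits the latest `.drain()` caller" idea can be illustrated with a small stand-in class. This is a sketch of the described behavior under stated assumptions, not the library's actual code: `SmartDrainSketch` and the `'handed-off'`/`'drained'` return values are hypothetical and exist only to make the hand-off visible.

```javascript
// Illustration only: an earlier drain() caller is released as soon as a
// newer caller arrives; the newest caller waits for all pending tasks.
class SmartDrainSketch {
  #pending = new Set();
  #releasePrevious = null;

  add(task) {
    const tracked = Promise.resolve()
      .then(task)
      .catch(() => {}) // failures are ignored in this sketch
      .finally(() => this.#pending.delete(tracked));
    this.#pending.add(tracked);
  }

  drain() {
    // Release the previous waiter: the newest caller takes over.
    this.#releasePrevious?.('handed-off');
    let handOff;
    const handedOff = new Promise((resolve) => {
      handOff = resolve;
    });
    this.#releasePrevious = handOff;

    // Keep waiting until no tasks remain, including ones added meanwhile.
    const allDone = (async () => {
      while (this.#pending.size > 0) {
        await Promise.all(this.#pending);
      }
      return 'drained';
    })();

    return Promise.race([handedOff, allDone]);
  }
}
```

In a server with a singleton pool, this means only the most recent request ends up waiting on the shared backlog, and earlier requests can return promptly.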