A robust, extensible, plugin-based task queue with retry logic, concurrency control, and event handling.
- Concurrency Control: Limit the number of tasks running simultaneously
- Retry Logic: Configurable retry strategies (linear, exponential backoff)
- Plugin System: Extend functionality with plugins
- Event-Driven: Listen to task lifecycle events
- Type-Safe: Full TypeScript support
- State Management: Query queue state at any time
import { Queue } from "./queue";
const queue = new Queue<number>({
concurrency: 3,
maxRetry: 3,
retryWaitTimeInSeconds: 2,
});
queue.on("success", (result, context) => {
console.log(`Task ${context.id} completed:`, result);
});
queue.on("all-done", (errors) => {
console.log("All tasks completed. Errors:", errors.length);
});
// Add tasks
queue.addTask(async () => {
return 42;
});

import { LoggerPlugin } from "./plugins/logger-plugin";
queue.use(new LoggerPlugin({
logLevel: "info",
prefix: "[MyQueue]"
}));

import { MetricsPlugin } from "./plugins/metrics-plugin";
const metricsPlugin = new MetricsPlugin();
queue.use(metricsPlugin);
// Later...
const metrics = metricsPlugin.getMetrics();
console.log(metrics);
// {
// totalTasks: 100,
// successfulTasks: 95,
// failedTasks: 5,
// retriedTasks: 10,
// totalRetries: 15,
// averageExecutionTime: 234
// }

import { RateLimiterPlugin } from "./plugins/rate-limiter-plugin";
queue.use(new RateLimiterPlugin({
maxTasksPerWindow: 10,
windowInSeconds: 60
}));

import { LinearRetryStrategy } from "./retry-strategy";
const queue = new Queue({
concurrency: 2,
maxRetry: 3,
retryWaitTimeInSeconds: 5,
retryStrategy: new LinearRetryStrategy(5) // Wait 5s between retries
});

import { ExponentialBackoffStrategy } from "./retry-strategy";
const queue = new Queue({
concurrency: 2,
maxRetry: 5,
retryWaitTimeInSeconds: 1,
retryStrategy: new ExponentialBackoffStrategy(1, 30) // 1s, 2s, 4s, 8s, 16s, max 30s
});

- success: Task completed successfully
- error: Task failed after all retries
- retry: Task is being retried
- all-done: All tasks completed
- task-start: Task execution started
- task-end: Task execution ended
- queue-drain: Queue is empty and all tasks completed

import type { QueuePlugin, TaskContext, QueueState } from "./queue-types";
class CustomPlugin<T> implements QueuePlugin<T> {
name = "custom";
async onTaskStart(context: TaskContext<T>, state: QueueState<T>) {
// Your logic here
}
async onTaskSuccess(result: T, context: TaskContext<T>, state: QueueState<T>) {
// Your logic here
}
}
queue.use(new CustomPlugin());

- addTask(task): Add a single task, returns task ID
- addTasks(tasks): Add multiple tasks, returns array of task IDs
- stop(): Stop the queue and clear pending tasks
- getState(): Get current queue state
- use(plugin): Add a plugin
- removePlugin(name): Remove a plugin by name
- getPlugin(name): Get a plugin by name
- on(event, listener): Add event listener
- off(event, listener): Remove event listener

interface ExtendedQueueOptions {
concurrency: number; // Max concurrent tasks
maxRetry: number; // Max retry attempts
retryWaitTimeInSeconds: number; // Base wait time for retries
retryStrategy?: RetryStrategy; // Custom retry strategy
stopOnError?: boolean; // Stop queue on error (default: true)
}