Skip to content
/ queue Public

A robust, extensible, plugin-based task queue with retry logic, concurrency control, and event handling.

Notifications You must be signed in to change notification settings

lenqwang/queue

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

3 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Task Queue

A robust, extensible, plugin-based task queue with retry logic, concurrency control, and event handling.

Features

  • Concurrency Control: Limit the number of tasks running simultaneously
  • Retry Logic: Configurable retry strategies (linear, exponential backoff)
  • Plugin System: Extend functionality with plugins
  • Event-Driven: Listen to task lifecycle events
  • Type-Safe: Full TypeScript support
  • State Management: Query queue state at any time

Basic Usage

import { Queue } from "./queue";

const queue = new Queue<number>({
  concurrency: 3,
  maxRetry: 3,
  retryWaitTimeInSeconds: 2,
});

queue.on("success", (result, context) => {
  console.log(`Task ${context.id} completed:`, result);
});

queue.on("all-done", (errors) => {
  console.log("All tasks completed. Errors:", errors.length);
});

// Add tasks
queue.addTask(async () => {
  return 42;
});

Plugins

Logger Plugin

import { LoggerPlugin } from "./plugins/logger-plugin";

queue.use(new LoggerPlugin({ 
  logLevel: "info",
  prefix: "[MyQueue]"
}));

Metrics Plugin

import { MetricsPlugin } from "./plugins/metrics-plugin";

const metricsPlugin = new MetricsPlugin();
queue.use(metricsPlugin);

// Later...
const metrics = metricsPlugin.getMetrics();
console.log(metrics);
// {
//   totalTasks: 100,
//   successfulTasks: 95,
//   failedTasks: 5,
//   retriedTasks: 10,
//   totalRetries: 15,
//   averageExecutionTime: 234
// }

Rate Limiter Plugin

import { RateLimiterPlugin } from "./plugins/rate-limiter-plugin";

queue.use(new RateLimiterPlugin({
  maxTasksPerWindow: 10,
  windowInSeconds: 60
}));

Retry Strategies

Linear Retry

import { LinearRetryStrategy } from "./retry-strategy";

const queue = new Queue({
  concurrency: 2,
  maxRetry: 3,
  retryWaitTimeInSeconds: 5,
  retryStrategy: new LinearRetryStrategy(5) // Wait 5s between retries
});

Exponential Backoff

import { ExponentialBackoffStrategy } from "./retry-strategy";

const queue = new Queue({
  concurrency: 2,
  maxRetry: 5,
  retryWaitTimeInSeconds: 1,
  retryStrategy: new ExponentialBackoffStrategy(1, 30) // 1s, 2s, 4s, 8s, 16s, max 30s
});

Events

  • success: Task completed successfully
  • error: Task failed after all retries
  • retry: Task is being retried
  • all-done: All tasks completed
  • task-start: Task execution started
  • task-end: Task execution ended
  • queue-drain: Queue is empty and all tasks completed

Custom Plugins

import type { QueuePlugin, TaskContext, QueueState } from "./queue-types";

class CustomPlugin<T> implements QueuePlugin<T> {
  name = "custom";

  async onTaskStart(context: TaskContext<T>, state: QueueState<T>) {
    // Your logic here
  }

  async onTaskSuccess(result: T, context: TaskContext<T>, state: QueueState<T>) {
    // Your logic here
  }
}

queue.use(new CustomPlugin());

API

Queue Methods

  • addTask(task): Add a single task, returns task ID
  • addTasks(tasks): Add multiple tasks, returns array of task IDs
  • stop(): Stop the queue and clear pending tasks
  • getState(): Get current queue state
  • use(plugin): Add a plugin
  • removePlugin(name): Remove a plugin by name
  • getPlugin(name): Get a plugin by name
  • on(event, listener): Add event listener
  • off(event, listener): Remove event listener

Options

interface ExtendedQueueOptions {
  concurrency: number;              // Max concurrent tasks
  maxRetry: number;                 // Max retry attempts
  retryWaitTimeInSeconds: number;   // Base wait time for retries
  retryStrategy?: RetryStrategy;    // Custom retry strategy
  stopOnError?: boolean;            // Stop queue on error (default: true)
}

About

A robust, extensible, plugin-based task queue with retry logic, concurrency control, and event handling.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published