Why did the task queue go to therapy? It had too many unresolved promises!
A distributed task queue system that's seriously powerful (but doesn't take itself too seriously).
- Task Grouping - Because some tasks are more social than others
- Distributed Locking - No queue jumping allowed!
- Retry with Backoff - If at first you don't succeed... we got you covered
- Redis-Backed - Because memory is fleeting, but Redis is forever
- TypeScript Support - For when `any` just won't cut it
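Distributed locking deserves a closer look: it is what keeps two workers from grabbing the same task. Locks like this are commonly built on Redis's `SET key token NX PX ttl` plus a token check on release. The sketch below simulates that pattern in memory; it is not Cleo's API, just the algorithm behind the feature:

```typescript
// Conceptual sketch of the Redis SET NX + token-release locking pattern.
// A real implementation issues these operations against Redis atomically.
class LockStore {
  private locks = new Map<string, string>();

  // Acquire: succeeds only if nobody holds the lock (like SET key token NX).
  acquire(key: string, token: string): boolean {
    if (this.locks.has(key)) return false;
    this.locks.set(key, token);
    return true;
  }

  // Release: only the token that acquired the lock may release it,
  // so a slow worker can't free a lock someone else now holds.
  release(key: string, token: string): boolean {
    if (this.locks.get(key) !== token) return false;
    this.locks.delete(key);
    return true;
  }
}

const store = new LockStore();
store.acquire("queue:send-email", "worker-1"); // true: worker-1 holds the lock
store.acquire("queue:send-email", "worker-2"); // false: no queue jumping!
store.release("queue:send-email", "worker-2"); // false: wrong token
store.release("queue:send-email", "worker-1"); // true: lock freed
```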
- Distributed processing with auto load balancing
- Group task management (for tasks that play well with others)
- Real-time monitoring (because we're all a bit nosy)
- Priority-based processing (some tasks are just more important)
- Event-driven architecture (Redis pub/sub magic)
- Built-in error handling (because stuff happens)
- Performance metrics (for the data nerds)
- Round Robin: Fair play for all tasks
- FIFO: First in, first out (no cutting in line!)
- Priority: VIP tasks get VIP treatment
- Dynamic: Adapts faster than a developer during a production incident
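These strategies boil down to how the next group gets picked. As a rough illustration (not Cleo's internals), round-robin selection can be as simple as cycling a pointer over the active groups:

```typescript
// Illustrative only: a minimal round-robin picker over group names.
// This is NOT @cleo/core's implementation, just the idea behind the strategy.
class RoundRobinPicker {
  private index = 0;

  constructor(private groups: string[]) {}

  // Returns the next group, wrapping around at the end.
  next(): string {
    const group = this.groups[this.index % this.groups.length];
    this.index++;
    return group;
  }
}

const picker = new RoundRobinPicker(["emails", "sms", "push"]);
picker.next(); // "emails"
picker.next(); // "sms"
picker.next(); // "push"
picker.next(); // "emails" again: fair play for all
```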
- Smart Batching
  - Groups tasks like a pro party planner
  - Optimizes performance like a caffeine-powered compiler
  - Handles bursts better than your morning coffee machine
- Real-time Analytics
  - Success/failure tracking (keeping score)
  - Processing time stats (for the speed demons)
  - Resource usage metrics (watching the diet)
  - Performance insights (big brain time)
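To make "smart batching" concrete: the core idea is to collect tasks until a threshold and hand them off as one group. The sketch below shows a size-triggered micro-batcher; it is an illustration of the concept, not @cleo/core's actual batcher (which would also flush on a timer to bound latency):

```typescript
// Illustrative micro-batcher: flushes whenever `batchSize` tasks accumulate.
class Batcher<T> {
  private pending: T[] = [];
  public flushed: T[][] = [];

  constructor(private batchSize: number) {}

  add(task: T): void {
    this.pending.push(task);
    // Once enough tasks pile up, ship them out as one batch.
    if (this.pending.length >= this.batchSize) {
      this.flushed.push(this.pending);
      this.pending = [];
    }
  }
}

const batcher = new Batcher<number>(2);
[1, 2, 3, 4, 5].forEach((t) => batcher.add(t));
// batcher.flushed is now [[1, 2], [3, 4]]; task 5 waits for a partner
```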
- Redis ACL support (because sharing isn't always caring)
- Task-level permissions (not everyone gets a backstage pass)
- Audit logging (tracking who did what)
- Role-based access (VIP list management)
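On the Redis side, ACL support pairs naturally with a narrowly-scoped Redis user for the queue. One illustrative rule (adjust the key/channel patterns and password to your own deployment; the `cleo:*` prefix here is an assumption):

```
# redis-cli: a user that can only touch cleo's keys and pub/sub channels
ACL SETUSER cleo-worker on >supersecret ~cleo:* &cleo:* +@read +@write +@pubsub
```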
(Where all the magic happens)
```mermaid
graph TB
    Client[Client] --> QM[Queue Manager]
    QM --> Redis[(Redis)]
    QM --> Worker[Worker Pool]
    QM --> Groups[Task Groups]
    Worker --> Redis
    Groups --> Redis

    subgraph "Task Party"
        Groups --> Strategy{Strategy}
        Strategy --> RR[Round Robin]
        Strategy --> FIFO[FIFO]
        Strategy --> Priority[Priority]
    end

    subgraph "Worker Squad"
        Worker --> W1[Worker 1]
        Worker --> W2[Worker 2]
        Worker --> W3[Worker 3]
    end
```
(AKA: The Epic Journey of a Task)
```mermaid
sequenceDiagram
    participant C as Client
    participant QM as Queue
    participant G as Group
    participant W as Worker
    participant R as Redis

    C->>QM: Submit Task
    QM->>G: Group Check
    G->>R: Store State
    QM->>R: Queue Task
    W->>R: Poll Tasks
    W->>G: Check Order
    W->>QM: Process
    QM->>C: Done!
```
(Because who doesn't love practical examples?)
```mermaid
graph TB
    Upload[Upload] --> Process[Process]
    Process --> Encode[Encode]
    Encode --> Deliver[Deliver]

    style Upload fill:#f9f,stroke:#333
    style Process fill:#bbf,stroke:#333
    style Encode fill:#bfb,stroke:#333
    style Deliver fill:#fbf,stroke:#333
```
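Each stage in this pipeline is just a task whose output feeds the next one. A hedged sketch of the chaining (the stage functions below are made up for illustration, not part of @cleo/core):

```typescript
// Hypothetical stage functions for the video pipeline above.
type Stage = (input: string) => Promise<string>;

const uploadTask: Stage = async (file) => `${file}:uploaded`;
const processTask: Stage = async (file) => `${file}:processed`;
const encodeTask: Stage = async (file) => `${file}:encoded`;
const deliverTask: Stage = async (file) => `${file}:delivered`;

// Run stages in order, feeding each result into the next task.
async function runPipeline(file: string, stages: Stage[]): Promise<string> {
  let result = file;
  for (const stage of stages) {
    result = await stage(result);
  }
  return result;
}

const final = await runPipeline("cat-video.mp4", [
  uploadTask,
  processTask,
  encodeTask,
  deliverTask,
]);
// final === "cat-video.mp4:uploaded:processed:encoded:delivered"
```

In a real deployment each stage would be its own queued task (so retries and backoff apply per stage), rather than one in-process loop.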
```bash
npm install @cleo/core
# or if you're yarn-core'd
yarn add @cleo/core
```
(Because the best way to learn is by doing!)
```typescript
import { Cleo, TaskPriority, ObserverEvent } from "@cleo/core";

// Get your Cleo instance (it's like a task-managing pet)
const cleo = Cleo.getInstance();

// Configure it (give it treats and training)
cleo.configure({
  redis: {
    host: "localhost",
    port: 6379,
    password: "cleosecret",
  },
  worker: {
    concurrency: 4,
    queues: [
      {
        name: "send-email",
        priority: TaskPriority.HIGH,
      },
    ],
  },
});

// Monitor your tasks (helicopter parenting, but for code)
const queueManager = cleo.getQueueManager();
queueManager.onTaskEvent(ObserverEvent.STATUS_CHANGE, (taskId, status, data) => {
  console.log(`Task ${taskId} status changed to ${status}`, data);
});
```
```typescript
import { task, TaskPriority } from "@cleo/core";

class EmailService {
  @task({
    id: "send-email",
    priority: TaskPriority.HIGH,
    queue: "send-email",
  })
  async sendEmail(input: { email: string }): Promise<string> {
    // Your email sending logic here
    return `Sent to ${input.email}`;
  }
}
```typescript
import { QueueClass, GroupProcessingStrategy } from "@cleo/core";

// Define a service with group settings
@QueueClass({
  defaultOptions: {
    maxRetries: 3,
    retryDelay: 1000,
    backoff: {
      type: "fixed",
      delay: 2000,
    },
    group: "notifications",
    timeout: 300000,
  },
  queue: "notifications",
})
class NotificationService {
  async sendPushNotification(data: { message: string }) {
    console.log(`Sending push: ${data.message}`);
    return `Notification sent: ${data.message}`;
  }

  async sendSMS(data: { message: string }) {
    console.log(`Sending SMS: ${data.message}`);
    return `SMS sent: ${data.message}`;
  }
}

// Use different processing strategies
const queueManager = cleo.getQueueManager();

// Round Robin (taking turns like a proper queue)
queueManager.setGroupProcessingStrategy(GroupProcessingStrategy.ROUND_ROBIN);

// FIFO (first in, first out, just like a coffee shop)
queueManager.setGroupProcessingStrategy(GroupProcessingStrategy.FIFO);

// Priority (VIP treatment for important tasks)
queueManager.setGroupProcessingStrategy(GroupProcessingStrategy.PRIORITY);
await queueManager.setGroupPriority("notifications", 10);
```
```typescript
// Built-in retry configuration
@QueueClass({
  defaultOptions: {
    maxRetries: 3,
    backoff: {
      type: "fixed",
      delay: 2000,
    },
    retryDelay: 1000,
  },
})
class ReliableService {
  async mightFail() {
    // Will retry 3 times with backoff
    throw new Error("Oops!");
  }
}
```
```typescript
// Manual retry with backoff
import { retryWithBackoff } from "@cleo/core";

const result = await retryWithBackoff(
  async () => {
    return await unreliableOperation();
  },
  3,    // max retries
  1000  // base delay in ms
);
```
```typescript
const queueManager = cleo.getQueueManager();

// Monitor all the things!
queueManager.onTaskEvent(ObserverEvent.STATUS_CHANGE, (taskId, status, data) => {
  console.log(`Task ${taskId} status: ${status}`);
});

queueManager.onTaskEvent(ObserverEvent.GROUP_CHANGE, (taskId, status, data) => {
  console.log(`Group operation: ${data.operation}`);
});

queueManager.onTaskEvent(ObserverEvent.TASK_COMPLETED, (taskId, status, result) => {
  console.log(`Task ${taskId} completed:`, result);
});

queueManager.onTaskEvent(ObserverEvent.TASK_FAILED, (taskId, status, error) => {
  console.log(`Task ${taskId} failed:`, error);
});
```
Check out our example files for full implementations:
- Basic Usage - Simple task processing with monitoring
- Advanced Features - Group processing, strategies, and error handling

Each example comes with:
- Complete setup and configuration
- Event monitoring setup
- Different processing strategies
- Error handling patterns
- Performance monitoring
We welcome contributions! Whether you're fixing bugs, adding features, or improving docs, we'd love your help!

Q: How many developers does it take to review a PR? A: None, they're all stuck in an infinite loop of bikeshedding!

Check out our Contributing Guidelines for:
- Code style and standards
- Development workflow
- Project structure
- Pull request process
- Bug reporting guidelines

Our project is like a well-oiled machine (that occasionally needs coffee):
- QueueManager - The traffic controller of your tasks
- TaskGroup - Because tasks work better in teams
- Worker - The real MVP doing all the heavy lifting
- Utilities - Our Swiss Army knife of helper functions
(Because speed matters!)
```mermaid
graph LR
    A[Smart Batching] --> B[Fast Processing]
    B --> C[Optimal Results]
    C --> D[Happy Users]

    style A fill:#f96,stroke:#333
    style B fill:#9cf,stroke:#333
    style C fill:#9f9,stroke:#333
    style D fill:#f9f,stroke:#333
```
MIT License - see LICENSE file for details
Remember: In a world of callbacks, promises, and async/await, we're all just trying our best to avoid race conditions!
Made with ❤️ and probably too much caffeine ☕