Distributed cron jobs for Node.js using PostgreSQL Advisory Locks
Prevent duplicate task execution across multiple application instances using PostgreSQL's native locking mechanism.
In distributed environments, scheduled tasks run on every instance simultaneously:
- Instance 1: Sends daily email at 9:00 AM
- Instance 2: Sends daily email at 9:00 AM β DUPLICATE
- Instance 3: Sends daily email at 9:00 AM β DUPLICATEResult: Users receive 3 identical emails, databases are processed multiple times, resources are wasted.
pg-cron-lock ensures only ONE instance executes each scheduled task:
+ Instance 1: β
Acquires lock β Sends email
+ Instance 2: β Lock taken β Skips gracefully
+ Instance 3: β Lock taken β Skips gracefullyResult: Perfect coordination, zero duplicates, optimal resource usage.
npm install pg-cron-lockconst { PgCronLock } = require('pg-cron-lock');
const { Sequelize } = require('sequelize');
// Initialize with your existing Sequelize connection
const sequelize = new Sequelize('postgresql://user:pass@localhost:5432/db');
const cronLock = new PgCronLock({
database: { sequelize },
});
// Schedule a distributed task
cronLock
.schedule({
name: 'daily-data-processing',
schedule: '0 9 * * *', // Every day at 9 AM
handler: async (metadata) => {
console.log(`Processing on: ${metadata.nodeId}`);
// Your business logic here
await processData();
return { processed: 42 };
},
options: {
lockKey: 100001, // Unique identifier for this task
},
})
.start();
console.log('β
Distributed cron job started!');| Feature | Description | Benefit |
|---|---|---|
| π Distributed Locking | PostgreSQL Advisory Locks | No external dependencies |
| β° Standard Cron Syntax | Full node-cron compatibility |
Easy migration |
| π Smart Retry Logic | Configurable backoff strategies | Handles transient failures |
| π Built-in Monitoring | Execution stats & health checks | Production observability |
| π‘οΈ TypeScript First | Complete type definitions | Better developer experience |
| β‘ Zero Config | Works with existing Sequelize setup | Drop-in replacement |
cronLock.schedule({
name: 'unique-task-name', // Required: Task identifier
schedule: '0 */6 * * *', // Required: Standard cron expression
handler: async (metadata) => {
// Required: Your task logic
// metadata.nodeId - Current instance ID
// metadata.lockKey - Lock identifier
// metadata.taskName - Task name
return result; // Optional return value
},
options: {
lockKey: 200001, // Required: Unique lock identifier
timeout: 60000, // Optional: Max execution time (ms)
maxRetries: 3, // Optional: Retry attempts on failure
retryDelay: 5000, // Optional: Delay between retries (ms)
},
timezone: 'Europe/Paris', // Optional: Timezone for schedule
});// Execute with automatic lock management
const result = await cronLock.withLock(
'manual-task',
300001, // Lock key
async (metadata) => {
await performOperation();
return { success: true };
}
);
if (result) {
console.log('Task executed:', result);
} else {
console.log('Task skipped (lock not available)');
}// Control task lifecycle
cronLock.startAll(); // Start all scheduled tasks
cronLock.stopAll(); // Stop all scheduled tasks
cronLock.startTask('task-name'); // Start specific task
cronLock.stopTask('task-name'); // Stop specific task
await cronLock.triggerTask('task-name'); // Execute immediately
// Task information
const taskNames = cronLock.getTaskNames();
const task = cronLock.getTask('task-name');const stats = cronLock.getStats('task-name');
console.log(stats);Output:
{
"taskName": "daily-processing",
"totalExecutions": 127,
"successfulExecutions": 125,
"failedExecutions": 2,
"skippedExecutions": 1045,
"averageDuration": 2340,
"lastExecution": "2024-01-15T09:00:00.000Z",
"lastSuccess": "2024-01-15T09:00:00.000Z",
"lastFailure": "2024-01-14T09:00:00.000Z",
"lastError": "Connection timeout"
}const health = await cronLock.getHealthStatus();
console.log(health);Output:
{
"databaseConnected": true,
"activeLocks": 3,
"scheduledTasks": 8,
"runningTasks": 1
}const cronLock = new PgCronLock({
database: {
sequelize: sequelizeInstance,
},
// Custom logging
logger: {
info: (msg) => logger.info(msg),
error: (msg) => logger.error(msg),
warn: (msg) => logger.warn(msg),
},
// Global defaults
defaults: {
retryOnFailure: true,
maxRetries: 5,
retryDelay: 2000,
timeout: 300000, // 5 minutes
logSuccess: false, // Reduce log noise
logFailure: true,
},
// Prevent conflicts between applications
lockKeyPrefix: 'myapp',
// Debug mode
debug: process.env.NODE_ENV === 'development',
});cronLock.schedule({
name: 'resilient-task',
schedule: '*/15 * * * *',
handler: async (metadata) => {
try {
await criticalOperation();
} catch (error) {
// Log and re-throw to trigger retry
console.error(`Task failed: ${error.message}`);
throw error;
}
},
options: {
lockKey: 400001,
maxRetries: 3,
retryDelay: 10000, // 10 seconds
// Callbacks for monitoring
onSuccess: (result, metadata) => {
metrics.increment('task.success');
},
onFailure: (error, metadata) => {
metrics.increment('task.failure');
alerting.send(`Task failed: ${error.message}`);
},
onSkipped: (metadata) => {
metrics.increment('task.skipped');
},
},
});// Handle shutdown signals
process.on('SIGTERM', async () => {
console.log('Received SIGTERM, shutting down gracefully...');
await cronLock.destroy({
timeout: 30000, // Wait up to 30s
waitForRunning: true, // Wait for running tasks
force: false, // Don't kill running tasks
});
process.exit(0);
});
process.on('SIGINT', async () => {
console.log('Received SIGINT, shutting down gracefully...');
await cronLock.destroy({ timeout: 10000, force: true });
process.exit(0);
});const express = require('express');
const app = express();
// Health check endpoint
app.get('/health', async (req, res) => {
try {
const health = await cronLock.getHealthStatus();
const allStats = cronLock.getAllStats();
res.json({
status: health.databaseConnected ? 'healthy' : 'unhealthy',
cron: {
scheduledTasks: health.scheduledTasks,
runningTasks: health.runningTasks,
activeLocks: health.activeLocks,
},
tasks: allStats,
});
} catch (error) {
res.status(500).json({ error: error.message });
}
});
// Trigger task endpoint
app.post('/tasks/:name/trigger', async (req, res) => {
try {
const result = await cronLock.triggerTask(req.params.name);
res.json({ success: true, result });
} catch (error) {
res.status(500).json({ error: error.message });
}
});- Node.js 14+ (ESM and CommonJS supported)
- PostgreSQL 11+ (uses Advisory Locks)
- Sequelize 6+ (peer dependency)
Contributions are welcome!
- Fork the repository
- Create your feature branch:
git checkout -b feature/my-feature - Commit changes:
git commit -m 'Add my feature' - Push to branch:
git push origin feature/my-feature - Submit a Pull Request
MIT License - see LICENSE for details.
- Built on node-cron
- Uses PostgreSQL Advisory Locks
- Integrates with Sequelize ORM
Ready to eliminate duplicate tasks in your distributed Node.js applications? Start with npm install pg-cron-lock π