A robust async PostgreSQL-backed background job processing system. It features:
- Prioritized job execution with configurable priorities
- Job deduplication to prevent duplicate work
- Multiple job queues with independent worker pools
- Automatic retry with exponential backoff for failed jobs
- Per-job timeouts to prevent long-running jobs from blocking the queue
- Graceful shutdown and queue management
- Error tracking with Sentry integration
use workers::BackgroundJob;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
struct SendEmailJob {
to: String,
subject: String,
body: String,
}
impl BackgroundJob for SendEmailJob {
const JOB_TYPE: &'static str = "send_email";
const PRIORITY: i16 = 10;
const DEDUPLICATED: bool = false;
type Context = AppContext;
async fn run(&self, ctx: Self::Context) -> anyhow::Result<()> {
// Job implementation
ctx.email_service.send(&self.to, &self.subject, &self.body).await?;
Ok(())
}
}
Next, configure a Runner with one or more queues and start it:
use workers::{Runner, Queue};
use std::time::Duration;
let runner = Runner::new(connection_pool, app_context)
.add_queue(
Queue::named("emails")
.register::<SendEmailJob>()
.num_workers(2)
.poll_interval(Duration::from_secs(5))
.jitter(Duration::from_millis(500))
);
let handle = runner.start();
handle.wait_for_shutdown().await;
To enqueue a job, construct it and call enqueue():
let job = SendEmailJob {
to: "user@example.com".to_string(),
subject: "Welcome!".to_string(),
body: "Thanks for signing up!".to_string(),
};
job.enqueue(&pool).await?;
Prevent long-running jobs from blocking the queue by configuring timeouts at the queue level. Jobs that exceed their queue's timeout are cancelled and marked as failed, then retried according to the retry policy.
use std::time::Duration;
let runner = Runner::new(pool, context)
.add_queue(
Queue::named("github")
.register::<FetchRepoJob>()
// All jobs in this queue timeout after 30s
.timeout(Duration::from_secs(30))
);
Note
When a job times out:
- Jobs are not forcefully interrupted; they stop at the next .await point
- Timed-out jobs will be retried from the beginning, not resumed (!)
- Jobs must be designed to be idempotent or handle partial execution safely
- Avoid holding Tokio mutexes across .await points where cancellation could leave shared state inconsistent
See the timeout example for details on cancel safety considerations.
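For illustration, here is one way to structure a job so that a retry after a timeout is harmless. This is only a sketch: the BackgroundJob trait comes from this library, but AppContext, repo_store, and github are hypothetical application types.
use serde::{Deserialize, Serialize};
use workers::BackgroundJob;

#[derive(Serialize, Deserialize)]
struct FetchRepoJob {
    repo: String,
}

impl BackgroundJob for FetchRepoJob {
    const JOB_TYPE: &'static str = "fetch_repo";
    const PRIORITY: i16 = 0;
    const DEDUPLICATED: bool = true;
    type Context = AppContext; // hypothetical application context

    async fn run(&self, ctx: Self::Context) -> anyhow::Result<()> {
        // If a previous attempt timed out after the work was already done,
        // skip it instead of repeating side effects.
        if ctx.repo_store.is_fetched(&self.repo).await? {
            return Ok(());
        }

        let metadata = ctx.github.fetch(&self.repo).await?;

        // Prefer idempotent writes (upserts): re-running after a timeout
        // overwrites the same row instead of creating duplicates.
        ctx.repo_store.upsert(&self.repo, &metadata).await?;
        Ok(())
    }
}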
- JOB_TYPE: Unique identifier for the job type
- PRIORITY: Execution priority (higher values = higher priority)
- DEDUPLICATED: Whether to prevent duplicate jobs with identical data
A queue is a named "worker pool" with its own dedicated set of workers. Queues are about resource allocation and isolation, while job types define the specific task logic and data.
Multiple different job types can share the same queue, and that's often desirable. Queues solve a critical problem: preventing slow or resource-constrained jobs from blocking unrelated work.
For example, if you have several job types that make API calls to a rate-limited external service (like GitHub's API
with 5000 requests/hour), you might route all those job types to a dedicated github queue with one worker to stay under the limit,
while keeping your high-volume email jobs in a separate queue with ten workers for maximum throughput.
Each queue can be tuned independently based on the resource constraints and throughput needs of the job types it handles.
- Worker count: Number of concurrent workers per queue
- Poll interval: How often workers check for new jobs
- Jitter: Random time added to poll intervals to reduce thundering herd effects (default: 100ms)
- Timeout: Optional duration after which jobs will be cancelled and marked as failed (applies to all jobs in the queue)
- Shutdown behavior: Whether to stop when the queue is empty
- Archive completed jobs: Whether to archive successful jobs instead of deleting them
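As a sketch, the GitHub/email split described above could be configured like this, using only the builder methods shown elsewhere in this document (FetchRepoJob and SendEmailJob are the example job types from above):
use std::time::Duration;
use workers::{Queue, Runner};

let runner = Runner::new(pool, context)
    .add_queue(
        Queue::named("github")
            .register::<FetchRepoJob>()
            .num_workers(1)                         // one worker to respect the API rate limit
            .poll_interval(Duration::from_secs(10)) // no rush for rate-limited work
            .timeout(Duration::from_secs(30))       // cancel stuck API calls
    )
    .add_queue(
        Queue::named("emails")
            .register::<SendEmailJob>()
            .num_workers(10)                        // high-volume, independent worker pool
            .poll_interval(Duration::from_secs(1))
            .jitter(Duration::from_millis(250))     // spread polling across workers
    );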
For improved throughput in high-volume scenarios, you can enqueue multiple jobs
of the same type in a single operation. This is more efficient than individual
enqueue() calls as it uses a single database transaction and PostgreSQL bulk
insert operations. Under the hood, this uses PostgreSQL arrays with UNNEST for
bulk insert, handles deduplication with conditional inserts, and uses a single
transaction for atomicity.
The system consists of three main components:
- BackgroundJob trait - Define job types and their execution logic
- Runner - High-level orchestrator that manages multiple queues and their worker pools
- Worker - Low-level executor that polls for and processes individual jobs
- Runner is the entry point and orchestrator:
  - Manages multiple named queues (e.g., "default", "emails", "indexing")
  - Spawns and coordinates multiple Worker instances per queue
  - Handles job type registration and queue configuration
  - Provides graceful shutdown coordination across all workers
- Worker is the actual job processor:
  - Polls the database for available jobs in a specific queue
  - Locks individual jobs to prevent concurrent execution
  - Executes job logic with error handling and retry logic
  - Reports job completion or failure back to the database
Jobs are stored in the background_jobs PostgreSQL table and processed asynchronously by worker instances that poll for available work in their assigned queues.
When a worker picks up a job from the database, the table row is immediately locked to prevent other workers from processing the same job concurrently. This ensures that:
- Each job is processed by at most one worker at a time, even with multiple workers running
- Failed jobs can be safely retried without duplication
- The system scales horizontally by adding more worker processes
Once job execution completes successfully, the row is deleted from the table. If the job fails, the row remains with updated retry information for future processing attempts.
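The exact SQL is internal to the library, but the locking behavior described above corresponds to PostgreSQL's standard FOR UPDATE SKIP LOCKED pattern. A rough sketch of what claiming a job looks like (the function name and the priority column are assumptions; only the background_jobs table and id are taken from the description above):
use sqlx::{PgPool, Postgres, Transaction};

async fn claim_one_job(pool: &PgPool) -> Result<Option<i64>, sqlx::Error> {
    // The row lock is held for the duration of the transaction, i.e. while
    // the job runs, so the claim happens inside an open transaction.
    let mut tx: Transaction<'_, Postgres> = pool.begin().await?;

    // SKIP LOCKED lets other workers skip rows that are already claimed
    // instead of blocking on them.
    let row: Option<(i64,)> = sqlx::query_as(
        "SELECT id FROM background_jobs
         ORDER BY priority DESC, id
         LIMIT 1
         FOR UPDATE SKIP LOCKED",
    )
    .fetch_optional(&mut *tx)
    .await?;

    // ...execute the job here, then DELETE the row on success or update its
    // retry metadata on failure, before committing.
    tx.commit().await?;
    Ok(row.map(|(id,)| id))
}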
use workers::BackgroundJob;
// Create multiple jobs of the same type
let email_jobs = vec![
SendEmailJob { to: "user1@example.com".to_string(), subject: "Welcome!".to_string(), body: "...".to_string() },
SendEmailJob { to: "user2@example.com".to_string(), subject: "Welcome!".to_string(), body: "...".to_string() },
SendEmailJob { to: "user3@example.com".to_string(), subject: "Welcome!".to_string(), body: "...".to_string() },
];
// Enqueue all jobs in a single transaction
let job_ids = SendEmailJob::enqueue_batch(&email_jobs, &pool).await?;
// job_ids contains Option<i64> for each job:
// - Some(id) for successfully enqueued jobs
// - None for jobs that were deduplicated (if DEDUPLICATED = true)
You can see this in action with:
cargo run --example batch
By default, successfully completed jobs are deleted from the database. However, you can configure jobs to be archived instead, which moves them to an archive table for debugging, auditing, and potential replay.
use workers::{Runner, Queue, ArchivalPolicy};
let runner = Runner::new(pool, context)
.add_queue(
Queue::named("important")
.register::<MyJob>()
.archive(ArchivalPolicy::Always) // Enable archiving
);
// We can get much more fine-grained control by using a predicate function:
let runner = Runner::new(pool, context)
.add_queue(
Queue::named("fails_sometimes")
.register::<MyJob>()
.archive(ArchivalPolicy::If(|job, _ctx| {
// Archive only jobs that had to retry
job.retries > 0
}))
);
To query archived jobs:
use workers::{get_archived_jobs, ArchiveQuery, archived_job_count};
# use sqlx::PgPool;
# async fn example() -> Result<(), Box<dyn std::error::Error>> {
# let pool = PgPool::connect("").await?;
// Get all archived jobs
let archived = get_archived_jobs(&pool, ArchiveQuery::All).await?;
// Get archived jobs for a specific job type
let archived = get_archived_jobs(
&pool,
ArchiveQuery::Filter {
job_filter: Some("important".to_string()),
limit: None,
}
).await?;
// Get archived jobs with limit
let archived = get_archived_jobs(
&pool,
ArchiveQuery::Filter {
job_filter: Some("important".to_string()),
limit: Some(50),
}
).await?;
// Get count of archived jobs
let count = archived_job_count(&pool).await?;
# Ok(())
# }
If you have chosen to archive successfully completed jobs, you might find that your database keeps growing indefinitely.
For this eventuality, you can configure an ArchiveCleaner to take care of cleanups for you.
Each configured job type will start a background task with its own timer that periodically runs cleanup operations based on the provided configuration.
use workers::{ArchiveCleanerBuilder, CleanupConfiguration, CleanupPolicy};
use chrono::TimeDelta;
use std::time::Duration;
ArchiveCleanerBuilder::new()
.configure::<MyJob>(CleanupConfiguration {
cleanup_every: Duration::from_secs(60), // Run cleanup every 60 seconds
policy: CleanupPolicy::MaxCount(1000), // Keep only the latest 1000 archived jobs
})
.run(&pool);
// Alternative policies:
// - MaxAge: Remove jobs older than a specific duration
ArchiveCleanerBuilder::new()
.configure::<MyJob>(CleanupConfiguration {
cleanup_every: Duration::from_secs(3600),
policy: CleanupPolicy::MaxAge(TimeDelta::days(30)), // Remove jobs older than 30 days
})
.run(&pool);
// - Retain: Keep at least N jobs, but remove older ones beyond a certain age
ArchiveCleanerBuilder::new()
.configure::<MyJob>(CleanupConfiguration {
cleanup_every: Duration::from_secs(3600),
policy: CleanupPolicy::Retain {
max_age: TimeDelta::days(7),
keep_at_least: 100, // Always keep at least 100 jobs, even if older than 7 days
},
})
.run(&pool);
Note
These migrations only CREATE new tables - your existing data is completely safe.
This library requires some PostgreSQL tables to work. There are a few ways to set up the database.
The easiest way is to use the built-in setup function:
use sqlx::PgPool;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = PgPool::connect("postgresql://user:pass@localhost/db").await?;
// One-line database setup - safe to call multiple times
// Under the hood, this runs `sqlx::migrate`
workers::setup_database(&pool).await?;
// Now you can use the workers library...
Ok(())
}
This embeds the migrations at compile time and is completely self-contained.
Alternatively, execute the SQL statements in the migrations folder on your existing PostgreSQL database.
Failed jobs are automatically retried with exponential backoff. The retry count and last retry timestamp are tracked in the database. Jobs that continue to fail will eventually be abandoned after reaching the maximum retry limit.
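The exact backoff schedule is internal to the library, but exponential backoff generally means the delay doubles with each failed attempt. A purely illustrative sketch (base delay, growth factor, and cap are assumptions, not the library's actual values):
use std::time::Duration;

fn retry_delay(retries: u32) -> Duration {
    let base = Duration::from_secs(1);
    let max = Duration::from_secs(3600);
    // 1s, 2s, 4s, 8s, ... capped at one hour.
    base.checked_mul(2u32.saturating_pow(retries))
        .map(|delay| delay.min(max))
        .unwrap_or(max)
}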
All job execution is instrumented with tracing and optionally reported to Sentry for error monitoring.
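Assuming the integration reports through the sentry crate's global hub, errors only reach Sentry if the host application initializes the SDK and keeps the guard alive for the lifetime of the process. A minimal application-side sketch (not part of this library's API; the DSN is a placeholder):
let _sentry_guard = sentry::init((
    "https://examplePublicKey@o0.ingest.sentry.io/0", // placeholder DSN
    sentry::ClientOptions {
        release: sentry::release_name!(),
        ..Default::default()
    },
));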
The workers poll the database for new jobs at regular intervals (configurable via poll_interval). While this is simple and reliable, it does generate constant database queries. For production deployments, consider:
Tuning Poll Intervals:
.add_queue(
Queue::named("low_priority")
.register::<LowPriorityJob>()
.poll_interval(Duration::from_secs(30)) // Less frequent polling
)
.add_queue(
Queue::named("urgent")
.register::<UrgentJob>()
.poll_interval(Duration::from_millis(100)) // More frequent polling
)
PostgreSQL's LISTEN/NOTIFY feature seems perfect for real-time job notifications, but we chose polling with jitter instead. Here's why:
- LISTEN/NOTIFY creates thundering herds. When you send one NOTIFY, all workers wake up at once and compete for the same single job. This creates database contention spikes that hurt performance.
- You still need polling anyway. LISTEN/NOTIFY isn't reliable enough on its own - notifications can be lost during connection drops or server restarts. You end up implementing both systems, which adds complexity.
- Connection poolers interfere. Production deployments often use PgBouncer or similar tools. LISTEN requires persistent connections, which conflicts with connection pool strategies.
The polling approach prevents thundering herds through added jitter, works with any PostgreSQL setup, is simple and reliable (no missed notifications) and easy to tune for your workload (e.g. few or many jobs). Feel free to open an issue if we're missing anything.
This project uses TestContainers for integration testing, which automatically spins up PostgreSQL containers during test execution.
You can run a stress test like so:
make stress
Or run with custom parameters:
cd examples/stress_test
cargo run --release -- --jobs 500 --workers 12 --duration 60
The stress test is based on a popular monster collecting game. ;) It demonstrates a few features:
- Multiple job types: Catch Pokemon, train them, heal at Pokemon Centers, battle gym leaders, and explore new areas
- Different queue priorities: Healing and gym battles get higher priority than exploration
- Job deduplication: Training jobs are deduplicated to prevent over-training the same Pokemon
- Different processing times: Each job type has different duration ranges to simulate real workloads
Simply run the tests - TestContainers handles the database setup automatically:
# Run all tests (PostgreSQL containers managed automatically)
make test
# Run tests with verbose output
make test-verbose
# Run all quality checks (format, lint, test)
make ci
- Docker: TestContainers requires Docker to be running for integration tests
  - ⚠️ Important: Make sure Docker Desktop (or equivalent) is started before running tests
  - Tests will fail with connection errors if Docker is not available
- Rust: Standard Rust toolchain for compilation
The tests will automatically:
- Start a PostgreSQL container using TestContainers
- Run database migrations
- Execute the test suite
- Clean up containers when finished
No manual database setup required!
If tests fail with "client error (Connect)" or similar Docker errors:
- Ensure Docker Desktop is running
- Verify with: docker ps
- If Docker is running but tests still fail, try docker system prune to clean up resources
The implementation was originally developed as part of crates.io, extracted into the separate swirl project, and then re-integrated and heavily modified to provide a robust, production-ready job queue system.