pgqrs is a PostgreSQL-backed durable workflow engine and job queue. Written in Rust with Python bindings.
- Library-only: No servers to operate. Use directly in your Rust or Python applications.
- Connection Pooler Compatible: Works with pgBouncer and pgcat for connection scaling.
- Efficient: Uses PostgreSQL's `SKIP LOCKED` for concurrent job fetching.
- Exactly-once Delivery: Guaranteed within the visibility timeout window.
- Message Archiving: Built-in audit trails and historical data retention.
- Crash Recovery: Resume from the last completed step after failures.
- Exactly-once Steps: Completed steps are never re-executed.
- Persistent State: All workflow progress stored in PostgreSQL.
use pgqrs::workflow::Workflow;
use pgqrs_macros::{pgqrs_workflow, pgqrs_step};
/// Workflow step: performs a GET request to `url` with `reqwest::get` and
/// returns the response body as text.
///
/// `ctx` is not read in the body; presumably it is consumed by the
/// `#[pgqrs_step]` macro (confirm against the macro documentation).
///
/// # Errors
/// Propagates any error from sending the request or reading the body.
#[pgqrs_step]
async fn fetch_data(ctx: &Workflow, url: &str) -> Result<String, anyhow::Error> {
    let response = reqwest::get(url).await?;
    let body = response.text().await?;
    Ok(body)
}
/// Workflow step: counts the lines in `data` and returns the count as an `i32`.
///
/// `ctx` is not read in the body; presumably it is consumed by the
/// `#[pgqrs_step]` macro (confirm against the macro documentation).
#[pgqrs_step]
async fn process_data(ctx: &Workflow, data: String) -> Result<i32, anyhow::Error> {
    let line_count = data.lines().count();
    // Plain `as` cast: a count larger than `i32::MAX` would wrap rather than fail.
    Ok(line_count as i32)
}
/// Workflow entry point: runs `fetch_data` and then `process_data`, and
/// returns a summary string containing the resulting line count.
///
/// # Errors
/// Returns the first error produced by either step.
#[pgqrs_workflow]
async fn data_pipeline(ctx: &Workflow, url: &str) -> Result<String, anyhow::Error> {
    let body = fetch_data(ctx, url).await?;
    let line_total = process_data(ctx, body).await?;
    let summary = format!("Processed {} lines", line_total);
    Ok(summary)
}
// Usage
// NOTE: `pool` and `url` are not defined in this snippet; they are assumed to be
// provided by the surrounding application code.
// Create the workflow handle under the name "data_pipeline"; the URL is passed
// as the third argument (presumably the workflow input; confirm against the API).
let workflow = Workflow::create(pool, "data_pipeline", &url).await?;
let result = data_pipeline(&workflow, url).await?;

from pgqrs import Admin, PyWorkflow
from pgqrs.decorators import workflow, step
@step
async def fetch_data(ctx: PyWorkflow, url: str) -> dict:
    """Workflow step: fetch ``url`` and return the awaited result.

    Args:
        ctx: Workflow context; not read in the body (presumably used by the
            ``@step`` decorator; confirm against the decorator docs).
        url: Address passed to ``http_client.get``.

    Returns:
        Whatever ``http_client.get(url)`` resolves to (annotated as ``dict``).
    """
    # NOTE: `http_client` is not defined in this snippet; the application must provide it.
    return await http_client.get(url)
@step
async def process_data(ctx: PyWorkflow, data: dict) -> dict:
    """Workflow step: summarise ``data``.

    Args:
        ctx: Workflow context; not read in the body (presumably used by the
            ``@step`` decorator; confirm against the decorator docs).
        data: Mapping whose number of entries is reported.

    Returns:
        A dict with ``"processed"`` set to ``True`` and ``"count"`` set to
        ``len(data)``.
    """
    return {"processed": True, "count": len(data)}
@workflow
async def data_pipeline(ctx: PyWorkflow, url: str):
    """Workflow entry point: run ``fetch_data`` and then ``process_data``.

    Args:
        ctx: Workflow context forwarded to each step.
        url: Address handed to ``fetch_data``.

    Returns:
        The value returned by ``process_data``.
    """
    data = await fetch_data(ctx, url)
    result = await process_data(ctx, data)
    return result
# Usage
# NOTE: the `await` calls below must run inside an async function or event loop.
# Construct an Admin from the PostgreSQL connection string; the second argument
# is passed as None (its meaning is not shown here; check the Admin API).
admin = Admin("postgresql://localhost/mydb", None)
# Awaited setup call on the admin handle (presumably installs the pgqrs schema; confirm).
await admin.install()
# Create the workflow context for "data_pipeline", passing the URL as the second argument.
ctx = await admin.create_workflow("data_pipeline", "https://api.example.com")
result = await data_pipeline(ctx, "https://api.example.com")

pip install pgqrs

[dependencies]
pgqrs = "0.4.0"

Prerequisites:
- Rust: 1.70+
- Python: 3.8+
- PostgreSQL: 12+
We use make to manage the development lifecycle.
# Setup environment and install dependencies
make requirements

# Build both Rust core and Python bindings
make build
# Run all tests (Rust + Python)
make test