Ergon (ἔργον, Greek for "work" or "deed") is a durable execution library for Rust, inspired by Gunnar Morling's Persistasaurus and several of Jack Vanlightly's blog posts on [Durable Execution](https://jack-vanlightly.com/blog/2025/11/24/demystifying-determinism-in-durable-execution).
It provides automatic state persistence, intelligent retry mechanisms, and distributed workflow orchestration.
Ergon is a curiosity project: I built it to learn the internals of several areas of Rust, e.g. macros, async, typestate, and autoref specialization, to mention a few.
Ergon enables you to write reliable, fault-tolerant workflows that survive process crashes, network failures, and infrastructure outages. Your business logic is expressed as simple async Rust code, while Ergon handles:
- State persistence - Automatic checkpointing of execution state
- Automatic retry - Smart retry with exponential backoff
- Step caching - Deterministic replay using cached results
- DAG parallelization - Automatic parallel execution of independent steps
- Flow versioning - Type-safe deployment versioning with compile-time checks
- External signals - Human-in-the-loop and external event coordination
- Durable timers - Long-running timers that survive crashes
- Distributed workers - Scale horizontally with multiple worker processes (backed by Redis and Postgres)
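
To make the state persistence and step caching bullets above more concrete, here is a minimal, std-only sketch of deterministic replay. It is not Ergon's implementation or API: `StepLog`, `run_step`, and the toy `process_order` function are hypothetical names, and an in-memory `HashMap` stands in for durable storage.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for a durable step log (not Ergon's API).
struct StepLog {
    results: HashMap<String, String>,
    executions: usize,
}

impl StepLog {
    fn new() -> Self {
        StepLog { results: HashMap::new(), executions: 0 }
    }

    /// Runs `body` only if `step` has no recorded result;
    /// otherwise returns the cached value without calling `body`.
    fn run_step<F>(&mut self, step: &str, body: F) -> String
    where
        F: FnOnce() -> String,
    {
        if let Some(cached) = self.results.get(step) {
            return cached.clone();
        }
        let value = body();
        self.executions += 1;
        self.results.insert(step.to_string(), value.clone());
        value
    }
}

/// A toy flow with two steps, keyed by step name.
fn process_order(log: &mut StepLog) -> String {
    let payment = log.run_step("charge_payment", || "payment_confirmed".to_string());
    let shipping = log.run_step("ship_order", || "shipped".to_string());
    format!("{payment}/{shipping}")
}

fn main() {
    let mut log = StepLog::new();
    let first = process_order(&mut log);
    // Simulated restart: the flow runs again from the top, but both
    // steps are served from the log instead of executing a second time.
    let second = process_order(&mut log);
    assert_eq!(first, second);
    assert_eq!(log.executions, 2);
    println!("{second} after {} step executions", log.executions);
}
```

Running the flow a second time against the same log plays the role of a restart after a crash: the flow function is re-entered, yet each step body executes only once. This only works if the flow calls its steps in the same order with the same keys on every run, which is why determinism matters for replay.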

use ergon::prelude::*;

#[flow]
async fn process_order(self: Arc<Self>) -> Result<Receipt, ExecutionError> {
    // Each step is automatically persisted
    let payment = self.clone().charge_payment().await?;
    let inventory = self.clone().reserve_inventory().await?;
    let shipping = self.clone().ship_order().await?;
    // If the process crashes here, it resumes from the last checkpoint
    Ok(Receipt { payment, inventory, shipping })
}

use ergon::Retryable;

#[derive(Debug, Clone, Serialize, Deserialize)]
enum PaymentError {
    NetworkTimeout,    // Retryable
    InsufficientFunds, // Not retryable
}

impl Retryable for PaymentError {
    fn is_retryable(&self) -> bool {
        matches!(self, PaymentError::NetworkTimeout)
    }
}

#[step]
async fn charge_card(self: Arc<Self>) -> Result<Receipt, PaymentError> {
    // Network timeouts retry automatically
    // Business rejections fail immediately
    todo!("payment logic goes here")
}

#[step]
async fn await_manager_approval(
    self: Arc<Self>,
) -> Result<ApprovalDecision, ExecutionError> {
    // Flow suspends here until an external signal arrives
    let decision = await_external_signal::<ApprovalDecision>().await?;
    Ok(decision)
}

#[step]
async fn wait_for_settlement(self: Arc<Self>) -> Result<(), ExecutionError> {
    // Timer survives process crashes
    schedule_timer(Duration::from_secs(3600)).await?;
    Ok(())
}

// Configure scheduler with deployment version
let scheduler = Scheduler::new(storage)
    .with_version(env!("CARGO_PKG_VERSION")); // Type-safe at compile time

// All flows get this version automatically
scheduler.schedule(order_flow).await?;

// Alternative: Read from environment
let scheduler = Scheduler::new(storage).from_env(); // DEPLOY_VERSION=v1.2.3

// Or explicitly unversioned
let scheduler = Scheduler::new(storage).unversioned();

// Spawn multiple workers that share the queue
let worker1 = Worker::new(storage.clone(), "worker-1")
    .with_signals(signal_source.clone())
    .spawn();
let worker2 = Worker::new(storage.clone(), "worker-2")
    .with_signals(signal_source.clone())
    .spawn();

Add ergon to your Cargo.toml:

[dependencies]
ergon = { version = "0.1", features = ["sqlite"] }

- sqlite (default): SQLite-based storage with bundled SQLite
- redis: Redis-based storage for distributed deployments

# Use Redis for distributed systems
ergon = { version = "0.1", default-features = false, features = ["redis"] }

use ergon::prelude::*;
use std::sync::Arc;

// 1. Define your workflow
#[derive(Clone, Serialize, Deserialize, FlowType)]
struct OrderFlow {
    order_id: String,
    amount: f64,
}

impl OrderFlow {
    // 2. Implement your business logic with #[flow] and #[step]
    #[flow]
    async fn process(self: Arc<Self>) -> Result<String, ExecutionError> {
        let payment = self.clone().charge_payment().await?;
        let shipping = self.clone().ship_order().await?;
        Ok(format!("Order {} complete", self.order_id))
    }

    #[step]
    async fn charge_payment(self: Arc<Self>) -> Result<String, String> {
        // Payment logic here
        Ok("payment_confirmed".to_string())
    }

    #[step]
    async fn ship_order(self: Arc<Self>) -> Result<String, String> {
        // Shipping logic here
        Ok("shipped".to_string())
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 3. Set up storage
    let storage = Arc::new(SqliteExecutionLog::new("orders.db")?);

    // 4. Create and execute the flow
    let flow = OrderFlow {
        order_id: "ORDER-123".to_string(),
        amount: 99.99,
    };
    let executor = Executor::new(Uuid::new_v4(), flow, storage);
    match executor.execute(|f| Box::pin(f.process())).await {
        FlowOutcome::Completed(Ok(result)) => {
            println!("Success: {}", result);
        }
        FlowOutcome::Completed(Err(e)) => {
            eprintln!("Flow failed: {}", e);
        }
        FlowOutcome::Suspended(reason) => {
            println!("Flow suspended: {:?}", reason);
        }
    }
    Ok(())
}

Ergon follows Parnas's information hiding principles:

- core: Foundation types and traits (serialization, retry policies)
- storage: Persistence layer (SQLite, Redis, in-memory)
- graph: DAG structures for parallel execution
- executor: Execution engine (scheduler, workers, error handling)

use ergon::prelude::*; // Common types and macros
use ergon::core::RetryPolicy; // Retry configuration
use ergon::executor::Scheduler; // Flow scheduling with versioning
use ergon::executor::Worker; // Distributed workers
use ergon::storage::ExecutionLog; // Storage abstraction

#[flow]
async fn sequential(self: Arc<Self>) -> Result<String, ExecutionError> {
    let a = self.clone().step1().await?;
    let b = self.clone().step2(&a).await?;
    let c = self.clone().step3(&b).await?;
    Ok(c)
}

#[flow]
async fn parallel(self: Arc<Self>) -> Result<Summary, ExecutionError> {
    dag! {
        let h1 = self.register_fetch_user();
        let h2 = self.register_fetch_orders();
        let summary = self.register_merge(h1, h2)
    }
}

#[step]
async fn process_items(self: Arc<Self>) -> Result<Vec<String>, String> {
    let mut results = vec![];
    for item in &self.items {
        let child = ItemFlow::new(item);
        let result = self.invoke(child).await?;
        results.push(result);
    }
    Ok(results)
}

// Create scheduler (configure once at startup)
let scheduler = Scheduler::new(storage.clone())
    .with_version(env!("CARGO_PKG_VERSION")); // or .unversioned()

// Start multiple workers
let mut handles = vec![];
for i in 0..4 {
    let worker = Worker::new(storage.clone(), format!("worker-{}", i))
        .with_signals(signals.clone())
        .with_timers()
        .spawn();
    handles.push(worker);
}

// Schedule flows (version applied automatically)
scheduler.schedule(flow1).await?;
scheduler.schedule(flow2).await?;
// Workers automatically distribute the load

Ergon provides fine-grained error control:

// Define custom errors with retry behavior
#[derive(Debug, Clone, Serialize, Deserialize)]
enum BusinessError {
    Retryable(String),
    Permanent(String),
}

impl Retryable for BusinessError {
    fn is_retryable(&self) -> bool {
        matches!(self, BusinessError::Retryable(_))
    }
}

// Configure retry policies
#[step(retry_policy = RetryPolicy::AGGRESSIVE)]
async fn unstable_api(self: Arc<Self>) -> Result<Data, BusinessError> {
    // Retries with exponential backoff
    todo!("call the unstable API here")
}

// SQLite
let storage = Arc::new(SqliteExecutionLog::new("flows.db")?);

// Redis
let storage = Arc::new(RedisExecutionLog::new("redis://localhost:6379").await?);

// In-memory
let storage = Arc::new(InMemoryExecutionLog::new());

The repository includes comprehensive examples:

- Basic flows: Simple sequential and parallel workflows
- DAG execution: Complex dependency graphs
- Child flows: Parent-child flow coordination
- Custom errors: Type-safe error handling with retry control
- External signals: Human-in-the-loop approval workflows
- Durable timers: Long-running timer coordination
- Distributed workers: Multi-worker deployments
- Crash recovery: Resilience testing

Things to weigh when designing flows:

- Step granularity: Balance between checkpointing overhead and replay cost
- Parallel execution: Use DAG for independent operations
- Storage choice: SQLite for single-process, Redis for distributed
- Retry policies: Configure appropriate backoff for your use case
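
As a rough illustration of the retry policy bullet above, combined with the retryable/permanent error split shown earlier, here is a std-only sketch of a capped exponential backoff loop. The `Classify` trait, `backoff_delay`, and `retry` are hypothetical names declared locally for this example; they are not Ergon's API, and the sketch records the delays it would wait instead of actually sleeping.

```rust
use std::time::Duration;

/// Hypothetical trait mirroring the retryable/permanent split
/// (declared locally; not imported from Ergon).
trait Classify {
    fn is_retryable(&self) -> bool;
}

#[derive(Debug, PartialEq)]
enum PaymentError {
    NetworkTimeout,
    InsufficientFunds,
}

impl Classify for PaymentError {
    fn is_retryable(&self) -> bool {
        matches!(self, PaymentError::NetworkTimeout)
    }
}

/// Delay before retry number `attempt` (0-based): base * 2^attempt, capped at `max`.
fn backoff_delay(base: Duration, max: Duration, attempt: u32) -> Duration {
    let factor = 2u32.saturating_pow(attempt);
    base.saturating_mul(factor).min(max)
}

/// Calls `op` at least once and at most `max_attempts` times, stopping early
/// on success or on a non-retryable error. Returns the final result together
/// with the delays a real worker would have waited between attempts.
fn retry<T, E, F>(
    max_attempts: u32,
    base: Duration,
    max: Duration,
    mut op: F,
) -> (Result<T, E>, Vec<Duration>)
where
    E: Classify,
    F: FnMut(u32) -> Result<T, E>,
{
    let mut delays = Vec::new();
    let mut attempt = 0;
    loop {
        match op(attempt) {
            Ok(value) => return (Ok(value), delays),
            Err(err) if err.is_retryable() && attempt + 1 < max_attempts => {
                // A real worker would sleep or schedule a timer here.
                delays.push(backoff_delay(base, max, attempt));
                attempt += 1;
            }
            Err(err) => return (Err(err), delays),
        }
    }
}

fn main() {
    let base = Duration::from_millis(100);
    let max = Duration::from_secs(1);

    // Times out twice, then succeeds on the third attempt.
    let (result, delays) = retry(5, base, max, |attempt| {
        if attempt < 2 {
            Err(PaymentError::NetworkTimeout)
        } else {
            Ok("charged")
        }
    });
    assert_eq!(result, Ok("charged"));
    assert_eq!(delays, vec![Duration::from_millis(100), Duration::from_millis(200)]);

    // A business rejection is returned immediately, with no retries.
    let (result, delays) = retry(5, base, max, |_| Err::<(), _>(PaymentError::InsufficientFunds));
    assert_eq!(result, Err(PaymentError::InsufficientFunds));
    assert!(delays.is_empty());
}
```

The cap matters as much as the base delay: without it, a long run of timeouts doubles the wait indefinitely. Which base, cap, and attempt count are appropriate depends on the downstream service, so treat the numbers here as placeholders.
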
Inspired by:
- Persistasaurus
- Temporal - Durable execution platform
- [Jack Vanlightly](https://jack-vanlightly.com/blog/2025/11/24/demystifying-determinism-in-durable-execution)
- Dave Cheney - Practical programming wisdom
- [Autoref Specialization](https://github.com/dtolnay/case-studies/tree/master/autoref-specialization)