Background task processing in Rust using apalis and pgmq
- Reliable message queue using pgmq as the backend.
- Multiple storage types: standard polling and trigger-based storages.
- Custom codecs for serializing/deserializing job arguments as bytes.
- Integration with apalis workers and middleware.
- Observability: Monitor and manage tasks using apalis-board.
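To illustrate the custom codec feature listed above, here is a minimal sketch of turning job arguments into bytes and back. The `BytesCodec` trait, the `LengthPrefixed` type and the `CodecError` enum are hypothetical names made up for this example; they are not the apalis or apalis-pgmq codec API, and the length-prefixed layout is only one possible encoding.

```rust
use std::collections::HashMap;

/// Hypothetical codec trait, for illustration only (not the apalis API).
pub trait BytesCodec<T> {
    type Error;
    fn encode(value: &T) -> Result<Vec<u8>, Self::Error>;
    fn decode(bytes: &[u8]) -> Result<T, Self::Error>;
}

/// Encodes a string map as length-prefixed UTF-8 key/value pairs.
pub struct LengthPrefixed;

#[derive(Debug, PartialEq)]
pub enum CodecError {
    Truncated,
    InvalidUtf8,
    TooLarge,
}

// Append a 4-byte big-endian length followed by the string's bytes.
fn put_str(out: &mut Vec<u8>, s: &str) -> Result<(), CodecError> {
    if s.len() > u32::MAX as usize {
        return Err(CodecError::TooLarge);
    }
    out.extend_from_slice(&(s.len() as u32).to_be_bytes());
    out.extend_from_slice(s.as_bytes());
    Ok(())
}

// Read one length-prefixed string starting at `pos`, advancing `pos` past it.
fn take_str(bytes: &[u8], pos: &mut usize) -> Result<String, CodecError> {
    let len_end = pos.checked_add(4).ok_or(CodecError::Truncated)?;
    let prefix = bytes.get(*pos..len_end).ok_or(CodecError::Truncated)?;
    let mut len_bytes = [0u8; 4];
    len_bytes.copy_from_slice(prefix);
    let len = u32::from_be_bytes(len_bytes) as usize;
    let end = len_end.checked_add(len).ok_or(CodecError::Truncated)?;
    let raw = bytes.get(len_end..end).ok_or(CodecError::Truncated)?;
    *pos = end;
    String::from_utf8(raw.to_vec()).map_err(|_| CodecError::InvalidUtf8)
}

impl BytesCodec<HashMap<String, String>> for LengthPrefixed {
    type Error = CodecError;

    fn encode(value: &HashMap<String, String>) -> Result<Vec<u8>, CodecError> {
        let mut out = Vec::new();
        for (key, val) in value {
            put_str(&mut out, key)?;
            put_str(&mut out, val)?;
        }
        Ok(out)
    }

    fn decode(bytes: &[u8]) -> Result<HashMap<String, String>, CodecError> {
        let mut map = HashMap::new();
        let mut pos = 0;
        // Keep reading key/value pairs until the buffer is exhausted.
        while pos < bytes.len() {
            let key = take_str(bytes, &mut pos)?;
            let val = take_str(bytes, &mut pos)?;
            map.insert(key, val);
        }
        Ok(map)
    }
}
```

Calling `LengthPrefixed::encode(&args)` and then `LengthPrefixed::decode(&bytes)` should round-trip a `HashMap<String, String>`. In practice a serde-based format would likely be a better fit than a hand-rolled layout like this one.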
The fastest way to get started is by running the Docker image, where PGMQ comes pre-installed in Postgres.
    docker run -d --name pgmq-postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 ghcr.io/pgmq/pg18-pgmq:v1.7.0

Then enqueue a task and run a worker:

    use std::{collections::HashMap, env};

    use apalis::prelude::*;
    use apalis_pgmq::*;
    use futures::stream::{self, StreamExt, SinkExt};

    #[tokio::main]
    async fn main() {
        // Connect to Postgres using the DATABASE_URL environment variable.
        let pool = PgPool::connect(env::var("DATABASE_URL").unwrap().as_str())
            .await
            .unwrap();
        // One-time setup for the queue backend.
        PGMQueue::setup(&pool).await.unwrap();
        let mut backend = PGMQueue::new(pool, "default_queue").await;
        // Enqueue a task with empty arguments.
        backend.send(Task::new(HashMap::new())).await.unwrap();

        // Task handler: receives the task arguments and the worker context.
        async fn send_reminder(
            msg: HashMap<String, String>,
            wrk: WorkerContext,
        ) -> Result<(), BoxDynError> {
            Ok(())
        }

        let worker = WorkerBuilder::new("rango-tango-1")
            .backend(backend)
            .build(send_reminder);
        worker.run().await.unwrap();
    }

Track your jobs using apalis-board.

- Eager Fetcher
- Lazy Fetcher (using NOTIFY)
- Shared Fetcher (Multiple queues on the same connection)
- Batch Sink
- BackendExt
- Worker heartbeats
- Workflow support
- Extensive Docs
- Maximize compatibility with pgmq
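The difference between the eager and lazy fetchers in the list above can be sketched with the standard library alone. This is an illustrative model, not the apalis-pgmq implementation: the in-memory `Queue`, the `eager_fetch` and `lazy_fetch` functions and the `mpsc` channel standing in for a Postgres NOTIFY wake-up are all assumptions made for this example.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::time::Duration;

/// In-memory stand-in for a queue table (hypothetical, not the pgmq schema).
#[derive(Clone, Default)]
pub struct Queue {
    messages: Arc<Mutex<VecDeque<String>>>,
}

impl Queue {
    /// Store a message and, if a notifier is given, signal waiting consumers.
    pub fn push(&self, msg: &str, notify: Option<&Sender<()>>) {
        self.messages.lock().unwrap().push_back(msg.to_string());
        if let Some(tx) = notify {
            // Ignore the error if no consumer is listening.
            let _ = tx.send(());
        }
    }

    /// Remove and return the oldest message, if any.
    pub fn pop(&self) -> Option<String> {
        self.messages.lock().unwrap().pop_front()
    }
}

/// Eager style: check the queue on a fixed interval, up to `max_polls` times.
pub fn eager_fetch(queue: &Queue, interval: Duration, max_polls: usize) -> Option<String> {
    for _ in 0..max_polls {
        if let Some(msg) = queue.pop() {
            return Some(msg);
        }
        std::thread::sleep(interval);
    }
    None
}

/// Lazy style: block until a wake-up arrives (or the timeout elapses),
/// and only then read from the queue.
pub fn lazy_fetch(queue: &Queue, wakeups: &Receiver<()>, timeout: Duration) -> Option<String> {
    wakeups.recv_timeout(timeout).ok()?;
    queue.pop()
}

/// Convenience constructor for the wake-up channel used by `lazy_fetch`.
pub fn wakeup_channel() -> (Sender<()>, Receiver<()>) {
    channel()
}
```

In this model the eager fetcher touches the queue on every tick even when it is empty, while the lazy fetcher stays idle until a producer signals it. That is the usual trade-off between polling and notification-driven consumers.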
Our version of pgmq differs in several ways to offer better support for apalis:
- Messages are stored as BYTEA instead of JSONB to offer better codec support.
- Uses headers, which are not yet supported in the rs version.
- Uses the apalis_pgmq schema instead of pgmq.
- pgmq: A lightweight message queue.
Licensed under Postgres License.