slonq (from the Slavic slon, meaning elephant, + q for queue) is a reliable, lightweight PostgreSQL job queue.
This is the Python package for slonq, providing high-performance bindings to the Rust core.
Built on top of FOR UPDATE SKIP LOCKED, it provides atomic, concurrent task distribution without the need for an external message broker. It is designed for small to medium-scale projects that already use PostgreSQL and require an ‘at-least-once’ delivery guarantee without adding infrastructure complexity.
- Atomic concurrency: Utilises Postgres SKIP LOCKED to ensure multiple workers can dequeue tasks simultaneously without collisions.
- Idempotency support: Built-in support for idempotency keys to prevent duplicate job insertion.
- Delayed jobs: Schedule tasks to become visible at a specific time in the future.
- Lease mechanism: Jobs are ‘leased’ to workers; if a worker crashes, the lease expires and the job becomes visible for retry.
- High performance: Core logic implemented in Rust with Python bindings via PyO3.
pip install slonq

The following example demonstrates the full lifecycle of a job, including enqueuing, dequeuing, heartbeat (touching), and final acknowledgement using asyncio.
import asyncio

from slonq import PgQueue


async def main():
    # 1. Initialise the connection to Postgres
    queue = await PgQueue.connect("postgres://postgres@localhost:5432/db")

    # 2. Enqueue a job with an idempotency key and a 5-minute (300s) lease
    await queue.enqueue(
        "unique-request-id-123",
        {"type": "process_video", "path": "/uploads/vid.mp4"},
        300
    )

    # 3. Dequeue a batch of jobs for 'worker-01'
    # This atomically leases up to 5 jobs for 3 attempts each
    jobs = await queue.dequeue("worker-01", 5, 3)

    for job in jobs:
        lease = job.lease_key()
        if not lease:
            continue

        # 4. Extend the lease (Heartbeat)
        # If the task is taking longer than expected, 'touch' it to prevent others from picking it up
        await queue.touch(lease, 60)

        # 5. Success vs failure logic
        success = True  # Replace with actual processing logic
        if success:
            # 6. Acknowledge (mark as done)
            await queue.ack(lease)
        else:
            # 7. Negatively acknowledge (return to queue with a 10s delay)
            await queue.nack(lease, 3, delay_seconds=10.0)

    # 8. Batch Acknowledgement
    # If you have a list of processed jobs, you can ack them all at once
    # await queue.ack_batch(list_of_leases)


if __name__ == "__main__":
    asyncio.run(main())

slonq manages job states through a visibility-based lease system:
- Enqueue: A job is inserted with a visible_at timestamp.
- Dequeue: A worker selects a batch of jobs where visible_at <= now(). Using FOR UPDATE SKIP LOCKED, Postgres ensures no two workers grab the same job. The visible_at is then moved forward by the lease_timeout, effectively ‘locking’ the job for that worker.
- Heartbeat: If a job is long-running, the worker can call touch() to extend the lease.
- Ack/Nack:
  - Ack: Successfully processed jobs are marked as done.
  - Nack: If a worker fails, it can negatively acknowledge the job to make it visible for retry immediately (or with a delay).
- Recovery: If a worker crashes, the visible_at time eventually passes, and the job naturally becomes available for another worker to attempt.
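
The lifecycle above can be sketched as a small in-memory model. This is only an illustration and not slonq's implementation: the class `InMemoryLeaseQueue`, its method signatures, and the explicit `now` argument are all assumptions made for the example. It runs in a single thread, so it does not model SKIP LOCKED itself, only how moving visible_at forward hides a job and how an expired lease makes it available again.

```python
import uuid
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Job:
    payload: dict
    visible_at: float            # seconds on a caller-supplied clock
    lease_timeout: float
    status: str = "pending"      # pending -> in_progress -> done
    attempt_count: int = 0
    lease_id: Optional[str] = None


class InMemoryLeaseQueue:
    """Hypothetical toy model of a visibility-based lease queue."""

    def __init__(self) -> None:
        self.jobs: List[Job] = []

    def enqueue(self, payload: dict, now: float,
                lease_timeout: float = 60.0, delay: float = 0.0) -> Job:
        # A positive delay models a delayed job: it stays invisible until then.
        job = Job(payload, visible_at=now + delay, lease_timeout=lease_timeout)
        self.jobs.append(job)
        return job

    def dequeue(self, now: float, batch_size: int = 1) -> List[Job]:
        leased: List[Job] = []
        for job in self.jobs:
            if len(leased) == batch_size:
                break
            # Eligible: not finished, and visible (never leased, or lease expired).
            if job.status != "done" and job.visible_at <= now:
                job.status = "in_progress"
                job.attempt_count += 1
                job.lease_id = uuid.uuid4().hex          # new lease per dequeue
                job.visible_at = now + job.lease_timeout  # hide from other workers
                leased.append(job)
        return leased

    def _leased(self, lease_id: str) -> Optional[Job]:
        # A lease is current only while its id still matches the job's lease_id.
        for job in self.jobs:
            if job.status == "in_progress" and job.lease_id == lease_id:
                return job
        return None

    def touch(self, lease_id: str, now: float, extend_by: float) -> bool:
        job = self._leased(lease_id)
        if job is None:
            return False
        job.visible_at = now + extend_by  # heartbeat: push the expiry forward
        return True

    def ack(self, lease_id: str) -> bool:
        job = self._leased(lease_id)
        if job is None:
            return False  # stale lease: another worker has re-leased the job
        job.status = "done"
        return True

    def nack(self, lease_id: str, now: float, delay: float = 0.0) -> bool:
        job = self._leased(lease_id)
        if job is None:
            return False
        job.status, job.lease_id = "pending", None
        job.visible_at = now + delay  # visible again immediately or after delay
        return True
```

In this model, a worker that crashes simply never calls `ack`; once the clock passes visible_at, the next `dequeue` hands the job out under a fresh lease, and the old lease id no longer acknowledges anything.
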
- Database schema: You must run the provided migration SQL to create the necessary table and indices.
- Visibility: Because slonq relies on now(), ensure your application servers and database server have synchronised clocks.
- At-least-once delivery: slonq guarantees that each job is delivered at least once, so a job may occasionally be delivered more than once (for example, after a lease expires while the original worker is still running). Ensure your worker logic is idempotent.
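
As a minimal sketch of what idempotent worker logic can look like, the wrapper below records which keys have already been handled and skips the side effect on a repeat delivery. The names `IdempotentHandler` and `charge` are hypothetical, and the in-memory dictionary is an assumption made to keep the example self-contained; a real worker would need to store this record durably, for instance in the same database as the side effect.

```python
from typing import Any, Callable, Dict


class IdempotentHandler:
    """Runs the wrapped handler at most once per key (illustrative only)."""

    def __init__(self, handler: Callable[[dict], Any]) -> None:
        self._handler = handler
        self._results: Dict[str, Any] = {}  # key -> result of the first run

    def handle(self, key: str, payload: dict) -> Any:
        # A repeat delivery returns the stored result without re-running
        # the side effect.
        if key in self._results:
            return self._results[key]
        result = self._handler(payload)
        self._results[key] = result
        return result


charged_amounts = []


def charge(payload: dict) -> str:
    # Stand-in for a side effect that must not happen twice.
    charged_amounts.append(payload["amount"])
    return "charged"


worker = IdempotentHandler(charge)
worker.handle("order-1", {"amount": 5})
worker.handle("order-1", {"amount": 5})  # redelivery: no second charge
```

The key passed to `handle` could be the same idempotency key used at enqueue time, assuming the worker has access to it.
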
slonq requires a specific table structure and a custom ENUM type to manage job states. You can apply the following migration to your PostgreSQL instance:
-- Required ONLY for PostgreSQL versions prior to 13 to support gen_random_uuid()
CREATE EXTENSION IF NOT EXISTS pgcrypto;
-- Define the job lifecycle states
CREATE TYPE job_status AS ENUM ('pending', 'in_progress', 'done', 'failed');
CREATE TABLE jobs
(
    id                    BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
    idempotency_key       TEXT UNIQUE NOT NULL,
    status                job_status  NOT NULL DEFAULT 'pending',
    payload               JSONB       NOT NULL,
    -- 'Next eligible time' logic:
    -- Pending: When the job becomes available for its first attempt.
    -- In_progress: When the current lease is set to expire.
    visible_at            TIMESTAMPTZ NOT NULL DEFAULT now(),
    -- Tracks attempts to manage retry logic and dead-lettering
    attempt_count         INT         NOT NULL DEFAULT 0,
    -- Per-job lease duration (in seconds)
    lease_timeout_seconds INT         NOT NULL DEFAULT 60 CHECK (lease_timeout_seconds > 0),
    -- Unique lease identifier (regenerated on each dequeue)
    lease_id              UUID        NULL,
    leased_by             TEXT        NULL,
    created_at            TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at            TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- Index for the dequeue operation (FOR UPDATE SKIP LOCKED)
CREATE INDEX idx_jobs_dequeue
    ON jobs (visible_at) WHERE status IN ('pending', 'in_progress');

This project is licensed under the MIT License.