slonq

slonq (from the Slavic slon, meaning elephant, + q for queue) is a reliable, lightweight PostgreSQL job queue.

This is the Python package for slonq, providing high-performance bindings to the Rust core.

Built on top of FOR UPDATE SKIP LOCKED, it provides atomic, concurrent task distribution without the need for an external message broker. It is designed for small to medium-scale projects that already use PostgreSQL and require an ‘at-least-once’ delivery guarantee without adding infrastructure complexity.

Features

  • Atomic concurrency: Utilises Postgres SKIP LOCKED to ensure multiple workers can dequeue tasks simultaneously without collisions.
  • Idempotency support: Idempotency keys prevent the same job from being inserted twice.
  • Delayed jobs: Schedule tasks to become visible at a specific time in the future.
  • Lease mechanism: Jobs are ‘leased’ to workers; if a worker crashes, the lease expires and the job becomes visible for retry.
  • High performance: Core logic implemented in Rust with Python bindings via PyO3.

Installation

pip install slonq

Quick start

The following example demonstrates the full lifecycle of a job, including enqueuing, dequeuing, heartbeat (touching), and final acknowledgement using asyncio.

import asyncio
from slonq import PgQueue

async def main():
    # 1. Initialise the connection to Postgres
    queue = await PgQueue.connect("postgres://postgres@localhost:5432/db")

    # 2. Enqueue a job with an idempotency key and a 5-minute (300s) lease
    await queue.enqueue(
        "unique-request-id-123", 
        {"type": "process_video", "path": "/uploads/vid.mp4"}, 
        300
    )

    # 3. Dequeue a batch of jobs for 'worker-01'
    # This atomically leases up to 5 jobs, allowing up to 3 attempts per job
    jobs = await queue.dequeue("worker-01", 5, 3)

    for job in jobs:
        lease = job.lease_key()
        if not lease:
            continue

        # 4. Extend the lease (Heartbeat)
        # If the task is taking longer than expected, 'touch' it to prevent others from picking it up
        await queue.touch(lease, 60)

        # 5. Success vs failure logic
        success = True  # Replace with actual processing logic

        if success:
            # 6. Acknowledge (mark as done)
            await queue.ack(lease)
        else:
            # 7. Negatively acknowledge (return to queue with a 10s delay)
            await queue.nack(lease, 3, delay_seconds=10.0)
    
    # 8. Batch Acknowledgement
    # If you have a list of processed jobs, you can ack them all at once
    # await queue.ack_batch(list_of_leases)

if __name__ == "__main__":
    asyncio.run(main())
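
For work that runs longer than a single lease, the heartbeat in step 4 can be moved into a background task. The following is a minimal sketch, not part of slonq: run_with_heartbeat is a hypothetical helper, and it assumes only that the queue object exposes the async touch(lease, seconds) call used in the quick start.

```python
import asyncio
import contextlib


async def run_with_heartbeat(queue, lease, work, *, extend_by=60, interval=20.0):
    """Await `work` while periodically extending the lease.

    Hypothetical helper: `queue` is any object with an async
    `touch(lease, seconds)` method; `work` is an awaitable.
    """

    async def heartbeat():
        # Keep extending the lease until this task is cancelled.
        while True:
            await asyncio.sleep(interval)
            await queue.touch(lease, extend_by)

    beat = asyncio.create_task(heartbeat())
    try:
        return await work
    finally:
        # Stop the heartbeat whether the work succeeded or raised.
        beat.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await beat
```

The interval should be comfortably shorter than the lease extension so that one slow heartbeat does not let the lease lapse. A call might look like `await run_with_heartbeat(queue, lease, process(job), extend_by=60, interval=20)`, where `process` is your own coroutine.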

How it works

slonq manages job states through a visibility-based lease system:

  1. Enqueue: A job is inserted with a visible_at timestamp.
  2. Dequeue: A worker selects a batch of jobs where visible_at <= now(). Using FOR UPDATE SKIP LOCKED, Postgres ensures no two workers grab the same job. visible_at is then moved forward by the job's lease timeout (lease_timeout_seconds), effectively ‘locking’ the job for that worker until the lease expires.
  3. Heartbeat: If a job is long-running, the worker can call touch() to extend the lease.
  4. Ack/Nack:
    • Ack: Successfully processed jobs are marked as done.
    • Nack: If processing fails, the worker can negatively acknowledge the job to make it visible for retry, either immediately or after a delay.
  5. Recovery: If a worker crashes, the visible_at time eventually passes, and the job naturally becomes available for another worker to attempt.
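
The lifecycle above can be modelled with a small in-memory toy. This is an illustrative sketch only, not slonq's implementation: ToyQueue and its method signatures are hypothetical, time is passed in explicitly as a number so the behaviour is deterministic, and it assumes that dequeue and touch set visible_at relative to the current time.

```python
import uuid
from dataclasses import dataclass
from typing import Optional


@dataclass
class Job:
    key: str
    visible_at: float
    lease_timeout: float
    status: str = "pending"
    lease_id: Optional[str] = None
    attempts: int = 0


class ToyQueue:
    """Hypothetical in-memory model of a visibility-based lease queue."""

    def __init__(self):
        self.jobs = {}

    def enqueue(self, key, lease_timeout, now, delay=0.0):
        # A repeated idempotency key does not create a second job.
        self.jobs.setdefault(key, Job(key, now + delay, lease_timeout))

    def dequeue(self, now, limit):
        # Due jobs: pending ones, plus in-progress ones whose lease expired.
        due = sorted(
            (j for j in self.jobs.values()
             if j.status != "done" and j.visible_at <= now),
            key=lambda j: j.visible_at,
        )[:limit]
        for job in due:
            job.status = "in_progress"
            job.visible_at = now + job.lease_timeout  # hidden until lease expiry
            job.lease_id = uuid.uuid4().hex           # fresh lease per dequeue
            job.attempts += 1
        return due

    def _leased(self, lease_id):
        # Only the current lease of an in-progress job is valid.
        return next((j for j in self.jobs.values()
                     if j.lease_id == lease_id and j.status == "in_progress"), None)

    def touch(self, lease_id, extend_by, now):
        job = self._leased(lease_id)
        if job:
            job.visible_at = now + extend_by
        return job is not None

    def ack(self, lease_id):
        job = self._leased(lease_id)
        if job:
            job.status = "done"
        return job is not None

    def nack(self, lease_id, now, delay=0.0):
        job = self._leased(lease_id)
        if job:
            job.status, job.lease_id = "pending", None
            job.visible_at = now + delay
        return job is not None
```

In this model a crashed worker needs no special handling: it simply never calls ack, so the job reappears in dequeue once now passes visible_at, and the stale lease identifier held by the old worker no longer matches.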

Operational considerations

  • Database schema: You must run the provided migration SQL to create the necessary table and indices.
  • Visibility: Because slonq relies on now(), ensure your application servers and database server have synchronised clocks.
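  • At-least-once delivery: slonq guarantees that each job is delivered at least once, which means the same job can be delivered more than once (for example, when a lease expires before the worker acknowledges it). Ensure your worker logic is idempotent.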
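
One way to make worker logic idempotent is to remember which idempotency keys have already been handled and skip repeats. The sketch below is an assumption-laden illustration, not a slonq feature: make_idempotent is a hypothetical wrapper, and the in-memory set stands in for a durable store (such as a table with a unique constraint) that a real deployment would need.

```python
def make_idempotent(handler, seen=None):
    """Wrap `handler` so that each idempotency key is processed at most once.

    Hypothetical helper: `seen` is an in-memory set here and would have to be
    a durable store to survive worker restarts.
    """
    seen = set() if seen is None else seen

    def wrapped(key, payload):
        if key in seen:
            return False  # duplicate delivery: skip the side effects
        handler(payload)
        seen.add(key)     # record only after the handler succeeded
        return True

    return wrapped
```

Because the key is recorded only after the handler returns, a handler that raises leaves the key unrecorded, so a retried delivery is processed again rather than silently dropped.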

Database schema

slonq requires a specific table structure and a custom ENUM type to manage job states. You can apply the following migration to your PostgreSQL instance:

-- Required ONLY for PostgreSQL versions prior to 13 to support gen_random_uuid()
CREATE EXTENSION IF NOT EXISTS pgcrypto;

-- Define the job lifecycle states
CREATE TYPE job_status AS ENUM ('pending', 'in_progress', 'done', 'failed');

CREATE TABLE jobs
(
   id                    BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
   idempotency_key       TEXT UNIQUE NOT NULL,
   status                job_status  NOT NULL DEFAULT 'pending',
   payload               JSONB       NOT NULL,

   -- 'Next eligible time' logic:
   -- Pending: When the job becomes available for its first attempt.
   -- In_progress: When the current lease is set to expire.
   visible_at            TIMESTAMPTZ NOT NULL DEFAULT now(),

   -- Tracks attempts to manage retry logic and dead-lettering
   attempt_count         INT         NOT NULL DEFAULT 0,

   -- Per-job lease duration (in seconds)
   lease_timeout_seconds INT         NOT NULL DEFAULT 60 CHECK (lease_timeout_seconds > 0),

   -- Unique lease identifier (regenerated on each dequeue)
   lease_id              UUID NULL,
   leased_by             TEXT NULL,

   created_at            TIMESTAMPTZ NOT NULL DEFAULT now(),
   updated_at            TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- Index for the dequeue operation (FOR UPDATE SKIP LOCKED)
CREATE INDEX idx_jobs_dequeue
   ON jobs (visible_at) WHERE status IN ('pending', 'in_progress');
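
To show how this schema and index support the dequeue step, here is a sketch of the kind of query that FOR UPDATE SKIP LOCKED enables. It is not slonq's actual query, which lives in the Rust core and is not reproduced in this README. DEQUEUE_SQL and dequeue_batch are hypothetical names, attempt limits are ignored, and the cursor is assumed to be a DB-API cursor that accepts named %(name)s parameters, as psycopg does.

```python
# Hypothetical query: lease a batch of due jobs in a single statement.
DEQUEUE_SQL = """
WITH picked AS (
    SELECT id
    FROM jobs
    WHERE status IN ('pending', 'in_progress')
      AND visible_at <= now()
    ORDER BY visible_at
    LIMIT %(batch_size)s
    FOR UPDATE SKIP LOCKED  -- rows locked by other workers are skipped
)
UPDATE jobs AS j
SET status        = 'in_progress',
    visible_at    = now() + make_interval(secs => j.lease_timeout_seconds),
    attempt_count = j.attempt_count + 1,
    lease_id      = gen_random_uuid(),
    leased_by     = %(worker_id)s,
    updated_at    = now()
FROM picked
WHERE j.id = picked.id
RETURNING j.id, j.payload, j.lease_id;
"""


def dequeue_batch(cursor, worker_id, batch_size):
    """Run the sketch query on a DB-API cursor and return the leased rows."""
    cursor.execute(DEQUEUE_SQL, {"worker_id": worker_id, "batch_size": batch_size})
    return cursor.fetchall()
```

The WHERE clause of the inner SELECT matches the predicate of idx_jobs_dequeue, so the planner can use the partial index to find due jobs without scanning completed ones.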

License

This project is licensed under the MIT License.