PGMQ-Ruby


Ruby client for PGMQ - PostgreSQL Message Queue

What is PGMQ-Ruby?

PGMQ-Ruby is a Ruby client for PGMQ (PostgreSQL Message Queue). It provides direct access to all PGMQ operations with a clean, minimal API - similar to how rdkafka-ruby relates to Kafka.

Think of it as:

  • Like AWS SQS - but running entirely in PostgreSQL with no external dependencies
  • Like Sidekiq/Resque - but without Redis, using PostgreSQL for both data and queues
  • Like rdkafka-ruby - a thin, efficient wrapper around the underlying system (PGMQ SQL functions)

Architecture Note: This library follows the rdkafka-ruby/Karafka pattern - pgmq-ruby is the low-level foundation, while higher-level features (job processing, Rails integration, retry strategies) will live in pgmq-framework (similar to how Karafka builds on rdkafka-ruby).

PGMQ Feature Support

This gem provides complete support for all core PGMQ SQL functions. Based on the official PGMQ API:

| Category | Method | Description | Status |
| --- | --- | --- | --- |
| Sending | send | Send a single message with optional delay | ✅ |
| Sending | send_batch | Send multiple messages atomically | ✅ |
| Reading | read | Read a single message with visibility timeout | ✅ |
| Reading | read_batch | Read multiple messages with visibility timeout | ✅ |
| Reading | read_with_poll | Long-polling for efficient message consumption | ✅ |
| Reading | pop | Atomic read + delete operation | ✅ |
| Deleting/Archiving | delete | Delete a single message | ✅ |
| Deleting/Archiving | delete_batch | Delete multiple messages | ✅ |
| Deleting/Archiving | archive | Archive a single message for long-term storage | ✅ |
| Deleting/Archiving | archive_batch | Archive multiple messages | ✅ |
| Deleting/Archiving | purge_queue | Remove all messages from a queue | ✅ |
| Queue Management | create | Create a standard queue | ✅ |
| Queue Management | create_partitioned | Create a partitioned queue (requires pg_partman) | ✅ |
| Queue Management | create_unlogged | Create an unlogged queue (faster, no crash recovery) | ✅ |
| Queue Management | drop_queue | Delete a queue and all its messages | ✅ |
| Queue Management | detach_archive | Detach the archive table from a queue | ✅ |
| Utilities | set_vt | Update a message's visibility timeout | ✅ |
| Utilities | list_queues | List all queues with metadata | ✅ |
| Utilities | metrics | Get queue metrics (length, age, total messages) | ✅ |
| Utilities | metrics_all | Get metrics for all queues | ✅ |
| Ruby Enhancements | Transaction Support | Atomic operations via client.transaction blocks | ✅ |
| Ruby Enhancements | Conditional Filtering | Server-side JSONB filtering with conditional: | ✅ |
| Ruby Enhancements | Multi-Queue Ops | Read/pop/delete/archive from multiple queues | ✅ |
| Ruby Enhancements | Queue Validation | 48-character limit and name validation | ✅ |
| Ruby Enhancements | Connection Pooling | Thread-safe connection pool for concurrency | ✅ |
| Ruby Enhancements | Pluggable Serializers | JSON (default) with custom serializer support | ✅ |

Requirements

  • Ruby 3.2+
  • PostgreSQL 14-18 with PGMQ extension installed

Installation

Add to your Gemfile:

gem 'pgmq-ruby'

Or install directly:

gem install pgmq-ruby

Quick Start

Basic Usage

require 'pgmq'

# Connect to database
client = PGMQ::Client.new(
  host: 'localhost',
  port: 5432,
  dbname: 'mydb',
  user: 'postgres',
  password: 'secret'
)

# Create a queue
client.create('orders')

# Send a message (must be JSON string)
msg_id = client.send('orders', '{"order_id":123,"total":99.99}')

# Read a message (30 second visibility timeout)
msg = client.read('orders', vt: 30)
puts msg.message  # => "{\"order_id\":123,\"total\":99.99}" (raw JSON string)

# Parse and process (you handle deserialization)
data = JSON.parse(msg.message)
process_order(data)
client.delete('orders', msg.msg_id)

# Or archive for long-term storage
client.archive('orders', msg.msg_id)

# Clean up
client.drop_queue('orders')
client.close

Rails Integration (Reusing ActiveRecord Connection)

# config/initializers/pgmq.rb or in your model
class OrderProcessor
  def initialize
    # Reuse Rails' connection pool - no separate connection needed!
    @client = PGMQ::Client.new(-> { ActiveRecord::Base.connection.raw_connection })
  end

  def process_orders
    loop do
      msg = @client.read('orders', vt: 30)
      break unless msg

      # Parse JSON yourself
      data = JSON.parse(msg.message)
      process_order(data)
      @client.delete('orders', msg.msg_id)
    end
  end
end

Connection Options

PGMQ-Ruby supports multiple ways to connect:

Connection Hash

client = PGMQ::Client.new(
  host: 'localhost',
  port: 5432,
  dbname: 'mydb',
  user: 'postgres',
  password: 'secret',
  pool_size: 5,        # Default: 5
  pool_timeout: 5      # Default: 5 seconds
)

Connection String

client = PGMQ::Client.new('postgres://user:pass@localhost:5432/dbname')

Rails ActiveRecord (Recommended for Rails apps)

# Reuses Rails connection pool - no additional connections needed
client = PGMQ::Client.new(-> { ActiveRecord::Base.connection.raw_connection })

Custom Connection Pool

# Bring your own connection management
connection = PGMQ::Connection.new('postgres://localhost/mydb', pool_size: 10)
client = PGMQ::Client.new(connection)

Connection Pool Features

PGMQ-Ruby includes connection pooling with resilience:

# Configure pool size and timeouts
client = PGMQ::Client.new(
  'postgres://localhost/mydb',
  pool_size: 10,           # Number of connections (default: 5)
  pool_timeout: 5,         # Timeout in seconds (default: 5)
  auto_reconnect: true     # Auto-reconnect on connection loss (default: true)
)

# Monitor connection pool health
stats = client.stats
puts "Pool size: #{stats[:size]}"           # => 10
puts "Available: #{stats[:available]}"      # => 8 (2 in use)

# Disable auto-reconnect if you prefer explicit error handling
client = PGMQ::Client.new(
  'postgres://localhost/mydb',
  auto_reconnect: false
)

Connection Pool Benefits:

  • Thread-safe - Multiple threads can safely share a single client
  • Fiber-aware - Works with Ruby 3.0+ Fiber Scheduler for non-blocking I/O
  • Auto-reconnect - Recovers from lost connections (configurable)
  • Health checks - Verifies connections before use to prevent stale connection errors
  • Monitoring - Track pool utilization with client.stats

API Reference

Queue Management

# Create a queue
client.create("queue_name")

# Create partitioned queue (requires pg_partman)
client.create_partitioned("queue_name",
  partition_interval: "daily",
  retention_interval: "7 days"
)

# Create unlogged queue (faster, no crash recovery)
client.create_unlogged("queue_name")

# Drop queue
client.drop_queue("queue_name")

# List all queues
queues = client.list_queues
# => [#<PGMQ::QueueMetadata queue_name="orders" created_at=...>, ...]

Queue Naming Rules

Queue names must follow PostgreSQL identifier rules with PGMQ-specific constraints:

  • Fewer than 48 characters (PGMQ enforces this limit to leave room for its table prefixes)
  • Must start with a letter or underscore
  • Can contain only letters, digits, and underscores
  • Case-sensitive

Valid Queue Names:

client.create("orders")           # ✓ Simple name
client.create("high_priority")    # ✓ With underscore
client.create("Queue123")         # ✓ With numbers
client.create("_internal")        # ✓ Starts with underscore
client.create("a" * 47)           # ✓ Maximum length (47 chars)

Invalid Queue Names:

client.create("123orders")        # ✗ Starts with number
client.create("my-queue")         # ✗ Contains hyphen
client.create("my.queue")         # ✗ Contains period
client.create("a" * 48)           # ✗ Too long (48 chars or more)
# Raises PGMQ::Errors::InvalidQueueNameError
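The rules above can be captured in a small validation sketch. This is an illustration of the constraints, not the gem's actual implementation (the real check lives inside the gem and raises PGMQ::Errors::InvalidQueueNameError):

```ruby
# Queue name rules: starts with a letter or underscore, contains only
# letters, digits, and underscores, and stays under 48 characters.
QUEUE_NAME_PATTERN = /\A[A-Za-z_][A-Za-z0-9_]*\z/

def valid_queue_name?(name)
  name.length < 48 && QUEUE_NAME_PATTERN.match?(name)
end

valid_queue_name?("orders")      # => true
valid_queue_name?("_internal")   # => true
valid_queue_name?("a" * 47)      # => true
valid_queue_name?("123orders")   # => false (starts with a digit)
valid_queue_name?("my-queue")    # => false (contains a hyphen)
valid_queue_name?("a" * 48)      # => false (too long)
```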

Sending Messages

# Send single message (must be JSON string)
msg_id = client.send("queue_name", '{"data":"value"}')

# Send with delay (seconds)
msg_id = client.send("queue_name", '{"data":"value"}', delay: 60)

# Send batch (array of JSON strings)
msg_ids = client.send_batch("queue_name", [
  '{"order":1}',
  '{"order":2}',
  '{"order":3}'
])
# => ["101", "102", "103"]

Reading Messages

# Read single message
msg = client.read("queue_name", vt: 30)
# => #<PGMQ::Message msg_id="1" message="{...}">

# Read batch
messages = client.read_batch("queue_name", vt: 30, qty: 10)

# Read with long-polling
msg = client.read_with_poll("queue_name",
  vt: 30,
  qty: 1,
  max_poll_seconds: 5,
  poll_interval_ms: 100
)

# Pop (atomic read + delete)
msg = client.pop("queue_name")

Conditional Message Filtering

Filter messages by JSON payload content using server-side JSONB queries:

# Filter by single condition
msg = client.read("orders", vt: 30, conditional: { status: "pending" })

# Filter by multiple conditions (AND logic)
msg = client.read("orders", vt: 30, conditional: {
  status: "pending",
  priority: "high"
})

# Filter by nested properties
msg = client.read("orders", vt: 30, conditional: {
  user: { role: "admin" }
})

# Works with read_batch
messages = client.read_batch("orders",
  vt: 30,
  qty: 10,
  conditional: { type: "priority" }
)

# Works with long-polling
messages = client.read_with_poll("orders",
  vt: 30,
  max_poll_seconds: 5,
  conditional: { status: "ready" }
)

How Filtering Works:

  • Filtering happens in PostgreSQL using the JSONB containment operator (@>)
  • Only messages matching ALL conditions are returned (AND logic)
  • The qty parameter applies after filtering
  • An empty conditions hash ({}) means no filtering (same as omitting the parameter)

Performance Tip: For frequently filtered fields, add JSONB indexes:

CREATE INDEX idx_orders_status
  ON pgmq.q_orders USING gin ((message->'status'));
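To see what the containment check does, here is a minimal pure-Ruby sketch of the hash-matching semantics. This is an illustration only — the real filtering runs inside PostgreSQL via @>, and this sketch covers only the hash conditions used by conditional: (JSONB containment has additional rules for arrays):

```ruby
require 'json'

# Mimics JSONB hash containment: every key in the condition must be
# present in the document with an equal value, and nested hashes are
# matched recursively. Multiple keys combine with AND logic.
def contains?(doc, condition)
  condition.all? do |key, expected|
    actual = doc[key.to_s]
    if expected.is_a?(Hash)
      actual.is_a?(Hash) && contains?(actual, expected)
    else
      actual == expected
    end
  end
end

message = JSON.parse('{"status":"pending","user":{"role":"admin","id":7}}')

contains?(message, status: "pending")                           # => true
contains?(message, user: { role: "admin" })                     # => true (nested match)
contains?(message, status: "pending", user: { role: "guest" })  # => false (AND logic)
```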

Message Lifecycle

# Delete message
client.delete("queue_name", msg_id)

# Delete batch
deleted_ids = client.delete_batch("queue_name", [101, 102, 103])

# Archive message
client.archive("queue_name", msg_id)

# Archive batch
archived_ids = client.archive_batch("queue_name", [101, 102, 103])

# Update visibility timeout
msg = client.set_vt("queue_name", msg_id, vt_offset: 60)

# Purge all messages
count = client.purge_queue("queue_name")

Monitoring

# Get queue metrics
metrics = client.metrics("queue_name")
puts metrics.queue_length        # => 42
puts metrics.oldest_msg_age_sec  # => 120
puts metrics.newest_msg_age_sec  # => 5
puts metrics.total_messages      # => 1000

# Get all queue metrics
all_metrics = client.metrics_all
all_metrics.each do |m|
  puts "#{m.queue_name}: #{m.queue_length} messages"
end

Transaction Support

Low-level PostgreSQL transaction support for atomic operations. Transactions are a database primitive provided by PostgreSQL - this is a thin wrapper for convenience.

Execute atomic operations across multiple queues or combine queue operations with application data updates:

# Atomic operations across multiple queues
client.transaction do |txn|
  # Send to multiple queues atomically
  txn.send("orders", '{"order_id":123}')
  txn.send("notifications", '{"user_id":456,"type":"order_created"}')
  txn.send("analytics", '{"event":"order_placed"}')
end

# Process message and update application state atomically
client.transaction do |txn|
  # Read and process message
  msg = txn.read("orders", vt: 30)

  if msg
    # Parse and update your database
    data = JSON.parse(msg.message)
    Order.create!(external_id: data["order_id"])

    # Delete message only if database update succeeds
    txn.delete("orders", msg.msg_id)
  end
end

# Automatic rollback on errors
client.transaction do |txn|
  txn.send("queue1", '{"data":"message1"}')
  txn.send("queue2", '{"data":"message2"}')

  raise "Something went wrong!"
  # Both messages are rolled back - neither queue receives anything
end

# Move messages between queues atomically
client.transaction do |txn|
  msg = txn.read("pending_orders", vt: 30)

  if msg
    data = JSON.parse(msg.message)
    if data["priority"] == "high"
      # Move to high-priority queue
      txn.send("priority_orders", msg.message)
      txn.delete("pending_orders", msg.msg_id)
    end
  end
end

How Transactions Work:

  • Wraps PostgreSQL's native transaction support (similar to rdkafka-ruby providing Kafka transactions)
  • All operations within the block execute in a single PostgreSQL transaction
  • If any operation fails, the entire transaction is rolled back automatically
  • The transactional client delegates all PGMQ::Client methods for convenience

Use Cases:

  • Multi-queue coordination: Send related messages to multiple queues atomically
  • Exactly-once processing: Combine message deletion with application state updates
  • Message routing: Move messages between queues without losing data
  • Batch operations: Ensure all-or-nothing semantics for bulk operations

Important Notes:

  • Transactions hold database locks - keep them short to avoid blocking
  • Long transactions can impact queue throughput
  • Read operations with long visibility timeouts may cause lock contention
  • Consider using pop() for atomic read+delete in simple cases

Message Object

PGMQ-Ruby is a low-level transport library - it returns raw values from PostgreSQL without any transformation. You are responsible for parsing JSON and type conversion.

msg = client.read("queue", vt: 30)

# All values are strings as returned by PostgreSQL
msg.msg_id          # => "123" (String, not Integer)
msg.id              # => "123" (alias for msg_id)
msg.read_ct         # => "1" (String, not Integer)
msg.enqueued_at     # => "2025-01-15 10:30:00+00" (String, not Time)
msg.vt              # => "2025-01-15 10:30:30+00" (String, not Time)
msg.message         # => "{\"data\":\"value\"}" (Raw JSONB as JSON string)
msg.headers         # => "{\"trace_id\":\"abc123\"}" (Raw JSONB as JSON string, optional)
msg.queue_name      # => "my_queue" (only present for multi-queue operations, otherwise nil)

# You handle JSON parsing
data = JSON.parse(msg.message)  # => { "data" => "value" }
metadata = JSON.parse(msg.headers) if msg.headers  # => { "trace_id" => "abc123" }

# You handle type conversion if needed
id = msg.msg_id.to_i           # => 123
read_count = msg.read_ct.to_i  # => 1
enqueued = Time.parse(msg.enqueued_at)  # => 2025-01-15 10:30:00 UTC

Message Headers

PGMQ supports optional message headers via the headers JSONB column:

# Sending with headers requires direct SQL or a custom wrapper
# (pgmq-ruby focuses on the core PGMQ API which doesn't have a send_with_headers function)

# Reading messages with headers
msg = client.read("queue", vt: 30)
if msg.headers
  metadata = JSON.parse(msg.headers)
  trace_id = metadata["trace_id"]
  correlation_id = metadata["correlation_id"]
end

Why Raw Values?

This library follows the rdkafka-ruby philosophy - provide a thin, performant wrapper around the underlying system:

  1. No assumptions - Your application decides how to parse timestamps, convert types, etc.
  2. Framework-agnostic - Works equally well with Rails, Sinatra, or plain Ruby
  3. Zero overhead - No hidden type conversion or object allocation
  4. Explicit control - You see exactly what PostgreSQL returns

Higher-level features (automatic deserialization, type conversion, instrumentation) belong in framework layers built on top of this library.

Working with JSON

PGMQ stores messages as JSONB in PostgreSQL. You must handle JSON serialization yourself:

Sending Messages

# Simple hash
msg = { order_id: 123, status: "pending" }
client.send("orders", msg.to_json)

# Using JSON.generate for explicit control
client.send("orders", JSON.generate(order_id: 123, status: "pending"))

# Pre-serialized JSON string
json_str = '{"order_id":123,"status":"pending"}'
client.send("orders", json_str)

Reading Messages

msg = client.read("orders", vt: 30)

# Parse JSON yourself
data = JSON.parse(msg.message)
puts data["order_id"]  # => 123
puts data["status"]    # => "pending"

# Handle parsing errors
begin
  data = JSON.parse(msg.message)
rescue JSON::ParserError => e
  logger.error "Invalid JSON in message #{msg.msg_id}: #{e.message}"
  client.delete("orders", msg.msg_id)  # Remove invalid message
end

Helper Pattern (Optional)

For convenience, you can wrap the client in your own helper:

require 'json'
require 'ostruct'

class QueueHelper
  def initialize(client)
    @client = client
  end

  def send(queue, data)
    @client.send(queue, data.to_json)
  end

  def read(queue, vt:)
    msg = @client.read(queue, vt: vt)
    return nil unless msg

    OpenStruct.new(
      id: msg.msg_id.to_i,
      data: JSON.parse(msg.message),
      read_count: msg.read_ct.to_i,
      raw: msg
    )
  end
end

helper = QueueHelper.new(client)
helper.send("orders", { order_id: 123 })
msg = helper.read("orders", vt: 30)
puts msg.data["order_id"]  # => 123

Development

# Clone repository
git clone https://github.com/mensfeld/pgmq-ruby.git
cd pgmq-ruby

# Install dependencies
bundle install

# Start PostgreSQL with PGMQ
docker compose up -d

# Run tests
bundle exec rspec

# Run console
bundle exec bin/console

Author

Maintained by Maciej Mensfeld

Also check out Karafka - High-performance Apache Kafka framework for Ruby.
