Ruby client for PGMQ - PostgreSQL Message Queue
PGMQ-Ruby is a Ruby client for PGMQ (PostgreSQL Message Queue). It provides direct access to all PGMQ operations with a clean, minimal API - similar to how rdkafka-ruby relates to Kafka.
Think of it as:
- Like AWS SQS - but running entirely in PostgreSQL with no external dependencies
- Like Sidekiq/Resque - but without Redis, using PostgreSQL for both data and queues
- Like rdkafka-ruby - a thin, efficient wrapper around the underlying system (PGMQ SQL functions)
Architecture Note: This library follows the rdkafka-ruby/Karafka pattern -
pgmq-rubyis the low-level foundation, while higher-level features (job processing, Rails integration, retry strategies) will live inpgmq-framework(similar to how Karafka builds on rdkafka-ruby).
- PGMQ Feature Support
- Requirements
- Installation
- Quick Start
- Configuration
- API Reference
- Message Object
- Serializers
- Rails Integration
- Development
- License
- Author
This gem provides complete support for all core PGMQ SQL functions. Based on the official PGMQ API:
| Category | Method | Description | Status |
|---|---|---|---|
| Sending | send |
Send single message with optional delay | âś… |
send_batch |
Send multiple messages atomically | âś… | |
| Reading | read |
Read single message with visibility timeout | âś… |
read_batch |
Read multiple messages with visibility timeout | âś… | |
read_with_poll |
Long-polling for efficient message consumption | âś… | |
pop |
Atomic read + delete operation | âś… | |
| Deleting/Archiving | delete |
Delete single message | âś… |
delete_batch |
Delete multiple messages | âś… | |
archive |
Archive single message for long-term storage | âś… | |
archive_batch |
Archive multiple messages | âś… | |
purge_queue |
Remove all messages from queue | âś… | |
| Queue Management | create |
Create standard queue | âś… |
create_partitioned |
Create partitioned queue (requires pg_partman) | âś… | |
create_unlogged |
Create unlogged queue (faster, no crash recovery) | âś… | |
drop_queue |
Delete queue and all messages | âś… | |
detach_archive |
Detach archive table from queue | âś… | |
| Utilities | set_vt |
Update message visibility timeout | âś… |
list_queues |
List all queues with metadata | âś… | |
metrics |
Get queue metrics (length, age, total messages) | âś… | |
metrics_all |
Get metrics for all queues | âś… | |
| Ruby Enhancements | Transaction Support | Atomic operations via client.transaction do |txn| |
âś… |
| Conditional Filtering | Server-side JSONB filtering with conditional: |
âś… | |
| Multi-Queue Ops | Read/pop/delete/archive from multiple queues | âś… | |
| Queue Validation | 48-character limit and name validation | âś… | |
| Connection Pooling | Thread-safe connection pool for concurrency | âś… | |
| Pluggable Serializers | JSON (default) with custom serializer support | âś… |
- Ruby 3.2+
- PostgreSQL 14-18 with PGMQ extension installed
Add to your Gemfile:
gem 'pgmq-ruby'Or install directly:
gem install pgmq-rubyrequire 'pgmq'
# Connect to database
client = PGMQ::Client.new(
host: 'localhost',
port: 5432,
dbname: 'mydb',
user: 'postgres',
password: 'secret'
)
# Create a queue
client.create('orders')
# Send a message (must be JSON string)
msg_id = client.send('orders', '{"order_id":123,"total":99.99}')
# Read a message (30 second visibility timeout)
msg = client.read('orders', vt: 30)
puts msg.message # => "{\"order_id\":123,\"total\":99.99}" (raw JSON string)
# Parse and process (you handle deserialization)
data = JSON.parse(msg.message)
process_order(data)
client.delete('orders', msg.msg_id)
# Or archive for long-term storage
client.archive('orders', msg.msg_id)
# Clean up
client.drop_queue('orders')
client.close# config/initializers/pgmq.rb or in your model
class OrderProcessor
def initialize
# Reuse Rails' connection pool - no separate connection needed!
@client = PGMQ::Client.new(-> { ActiveRecord::Base.connection.raw_connection })
end
def process_orders
loop do
msg = @client.read('orders', vt: 30)
break unless msg
# Parse JSON yourself
data = JSON.parse(msg.message)
process_order(data)
@client.delete('orders', msg.msg_id)
end
end
endPGMQ-Ruby supports multiple ways to connect:
client = PGMQ::Client.new(
host: 'localhost',
port: 5432,
dbname: 'mydb',
user: 'postgres',
password: 'secret',
pool_size: 5, # Default: 5
pool_timeout: 5 # Default: 5 seconds
)client = PGMQ::Client.new('postgres://user:pass@localhost:5432/dbname')# Reuses Rails connection pool - no additional connections needed
client = PGMQ::Client.new(-> { ActiveRecord::Base.connection.raw_connection })# Bring your own connection management
connection = PGMQ::Connection.new('postgres://localhost/mydb', pool_size: 10)
client = PGMQ::Client.new(connection)PGMQ-Ruby includes connection pooling with resilience:
# Configure pool size and timeouts
client = PGMQ::Client.new(
'postgres://localhost/mydb',
pool_size: 10, # Number of connections (default: 5)
pool_timeout: 5, # Timeout in seconds (default: 5)
auto_reconnect: true # Auto-reconnect on connection loss (default: true)
)
# Monitor connection pool health
stats = client.stats
puts "Pool size: #{stats[:size]}" # => 10
puts "Available: #{stats[:available]}" # => 8 (2 in use)
# Disable auto-reconnect if you prefer explicit error handling
client = PGMQ::Client.new(
'postgres://localhost/mydb',
auto_reconnect: false
)Connection Pool Benefits:
- Thread-safe - Multiple threads can safely share a single client
- Fiber-aware - Works with Ruby 3.0+ Fiber Scheduler for non-blocking I/O
- Auto-reconnect - Recovers from lost connections (configurable)
- Health checks - Verifies connections before use to prevent stale connection errors
- Monitoring - Track pool utilization with
client.stats
# Create a queue
client.create("queue_name")
# Create partitioned queue (requires pg_partman)
client.create_partitioned("queue_name",
partition_interval: "daily",
retention_interval: "7 days"
)
# Create unlogged queue (faster, no crash recovery)
client.create_unlogged("queue_name")
# Drop queue
client.drop_queue("queue_name")
# List all queues
queues = client.list_queues
# => [#<PGMQ::QueueMetadata queue_name="orders" created_at=...>, ...]Queue names must follow PostgreSQL identifier rules with PGMQ-specific constraints:
- Maximum 48 characters (PGMQ enforces this limit for table prefixes)
- Must start with a letter or underscore
- Can contain only letters, digits, and underscores
- Case-sensitive
Valid Queue Names:
client.create("orders") # âś“ Simple name
client.create("high_priority") # âś“ With underscore
client.create("Queue123") # âś“ With numbers
client.create("_internal") # âś“ Starts with underscore
client.create("a" * 47) # âś“ Maximum length (47 chars)Invalid Queue Names:
client.create("123orders") # âś— Starts with number
client.create("my-queue") # âś— Contains hyphen
client.create("my.queue") # âś— Contains period
client.create("a" * 48) # âś— Too long (48+ chars)
# Raises PGMQ::Errors::InvalidQueueNameError# Send single message (must be JSON string)
msg_id = client.send("queue_name", '{"data":"value"}')
# Send with delay (seconds)
msg_id = client.send("queue_name", '{"data":"value"}', delay: 60)
# Send batch (array of JSON strings)
msg_ids = client.send_batch("queue_name", [
'{"order":1}',
'{"order":2}',
'{"order":3}'
])
# => ["101", "102", "103"]# Read single message
msg = client.read("queue_name", vt: 30)
# => #<PGMQ::Message msg_id="1" message="{...}">
# Read batch
messages = client.read_batch("queue_name", vt: 30, qty: 10)
# Read with long-polling
msg = client.read_with_poll("queue_name",
vt: 30,
qty: 1,
max_poll_seconds: 5,
poll_interval_ms: 100
)
# Pop (atomic read + delete)
msg = client.pop("queue_name")Filter messages by JSON payload content using server-side JSONB queries:
# Filter by single condition
msg = client.read("orders", vt: 30, conditional: { status: "pending" })
# Filter by multiple conditions (AND logic)
msg = client.read("orders", vt: 30, conditional: {
status: "pending",
priority: "high"
})
# Filter by nested properties
msg = client.read("orders", vt: 30, conditional: {
user: { role: "admin" }
})
# Works with read_batch
messages = client.read_batch("orders",
vt: 30,
qty: 10,
conditional: { type: "priority" }
)
# Works with long-polling
messages = client.read_with_poll("orders",
vt: 30,
max_poll_seconds: 5,
conditional: { status: "ready" }
)How Filtering Works:
- Filtering happens in PostgreSQL using JSONB containment operator (
@>) - Only messages matching ALL conditions are returned (AND logic)
- The
qtyparameter applies after filtering - Empty conditions
{}means no filtering (same as omitting parameter)
Performance Tip: For frequently filtered fields, add JSONB indexes:
CREATE INDEX idx_orders_status
ON pgmq.q_orders USING gin ((message->'status'));# Delete message
client.delete("queue_name", msg_id)
# Delete batch
deleted_ids = client.delete_batch("queue_name", [101, 102, 103])
# Archive message
client.archive("queue_name", msg_id)
# Archive batch
archived_ids = client.archive_batch("queue_name", [101, 102, 103])
# Update visibility timeout
msg = client.set_vt("queue_name", msg_id, vt_offset: 60)
# Purge all messages
count = client.purge_queue("queue_name")# Get queue metrics
metrics = client.metrics("queue_name")
puts metrics.queue_length # => 42
puts metrics.oldest_msg_age_sec # => 120
puts metrics.newest_msg_age_sec # => 5
puts metrics.total_messages # => 1000
# Get all queue metrics
all_metrics = client.metrics_all
all_metrics.each do |m|
puts "#{m.queue_name}: #{m.queue_length} messages"
endLow-level PostgreSQL transaction support for atomic operations. Transactions are a database primitive provided by PostgreSQL - this is a thin wrapper for convenience.
Execute atomic operations across multiple queues or combine queue operations with application data updates:
# Atomic operations across multiple queues
client.transaction do |txn|
# Send to multiple queues atomically
txn.send("orders", '{"order_id":123}')
txn.send("notifications", '{"user_id":456,"type":"order_created"}')
txn.send("analytics", '{"event":"order_placed"}')
end
# Process message and update application state atomically
client.transaction do |txn|
# Read and process message
msg = txn.read("orders", vt: 30)
if msg
# Parse and update your database
data = JSON.parse(msg.message)
Order.create!(external_id: data["order_id"])
# Delete message only if database update succeeds
txn.delete("orders", msg.msg_id)
end
end
# Automatic rollback on errors
client.transaction do |txn|
txn.send("queue1", '{"data":"message1"}')
txn.send("queue2", '{"data":"message2"}')
raise "Something went wrong!"
# Both messages are rolled back - neither queue receives anything
end
# Move messages between queues atomically
client.transaction do |txn|
msg = txn.read("pending_orders", vt: 30)
if msg
data = JSON.parse(msg.message)
if data["priority"] == "high"
# Move to high-priority queue
txn.send("priority_orders", msg.message)
txn.delete("pending_orders", msg.msg_id)
end
end
endHow Transactions Work:
- Wraps PostgreSQL's native transaction support (similar to rdkafka-ruby providing Kafka transactions)
- All operations within the block execute in a single PostgreSQL transaction
- If any operation fails, the entire transaction is rolled back automatically
- The transactional client delegates all
PGMQ::Clientmethods for convenience
Use Cases:
- Multi-queue coordination: Send related messages to multiple queues atomically
- Exactly-once processing: Combine message deletion with application state updates
- Message routing: Move messages between queues without losing data
- Batch operations: Ensure all-or-nothing semantics for bulk operations
Important Notes:
- Transactions hold database locks - keep them short to avoid blocking
- Long transactions can impact queue throughput
- Read operations with long visibility timeouts may cause lock contention
- Consider using
pop()for atomic read+delete in simple cases
PGMQ-Ruby is a low-level transport library - it returns raw values from PostgreSQL without any transformation. You are responsible for parsing JSON and type conversion.
msg = client.read("queue", vt: 30)
# All values are strings as returned by PostgreSQL
msg.msg_id # => "123" (String, not Integer)
msg.id # => "123" (alias for msg_id)
msg.read_ct # => "1" (String, not Integer)
msg.enqueued_at # => "2025-01-15 10:30:00+00" (String, not Time)
msg.vt # => "2025-01-15 10:30:30+00" (String, not Time)
msg.message # => "{\"data\":\"value\"}" (Raw JSONB as JSON string)
msg.headers # => "{\"trace_id\":\"abc123\"}" (Raw JSONB as JSON string, optional)
msg.queue_name # => "my_queue" (only present for multi-queue operations, otherwise nil)
# You handle JSON parsing
data = JSON.parse(msg.message) # => { "data" => "value" }
metadata = JSON.parse(msg.headers) if msg.headers # => { "trace_id" => "abc123" }
# You handle type conversion if needed
id = msg.msg_id.to_i # => 123
read_count = msg.read_ct.to_i # => 1
enqueued = Time.parse(msg.enqueued_at) # => 2025-01-15 10:30:00 UTCPGMQ supports optional message headers via the headers JSONB column:
# Sending with headers requires direct SQL or a custom wrapper
# (pgmq-ruby focuses on the core PGMQ API which doesn't have a send_with_headers function)
# Reading messages with headers
msg = client.read("queue", vt: 30)
if msg.headers
metadata = JSON.parse(msg.headers)
trace_id = metadata["trace_id"]
correlation_id = metadata["correlation_id"]
endThis library follows the rdkafka-ruby philosophy - provide a thin, performant wrapper around the underlying system:
- No assumptions - Your application decides how to parse timestamps, convert types, etc.
- Framework-agnostic - Works equally well with Rails, Sinatra, or plain Ruby
- Zero overhead - No hidden type conversion or object allocation
- Explicit control - You see exactly what PostgreSQL returns
Higher-level features (automatic deserialization, type conversion, instrumentation) belong in framework layers built on top of this library.
PGMQ stores messages as JSONB in PostgreSQL. You must handle JSON serialization yourself:
# Simple hash
msg = { order_id: 123, status: "pending" }
client.send("orders", msg.to_json)
# Using JSON.generate for explicit control
client.send("orders", JSON.generate(order_id: 123, status: "pending"))
# Pre-serialized JSON string
json_str = '{"order_id":123,"status":"pending"}'
client.send("orders", json_str)msg = client.read("orders", vt: 30)
# Parse JSON yourself
data = JSON.parse(msg.message)
puts data["order_id"] # => 123
puts data["status"] # => "pending"
# Handle parsing errors
begin
data = JSON.parse(msg.message)
rescue JSON::ParserError => e
logger.error "Invalid JSON in message #{msg.msg_id}: #{e.message}"
client.delete("orders", msg.msg_id) # Remove invalid message
endFor convenience, you can wrap the client in your own helper:
class QueueHelper
def initialize(client)
@client = client
end
def send(queue, data)
@client.send(queue, data.to_json)
end
def read(queue, vt:)
msg = @client.read(queue, vt: vt)
return nil unless msg
OpenStruct.new(
id: msg.msg_id.to_i,
data: JSON.parse(msg.message),
read_count: msg.read_ct.to_i,
raw: msg
)
end
end
helper = QueueHelper.new(client)
helper.send("orders", { order_id: 123 })
msg = helper.read("orders", vt: 30)
puts msg.data["order_id"] # => 123# Clone repository
git clone https://github.com/mensfeld/pgmq-ruby.git
cd pgmq-ruby
# Install dependencies
bundle install
# Start PostgreSQL with PGMQ
docker compose up -d
# Run tests
bundle exec rspec
# Run console
bundle exec bin/consoleMaintained by Maciej Mensfeld
Also check out Karafka - High-performance Apache Kafka framework for Ruby.