Skip to content

Commit

Permalink
Add multiple database support by allowing injection of a different AR…
Browse files Browse the repository at this point in the history
… base class (#32)

Closes #22.
  • Loading branch information
agrobbin authored Mar 21, 2024
1 parent 63a34b8 commit 7a9237e
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# activerecord-postgres_pub_sub

## Unreleased
- Add support for multiple databases by allowing injection of the base Active Record class.
- BREAKING: Drop support for ActiveRecord 5.2, 6.0
- BREAKING: Drop support for ruby < 3.0

Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ When creating a listener, the following configuration is supported:
the listener ignores the payload and coalesces multiple notifications into a
single call. When this option is `false`, the `on_notify` block is called with
the payload for each notification. (Default `true`).
* **base_class**: An Active Record class should you need to use a different base
class (e.g. for multiple database support). (Default `ActiveRecord::Base`).
* **exclusive_lock**: Acquire a lock using
[with_advisory_lock](https://github.com/ClosureTree/with_advisory_lock) prior to listening.
This option ensures that a process as a singleton listener. (Default `true`).
Expand Down
28 changes: 21 additions & 7 deletions lib/activerecord/postgres_pub_sub/listener.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,36 @@ class Listener
extend PrivateAttr

private_attr_reader :on_notify_blk, :on_start_blk, :on_timeout_blk,
:channels, :listen_timeout, :exclusive_lock, :notify_only

def self.listen(*channels, listen_timeout: nil, exclusive_lock: true, notify_only: true)
:channels, :listen_timeout, :exclusive_lock, :notify_only, :base_class

def self.listen(
*channels,
listen_timeout: nil,
exclusive_lock: true,
notify_only: true,
base_class: ActiveRecord::Base
)
listener = new(*channels,
listen_timeout: listen_timeout,
exclusive_lock: exclusive_lock,
notify_only: notify_only)
notify_only: notify_only,
base_class: base_class)
yield(listener) if block_given?
listener.listen
end

def initialize(*channels, listen_timeout: nil, exclusive_lock: true, notify_only: true)
def initialize(
*channels,
listen_timeout: nil,
exclusive_lock: true,
notify_only: true,
base_class: ActiveRecord::Base
)
@channels = channels
@listen_timeout = listen_timeout
@exclusive_lock = exclusive_lock
@notify_only = notify_only
@base_class = base_class
end

def on_notify(&blk)
Expand Down Expand Up @@ -54,7 +68,7 @@ def listen
private

def with_connection
ActiveRecord::Base.connection_pool.with_connection do |connection|
base_class.connection_pool.with_connection do |connection|
with_optional_lock do
channels.each do |channel|
connection.execute("LISTEN #{channel};")
Expand All @@ -73,7 +87,7 @@ def with_connection

def with_optional_lock(&block)
if exclusive_lock
ActiveRecord::Base.with_advisory_lock(lock_name, &block)
base_class.with_advisory_lock(lock_name, &block)
else
yield
end
Expand Down
37 changes: 30 additions & 7 deletions spec/active_record/postgres_pub_sub/listener_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

RSpec.describe ActiveRecord::PostgresPubSub::Listener, cleaner_strategy: :truncation do
let(:channel) { "pub_sub_test" }
let(:base_class) { ActiveRecord::Base }

describe ".listen" do
let(:listener_options) { Hash.new }
Expand All @@ -16,7 +17,7 @@
Thread.new do
listener_loop(**listener_options)
ensure
ActiveRecord::Base.clear_active_connections!
base_class.clear_active_connections!
end
end

Expand Down Expand Up @@ -93,7 +94,7 @@
end
end

context "when listen to multiple channels" do
context "when listen to multiple channels" do # rubocop:disable RSpec/MultipleMemoizedHelpers
let(:channels) { %w(pub_sub_test1 pub_sub_test2) }
let(:listener_options) { Hash[listen_to: channels, notify_only: false] }

Expand All @@ -110,10 +111,25 @@
end
end

context "when using a custom base class for multiple databases" do
let(:base_class) { OtherApplicationRecord }
let(:listener_options) { Hash[base_class: base_class] }

it "invokes the notify block when it receives a notification" do
wait_for_started

OtherApplicationRecord.transaction do
3.times { |i| notify(i) }
end

wait_for("notification received") { state.count > 0 }
expect(state.payloads).to match_ordered_array([nil])
expect(state.count).to eq(1)
end
end

def notify(payload, notify_to: channel)
# rubocop:disable Ezcater/RailsTopLevelSqlExecute
ActiveRecord::Base.connection.execute("NOTIFY #{notify_to}, '#{payload}'")
# rubocop:enable Ezcater/RailsTopLevelSqlExecute
base_class.connection.execute("NOTIFY #{notify_to}, '#{payload}'")
end

def wait_for_started
Expand All @@ -130,11 +146,18 @@ def wait_for(message, timeout: 5, poll_interval: 0.001)
end
end

def listener_loop(listen_to: [channel], listen_timeout: nil, exclusive_lock: true, notify_only: true)
def listener_loop(
listen_to: [channel],
listen_timeout: nil,
exclusive_lock: true,
notify_only: true,
base_class: ActiveRecord::Base
)
described_class.listen(*listen_to,
listen_timeout: listen_timeout,
exclusive_lock: exclusive_lock,
notify_only: notify_only) do |listener|
notify_only: notify_only,
base_class: base_class) do |listener|
listener.on_start do
state.started += 1
end
Expand Down
31 changes: 23 additions & 8 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,18 @@
require "database_cleaner"
require "ezcater_matchers"

class OtherApplicationRecord < ActiveRecord::Base
self.abstract_class = true
end

RSpec.configure do |config|
host = ENV.fetch("PGHOST", "localhost")
port = ENV.fetch("PGPORT", 5432)
database_name = "postgres_pub_sub_test"

databases = {
"postgres_pub_sub_test" => ActiveRecord::Base,
"postgres_pub_sub_other_test" => OtherApplicationRecord,
}

config.expect_with :rspec do |expectations|
expectations.include_chain_clauses_in_custom_matcher_descriptions = true
Expand All @@ -34,18 +42,25 @@
puts "Testing with Postgres version: #{pg_version}"
puts "Testing with ActiveRecord #{ActiveRecord::VERSION::STRING}"

`dropdb -h #{host} -p #{port} --if-exists #{database_name} 2> /dev/null`
`createdb -h #{host} -p #{port} #{database_name}`
databases.each do |name, base_class|
`dropdb -h #{host} -p #{port} --if-exists #{name} 2> /dev/null`
`createdb -h #{host} -p #{port} #{name}`

database_url = "postgres://#{host}:#{port}/#{name}"

puts "Using database #{database_url}"

base_class.establish_connection(database_url)
end

database_url = "postgres://#{host}:#{port}/#{database_name}"
puts "Using database #{database_url}"
ActiveRecord::Base.establish_connection(database_url)
DatabaseCleaner.clean_with(:truncation)
end

config.after(:suite) do
ActiveRecord::Base.connection_pool.disconnect!
`dropdb -h #{host} -p #{port} --if-exists #{database_name}`
databases.each do |name, base_class|
base_class.connection_pool.disconnect!
`dropdb -h #{host} -p #{port} --if-exists #{name}`
end
end

config.before do |example|
Expand Down

0 comments on commit 7a9237e

Please sign in to comment.