Skip to content
This repository has been archived by the owner on Nov 27, 2023. It is now read-only.

Commit

Permalink
Support publisher confirmations (#110)
Browse files Browse the repository at this point in the history
[Publisher confirmations](https://www.rabbitmq.com/confirms.html#publisher-confirms), is
a mechanism to ensure that a published message actually reached a broker.

Extend, publisher configuration with two optional attributes:
* `activate_confirmations` - when set to `true`, confirmations will be activated on the channel during publisher setup + confirmation will be awaited on every publish
* `max_confirmation_wait_time` - maximum time in seconds, that publisher will wait for a confirmation from a broker before timeouting
  • Loading branch information
mkorszun authored Sep 23, 2019
1 parent fb4cc44 commit 3a57e90
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 29 deletions.
42 changes: 36 additions & 6 deletions lib/publisher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,26 @@ defmodule GenRMQ.Publisher do
`uri` - RabbitMQ uri
`exchange` - the target exchange. If does not exist, it will be created.
`exchange` - name or `{type, name}` of the target exchange. If it does not exist, it will be created.
For valid exchange types see `GenRMQ.Binding`.
### Optional:
`app_id` - publishing application ID
`enable_confirmations` - activates publishing confirmations on the channel. Confirmations are disabled by default.
`max_confirmation_wait_time` - maximum time in milliseconds to wait for a confirmation. By default it is 5_000 (5s).
## Examples:
```
def init() do
[
exchange: "gen_rmq_exchange",
uri: "amqp://guest:guest@localhost:5672"
app_id: :my_app_id
app_id: :my_app_id,
enable_confirmations: true,
max_confirmation_wait_time: 5_000
]
end
```
Expand All @@ -47,7 +54,9 @@ defmodule GenRMQ.Publisher do
@callback init() :: [
exchange: GenRMQ.Binding.exchange(),
uri: String.t(),
app_id: atom
app_id: atom,
enable_confirmations: boolean,
max_confirmation_wait_time: integer
]

##############################################################################
Expand Down Expand Up @@ -106,7 +115,7 @@ defmodule GenRMQ.Publisher do
message :: String.t(),
routing_key :: String.t(),
metadata :: Keyword.t()
) :: :ok | {:error, reason :: :blocked | :closing}
) :: :ok | {:ok, :confirmed} | {:error, reason :: :blocked | :closing | :confirmation_timeout}
def publish(publisher, message, routing_key \\ "", metadata \\ []) do
GenServer.call(publisher, {:publish, message, routing_key, metadata})
end
Expand All @@ -129,8 +138,9 @@ defmodule GenRMQ.Publisher do
@impl GenServer
def handle_call({:publish, msg, key, metadata}, _from, %{channel: channel, config: config} = state) do
metadata = config |> base_metadata() |> merge_metadata(metadata)
result = Basic.publish(channel, GenRMQ.Binding.exchange_name(config[:exchange]), key, msg, metadata)
{:reply, result, state}
publish_result = Basic.publish(channel, GenRMQ.Binding.exchange_name(config[:exchange]), key, msg, metadata)
confirmation_result = wait_for_confirmation(channel, config)
{:reply, publish_result(publish_result, confirmation_result), state}
end

@doc false
Expand Down Expand Up @@ -165,9 +175,29 @@ defmodule GenRMQ.Publisher do
{:ok, conn} = connect(state)
{:ok, channel} = Channel.open(conn)
GenRMQ.Binding.declare_exchange(channel, config[:exchange])

with_confirmations = Keyword.get(config, :enable_confirmations, false)
:ok = activate_confirmations(channel, with_confirmations)
{:ok, %{channel: channel, module: module, config: config, conn: conn}}
end

defp activate_confirmations(_, false), do: :ok
defp activate_confirmations(channel, true), do: AMQP.Confirm.select(channel)

defp wait_for_confirmation(channel, config) do
with_confirmations = Keyword.get(config, :enable_confirmations, false)
max_wait_time = config |> Keyword.get(:max_confirmation_wait_time, 5_000)
wait_for_confirmation(channel, with_confirmations, max_wait_time)
end

defp wait_for_confirmation(_, false, _), do: :confirmation_disabled
defp wait_for_confirmation(channel, true, max_wait_time), do: AMQP.Confirm.wait_for_confirms(channel, max_wait_time)

defp publish_result(:ok, :confirmation_disabled), do: :ok
defp publish_result(:ok, true = _confirmed), do: {:ok, :confirmed}
defp publish_result(:ok, :timeout), do: {:error, :confirmation_timeout}
defp publish_result(error, _), do: error

defp connect(%{module: module, config: config} = state) do
case Connection.open(config[:uri]) do
{:ok, conn} ->
Expand Down
52 changes: 29 additions & 23 deletions test/gen_rmq_publisher_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,13 @@ defmodule GenRMQ.PublisherTest do
alias GenRMQ.Publisher
alias GenRMQ.Test.Assert

alias TestPublisher.Default
alias TestPublisher.WithConfirmations

@uri "amqp://guest:guest@localhost:5672"
@exchange "gen_rmq_out_exchange"
@out_queue "gen_rmq_out_queue"

defmodule TestPublisher do
@behaviour GenRMQ.Publisher

def init() do
[
exchange: "gen_rmq_out_exchange",
uri: "amqp://guest:guest@localhost:5672",
app_id: :my_app_id
]
end
end

setup_all do
{:ok, conn} = rmq_open(@uri)
:ok = setup_out_queue(conn, @out_queue, @exchange)
Expand All @@ -33,25 +24,25 @@ defmodule GenRMQ.PublisherTest do

describe "start_link/2" do
test "should start a new publisher" do
{:ok, pid} = GenRMQ.Publisher.start_link(TestPublisher)
{:ok, pid} = GenRMQ.Publisher.start_link(Default)
assert Process.alive?(pid)
end

test "should start a new publisher registered by name" do
{:ok, pid} = GenRMQ.Publisher.start_link(TestPublisher, name: TestPublisher)
assert Process.whereis(TestPublisher) == pid
{:ok, pid} = GenRMQ.Publisher.start_link(Default, name: Default)
assert Process.whereis(Default) == pid
end
end

describe "GenRMQ.Publisher" do
describe "TestPublisher.Default" do
setup do
with_test_publisher()
with_test_publisher(Default)
end

test "should publish message", %{publisher: publisher_pid} = context do
message = %{"msg" => "msg"}

GenRMQ.Publisher.publish(publisher_pid, Jason.encode!(%{"msg" => "msg"}))
:ok = GenRMQ.Publisher.publish(publisher_pid, Jason.encode!(%{"msg" => "msg"}))

Assert.repeatedly(fn -> assert out_queue_count(context) >= 1 end)
{:ok, received_message, meta} = get_message_from_queue(context)
Expand All @@ -77,7 +68,7 @@ defmodule GenRMQ.PublisherTest do
test "should publish message with headers", %{publisher: publisher_pid} = context do
message = %{"msg" => "msg"}

GenRMQ.Publisher.publish(publisher_pid, Jason.encode!(message), "some.routing.key", header1: "value")
:ok = GenRMQ.Publisher.publish(publisher_pid, Jason.encode!(message), "some.routing.key", header1: "value")

Assert.repeatedly(fn -> assert out_queue_count(context) >= 1 end)
{:ok, received_message, meta} = get_message_from_queue(context)
Expand All @@ -89,7 +80,7 @@ defmodule GenRMQ.PublisherTest do
test "should override standard metadata fields from headers", %{publisher: publisher_pid} = context do
message = %{"msg" => "msg"}

GenRMQ.Publisher.publish(
:ok = GenRMQ.Publisher.publish(
publisher_pid,
Jason.encode!(message),
"some.routing.key",
Expand All @@ -112,7 +103,7 @@ defmodule GenRMQ.PublisherTest do
test "should publish a message with priority", %{publisher: publisher_pid} = context do
message = %{"msg" => "with prio"}

GenRMQ.Publisher.publish(
:ok = GenRMQ.Publisher.publish(
publisher_pid,
Jason.encode!(message),
"some.routing.key",
Expand All @@ -136,7 +127,7 @@ defmodule GenRMQ.PublisherTest do
assert new_state.channel.conn.pid != state.channel.conn.pid
end)

GenRMQ.Publisher.publish(publisher_pid, Jason.encode!(message))
:ok = GenRMQ.Publisher.publish(publisher_pid, Jason.encode!(message))

Assert.repeatedly(fn -> assert out_queue_count(context) >= 1 end)
{:ok, received_message, meta} = get_message_from_queue(context)
Expand All @@ -156,7 +147,22 @@ defmodule GenRMQ.PublisherTest do
end
end

defp with_test_publisher(module \\ TestPublisher) do
describe "TestPublisher.WithConfirmations" do
setup do
with_test_publisher(WithConfirmations)
end

test "should publish a message and wait for a confirmation", %{publisher: publisher_pid} = context do
message = %{"msg" => "with confirmation"}
publish_result = GenRMQ.Publisher.publish(publisher_pid, Jason.encode!(message), "some.routing.key")

Assert.repeatedly(fn -> assert out_queue_count(context) >= 1 end)
assert match?({:ok, ^message, _}, get_message_from_queue(context))
assert {:ok, :confirmed} == publish_result
end
end

defp with_test_publisher(module) do
Process.flag(:trap_exit, true)
{:ok, publisher_pid} = Publisher.start_link(module)

Expand Down
28 changes: 28 additions & 0 deletions test/support/test_publishers.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
defmodule TestPublisher do
defmodule Default do
@moduledoc false
@behaviour GenRMQ.Publisher

def init() do
[
exchange: "gen_rmq_out_exchange",
uri: "amqp://guest:guest@localhost:5672",
app_id: :my_app_id
]
end
end

defmodule WithConfirmations do
@moduledoc false
@behaviour GenRMQ.Publisher

def init() do
[
exchange: "gen_rmq_out_exchange",
uri: "amqp://guest:guest@localhost:5672",
app_id: :my_app_id,
enable_confirmations: true
]
end
end
end

0 comments on commit 3a57e90

Please sign in to comment.