Skip to content
This repository has been archived by the owner on Nov 27, 2023. It is now read-only.

Task Supervisor for message consumers #179

Merged
merged 19 commits into from
May 25, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ elixir:
- 1.7
- 1.8
- 1.9
- 1.10

otp_release:
- 20.0
- 21.0
mkorszun marked this conversation as resolved.
Show resolved Hide resolved

sudo: required
Expand Down
75 changes: 50 additions & 25 deletions lib/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -263,29 +263,28 @@ defmodule GenRMQ.Consumer do
|> Map.put(:config, parsed_config)
|> Map.put(:reconnect_attempt, 0)

send(self(), :init)

{:ok, state}
end

@doc false
@impl GenServer
def handle_call({:recover, requeue}, _from, %{in: channel} = state) do
{:reply, Basic.recover(channel, requeue: requeue), state}
{:ok, state, {:continue, :init}}
end

@doc false
@impl GenServer
def handle_info(:init, state) do
def handle_continue(:init, state) do
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lovely!

state =
state
|> get_connection()
|> open_channels()
|> setup_consumer()
|> setup_task_supervisor()

{:noreply, state}
end

@doc false
@impl GenServer
def handle_call({:recover, requeue}, _from, %{in: channel} = state) do
{:reply, Basic.recover(channel, requeue: requeue), state}
end

@doc false
@impl GenServer
def handle_info({:DOWN, _ref, :process, _pid, reason}, %{module: module, config: config} = state) do
Expand All @@ -298,6 +297,20 @@ defmodule GenRMQ.Consumer do
|> handle_reconnect(state)
end

@doc false
@impl GenServer
def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do
mkorszun marked this conversation as resolved.
Show resolved Hide resolved
{:noreply, state}
end

@doc false
@impl GenServer
def handle_info({ref, _task_result}, state) when is_reference(ref) do
Process.demonitor(ref, [:flush])

{:noreply, state}
end

@doc false
@impl GenServer
def handle_info({:basic_consume_ok, %{consumer_tag: consumer_tag}}, %{module: module} = state) do
Expand All @@ -321,15 +334,15 @@ defmodule GenRMQ.Consumer do

@doc false
@impl GenServer
def handle_info({:basic_deliver, payload, attributes}, %{module: module, config: config} = state) do
def handle_info({:basic_deliver, payload, attributes}, %{module: module} = state) do
%{delivery_tag: tag, routing_key: routing_key, redelivered: redelivered} = attributes
Logger.debug("[#{module}]: Received message. Tag: #{tag}, routing key: #{routing_key}, redelivered: #{redelivered}")

if redelivered do
Logger.debug("[#{module}]: Redelivered payload for message. Tag: #{tag}, payload: #{payload}")
end

handle_message(payload, attributes, state, Keyword.get(config, :concurrency, true))
handle_message(payload, attributes, state)

{:noreply, state}
end
Expand Down Expand Up @@ -374,19 +387,9 @@ defmodule GenRMQ.Consumer do
|> Keyword.put(:connection, Keyword.get(config, :connection, config[:uri]))
end

defp handle_message(payload, attributes, %{module: module} = state, false) do
start_time = System.monotonic_time()
message = Message.create(attributes, payload, state)

emit_message_start_event(start_time, message, module)
result = apply(module, :handle_message, [message])
emit_message_stop_event(start_time, message, module)

result
end

defp handle_message(payload, attributes, %{module: module} = state, true) do
spawn(fn ->
defp handle_message(payload, attributes, %{module: module, task_supervisor: task_supervisor_pid} = state)
when is_pid(task_supervisor_pid) do
Task.Supervisor.async_nolink(task_supervisor_pid, fn ->
start_time = System.monotonic_time()
message = Message.create(attributes, payload, state)

Expand All @@ -398,6 +401,17 @@ defmodule GenRMQ.Consumer do
end)
end

defp handle_message(payload, attributes, %{module: module} = state) do
start_time = System.monotonic_time()
message = Message.create(attributes, payload, state)

emit_message_start_event(start_time, message, module)
result = apply(module, :handle_message, [message])
emit_message_stop_event(start_time, message, module)

result
end

defp handle_reconnect(false, %{module: module} = state) do
Logger.info("[#{module}]: Reconnection is disabled. Terminating consumer.")
{:stop, :connection_closed, state}
Expand All @@ -410,6 +424,7 @@ defmodule GenRMQ.Consumer do
|> get_connection()
|> open_channels()
|> setup_consumer()
|> setup_task_supervisor()

{:noreply, new_state}
end
Expand Down Expand Up @@ -452,6 +467,16 @@ defmodule GenRMQ.Consumer do
Map.merge(state, %{in: chan, out: out_chan})
end

defp setup_task_supervisor(%{config: config} = state) do
if Keyword.get(config, :concurrency, true) do
{:ok, pid} = Task.Supervisor.start_link()

Map.put(state, :task_supervisor, pid)
else
Map.put(state, :task_supervisor, nil)
end
end

defp setup_consumer(%{in: chan, config: config, module: module} = state) do
queue_config = config[:queue]
prefetch_count = String.to_integer(config[:prefetch_count])
Expand Down
21 changes: 11 additions & 10 deletions lib/publisher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,17 @@ defmodule GenRMQ.Publisher do
Process.flag(:trap_exit, true)
config = apply(module, :init, [])
state = Map.merge(initial_state, %{config: config})
send(self(), :init)
{:ok, state}

{:ok, state, {:continue, :init}}
end

@doc false
@impl GenServer
def handle_continue(:init, %{module: module, config: config}) do
Logger.info("[#{module}]: Setting up publisher connection and configuration")
{:ok, state} = setup_publisher(%{module: module, config: config})

{:noreply, state}
end

@doc false
Expand Down Expand Up @@ -279,14 +288,6 @@ defmodule GenRMQ.Publisher do
{:reply, result, state}
end

@doc false
@impl GenServer
def handle_info(:init, %{module: module, config: config}) do
Logger.info("[#{module}]: Setting up publisher connection and configuration")
{:ok, state} = setup_publisher(%{module: module, config: config})
{:noreply, state}
end

@doc false
@impl GenServer
def handle_info({:DOWN, _ref, :process, _pid, reason}, %{module: module, config: config}) do
Expand Down