Skip to content
This repository has been archived by the owner on Nov 27, 2023. It is now read-only.

Task Supervisor for message consumers #179

Merged
merged 19 commits into from
May 25, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 48 additions & 18 deletions lib/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ defmodule GenRMQ.Consumer do
* create deadletter queue and exchange
* handle reconnections
* call `handle_message` callback on every message delivery
* call `handle_error` callback whenever `handle_message` fails to process or times out
"""

use GenServer
use AMQP

require Logger
alias GenRMQ.Message
alias GenRMQ.{Message, MessageTask}
alias GenRMQ.Consumer.QueueConfiguration

##############################################################################
Expand Down Expand Up @@ -186,6 +187,28 @@ defmodule GenRMQ.Consumer do
"""
@callback handle_message(message :: %GenRMQ.Message{}) :: :ok

@doc """
Invoked when an error or timeout is encountered while executing `handle_message` callback

`message` - `GenRMQ.Message` struct
`reason` - atom denoting the type of error

## Examples:
To reject the message that caused the Task to fail you can do something like so:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

double message

```
def handle_error(message, reason) do
# Do something with message and reject it
Logger.warn("Failed to process message: #\{inspect(message)}")

GenRMQ.Consumer.reject(message)
end
```

The `reason` argument will either be `:timeout` or `:error` if the message processing task
timed out or encountered an error respectively.
"""
@callback handle_error(message :: %GenRMQ.Message{}, reason :: atom()) :: :ok

##############################################################################
# GenRMQ.Consumer API
##############################################################################
Expand Down Expand Up @@ -309,12 +332,15 @@ defmodule GenRMQ.Consumer do
%{module: module, config: config, running_tasks: running_tasks} = state
) do
case Map.get(running_tasks, ref) do
{%Task{}, timeout_reference} ->
Logger.info("[#{module}]: Task failed to handle message. Reason: #{inspect(reason)}")
%MessageTask{exit_status: exit_status, message: message, timeout_reference: timeout_reference} ->
exit_status = exit_status || :error
Logger.info("[#{module}]: Task failed to handle message. Reason: #{inspect(exit_status)}")

# Cancel timeout timer, emit telemetry event, and invoke user's `handle_error` callback
Process.cancel_timer(timeout_reference)
updated_state = %{state | running_tasks: Map.delete(running_tasks, ref)}
emit_task_error_event(module, reason)
emit_task_error_event(module, exit_status)
apply(module, :handle_error, [message, exit_status])

{:noreply, updated_state}

Expand All @@ -332,13 +358,13 @@ defmodule GenRMQ.Consumer do
@doc false
@impl GenServer
def handle_info({ref, _task_result}, %{running_tasks: running_tasks} = state) when is_reference(ref) do
# Task completed sucessfully, update the running task map and state
# Task completed successfully, update the running task map and state
Process.demonitor(ref, [:flush])

updated_state =
case Map.get(running_tasks, ref) do
{%Task{}, timeout_reference} ->
Process.cancel_timer(timeout_reference)
%MessageTask{} = message_task ->
Process.cancel_timer(message_task.timeout_reference)
%{state | running_tasks: Map.delete(running_tasks, ref)}

_ ->
Expand All @@ -353,10 +379,13 @@ defmodule GenRMQ.Consumer do
def handle_info({:kill, task_reference}, %{running_tasks: running_tasks} = state) when is_reference(task_reference) do
# The task has timed out, kill the Task process which will trigger a :DOWN event that
# is handled by a previous `handle_info/2` callback
{%Task{pid: pid}, _timeout_reference} = Map.get(running_tasks, task_reference)
message_task = Map.get(running_tasks, task_reference)
%MessageTask{task: %Task{pid: pid}} = message_task
updated_state = put_in(state, [:running_tasks, task_reference], %{message_task | exit_status: :timeout})

Process.exit(pid, :kill)

{:noreply, state}
{:noreply, updated_state}
end

@doc false
Expand Down Expand Up @@ -393,11 +422,14 @@ defmodule GenRMQ.Consumer do
Logger.debug("[#{module}]: Redelivered payload for message. Tag: #{tag}, payload: #{payload}")
end

message = Message.create(attributes, payload, state)

updated_state =
case handle_message(payload, attributes, state) do
case handle_message(message, state) do
%Task{ref: task_reference} = task ->
timeout_reference = Process.send_after(self(), {:kill, task_reference}, handle_message_timeout)
%{state | running_tasks: Map.put(running_tasks, task_reference, {task, timeout_reference})}
message_task = MessageTask.create(task, timeout_reference, message)
%{state | running_tasks: Map.put(running_tasks, task_reference, message_task)}

_ ->
state
Expand Down Expand Up @@ -451,9 +483,9 @@ defmodule GenRMQ.Consumer do
# their individual timeout timers
running_tasks
|> Map.values()
|> Enum.map(fn {%Task{} = task, timeout_reference} ->
Process.cancel_timer(timeout_reference)
task
|> Enum.map(fn %MessageTask{} = message_task ->
Process.cancel_timer(message_task.timeout_reference)
message_task.task
end)
|> Task.yield_many(terminate_timeout)
end
Expand All @@ -466,13 +498,12 @@ defmodule GenRMQ.Consumer do
|> Keyword.put(:connection, Keyword.get(config, :connection, config[:uri]))
end

defp handle_message(payload, attributes, %{module: module, task_supervisor: task_supervisor_pid} = state)
defp handle_message(message, %{module: module, task_supervisor: task_supervisor_pid})
when is_pid(task_supervisor_pid) do
Task.Supervisor.async_nolink(
task_supervisor_pid,
fn ->
start_time = System.monotonic_time()
message = Message.create(attributes, payload, state)

emit_message_start_event(start_time, message, module)
result = apply(module, :handle_message, [message])
Expand All @@ -484,9 +515,8 @@ defmodule GenRMQ.Consumer do
)
end

defp handle_message(payload, attributes, %{module: module} = state) do
defp handle_message(message, %{module: module}) do
start_time = System.monotonic_time()
message = Message.create(attributes, payload, state)

emit_message_start_event(start_time, message, module)
result = apply(module, :handle_message, [message])
Expand Down
25 changes: 25 additions & 0 deletions lib/message_task.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
defmodule GenRMQ.MessageTask do
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea to wrap it like this 👍

@moduledoc """
Struct wrapping details of a Task that is executing the configured
`handle_message` callback

Defines:
* `:task` - the Task struct executing the user's `handle_message` callback
* `:timeout_reference` - the reference to the timeout timer
* `:message` - the GenRMQ.Message struct that is being processed
* `:exit_status` - the exit status of the Task
"""

@enforce_keys [:task, :timeout_reference, :message, :exit_status]
defstruct [:task, :timeout_reference, :message, :exit_status]

@doc false
# Builds the struct that tracks one in-flight message: the task running the
# callback, the reference of its timeout timer, and the message being processed.
# `exit_status` always starts out as `nil`.
def create(task, timeout_reference, message) do
  struct!(__MODULE__,
    task: task,
    timeout_reference: timeout_reference,
    message: message,
    exit_status: nil
  )
end
end
19 changes: 13 additions & 6 deletions test/gen_rmq_consumer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,11 @@ defmodule GenRMQ.ConsumerTest do
setup :attach_telemetry_handlers

setup do
Agent.start_link(fn -> MapSet.new() end, name: ErrorInConsumer)
ErrorInConsumer.start_link(self())
with_test_consumer(ErrorInConsumer)
end

test "should invoke the consumer's handle_info callback if error exists",
test "should invoke the user's handle_error callback if an error occurs",
%{consumer: consumer_pid, state: state} = context do
clear_mailbox()

Expand All @@ -119,10 +119,11 @@ defmodule GenRMQ.ConsumerTest do
assert queue_count(context[:rabbit_conn], state[:config][:queue].name) == {:ok, 0}
end)

assert_receive {:telemetry_event, [:gen_rmq, :consumer, :task, :error], %{time: _}, %{reason: _, module: _}}
assert_receive {:telemetry_event, [:gen_rmq, :consumer, :task, :error], %{time: _}, %{reason: :error, module: _}}
assert_receive {:task_error, :error}
end

test "should not invoke the consumer's handle_info callback if error does not exist",
test "should not invoke the user's handle_error callback if an error does not occur",
%{consumer: consumer_pid, state: state} = context do
clear_mailbox()

Expand All @@ -137,14 +138,15 @@ defmodule GenRMQ.ConsumerTest do
end)

refute_receive {:telemetry_event, [:gen_rmq, :consumer, :task, :error], %{time: _}, %{reason: _, module: _}}
refute_receive {:task_error, :error}
end
end

describe "TestConsumer.SlowConsumer" do
setup :attach_telemetry_handlers

setup do
Agent.start_link(fn -> MapSet.new() end, name: SlowConsumer)
SlowConsumer.start_link(self())
with_test_consumer(SlowConsumer)
end

Expand Down Expand Up @@ -173,6 +175,8 @@ defmodule GenRMQ.ConsumerTest do
{:telemetry_event, [:gen_rmq, :consumer, :message, :stop], %{time: _, duration: _}, %{message: _, module: _}},
1_000
)

refute_receive {:task_error, :error}
end

test "should error out the task if it takes too long",
Expand All @@ -194,7 +198,10 @@ defmodule GenRMQ.ConsumerTest do
2_000
)

assert_receive {:telemetry_event, [:gen_rmq, :consumer, :task, :error], %{time: _}, %{reason: :killed, module: _}}
assert_receive {:telemetry_event, [:gen_rmq, :consumer, :task, :error], %{time: _},
%{reason: :timeout, module: _}}

assert_receive {:task_error, :timeout}
end
end

Expand Down
Loading