Skip to content
This repository has been archived by the owner on Nov 27, 2023. It is now read-only.

Updated telemetry events #189

Merged
merged 8 commits into from
Jun 8, 2020
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ GenRMQ provides the following functionality:
- `GenRMQ.Consumer` - a behaviour for implementing RabbitMQ consumers ([example][example_consumer])
- `GenRMQ.Publisher` - a behaviour for implementing RabbitMQ publishers ([example][example_publisher])
- `GenRMQ.Processor` - a behaviour for implementing RabbitMQ message processors (this is useful to separate out business logic from your consumer) ([example][example_processor])
- `GenRMQ.Consumer.Telemetry` - telemetry events emitted by a GenRMQ consumer
- `GenRMQ.Publisher.Telemetry` - telemetry events emitted by a GenRMQ publisher
- `GenRMQ.RabbitCase` - test utilities for RabbitMQ ([example][example_rabbit_case])

## Installation
Expand Down Expand Up @@ -117,11 +119,6 @@ GenRMQ.Publisher.publish(Publisher, Jason.encode!(%{msg: "msg"}))
- [Consumer without deadletter configuration][without_deadletter_configuration]
- [Consumer with quorum queues][with_quorum_queue_type]

### Metrics
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not convinced with removing this documentation. We should still treat these modules as internal ones. Users should not interact with them directly meaning that they might not come across their documentation.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! I added it back in the README and have the links going to the modules. That work?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@akoutmos yes, sounds good 👍


- [Consumer Telemetry events][consumer_telemetry_events]
- [Publisher Telemetry events][publisher_telemetry_events]

## Running Tests

You need [docker-compose][docker_compose] installed.
Expand Down Expand Up @@ -181,8 +178,6 @@ Copyright (c) 2018 - 2020 Meltwater Inc. [underthehood.meltwater.com][undertheho
[guide_consumer_with_custom_queue_configuration]: https://github.com/meltwater/gen_rmq/blob/master/documentation/guides/consumer/with_custom_queue_configuration.md
[without_deadletter_configuration]: https://github.com/meltwater/gen_rmq/blob/master/documentation/guides/consumer/without_deadletter_configuration.md
[with_quorum_queue_type]: https://github.com/meltwater/gen_rmq/blob/master/documentation/guides/consumer/with_quorum_queue_type.md
[consumer_telemetry_events]: https://github.com/meltwater/gen_rmq/blob/master/documentation/guides/consumer/telemetry_events.md
[publisher_telemetry_events]: https://github.com/meltwater/gen_rmq/blob/master/documentation/guides/publisher/telemetry_events.md
[trusted_commiters]: https://github.com/meltwater/gen_rmq/blob/master/TRUSTED-COMMITTERS.md
[code_owners]: https://github.com/meltwater/gen_rmq/blob/master/.github/CODEOWNERS
[license]: https://github.com/meltwater/gen_rmq/blob/master/LICENSE
50 changes: 0 additions & 50 deletions documentation/guides/consumer/telemetry_events.md

This file was deleted.

35 changes: 0 additions & 35 deletions documentation/guides/publisher/telemetry_events.md

This file was deleted.

126 changes: 20 additions & 106 deletions lib/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,14 @@ defmodule GenRMQ.Consumer do
use AMQP

require Logger
alias GenRMQ.{Message, MessageTask}
alias GenRMQ.Consumer.QueueConfiguration

alias GenRMQ.Consumer.{
MessageTask,
QueueConfiguration,
Telemetry
}

alias GenRMQ.Message

##############################################################################
# GenRMQ.Consumer callbacks
Expand Down Expand Up @@ -278,7 +284,7 @@ defmodule GenRMQ.Consumer do
"""
@spec ack(message :: %GenRMQ.Message{}) :: :ok
def ack(%Message{state: %{in: channel}, attributes: %{delivery_tag: tag}} = message) do
emit_message_ack_event(message)
Telemetry.emit_message_ack_event(message)

Basic.ack(channel, tag)
end
Expand All @@ -292,7 +298,7 @@ defmodule GenRMQ.Consumer do
"""
@spec reject(message :: %GenRMQ.Message{}, requeue :: boolean) :: :ok
def reject(%Message{state: %{in: channel}, attributes: %{delivery_tag: tag}} = message, requeue \\ false) do
emit_message_reject_event(message, requeue)
Telemetry.emit_message_reject_event(message, requeue)

Basic.reject(channel, tag, requeue: requeue)
end
Expand Down Expand Up @@ -353,15 +359,15 @@ defmodule GenRMQ.Consumer do
# Cancel timeout timer, emit telemetry event, and invoke user's `handle_error` callback
Process.cancel_timer(timeout_reference)
updated_state = %{state | running_tasks: Map.delete(running_tasks, ref)}
emit_message_error_event(module, reason, message, start_time)
Telemetry.emit_message_exception_event(module, message, start_time, reason)
apply(module, :handle_error, [message, reason])

{:noreply, updated_state}

_ ->
Logger.info("[#{module}]: RabbitMQ connection is down! Reason: #{inspect(reason)}")

emit_connection_down_event(module, reason)
Telemetry.emit_connection_down_event(module, reason)

config
|> Keyword.get(:reconnect, true)
Expand Down Expand Up @@ -516,9 +522,9 @@ defmodule GenRMQ.Consumer do
fn ->
start_time = System.monotonic_time()

emit_message_start_event(start_time, message, module)
Telemetry.emit_message_start_event(message, module)
result = apply(module, :handle_message, [message])
emit_message_stop_event(start_time, message, module)
Telemetry.emit_message_stop_event(start_time, message, module)

result
end,
Expand All @@ -528,17 +534,17 @@ defmodule GenRMQ.Consumer do

defp handle_message(message, %{module: module}) do
start_time = System.monotonic_time()
emit_message_start_event(start_time, message, module)
Telemetry.emit_message_start_event(message, module)

try do
result = apply(module, :handle_message, [message])
emit_message_stop_event(start_time, message, module)
Telemetry.emit_message_stop_event(start_time, message, module)

result
rescue
reason ->
full_error = {reason, __STACKTRACE__}
emit_message_error_event(module, full_error, message, start_time)
Telemetry.emit_message_exception_event(module, message, start_time, :error, reason, __STACKTRACE__)
apply(module, :handle_error, [message, full_error])
:error
end
Expand Down Expand Up @@ -566,11 +572,11 @@ defmodule GenRMQ.Consumer do
exchange = config[:exchange]
routing_key = config[:routing_key]

emit_connection_start_event(start_time, module, attempt, queue, exchange, routing_key)
Telemetry.emit_connection_start_event(module, attempt, queue, exchange, routing_key)

case Connection.open(config[:connection]) do
{:ok, conn} ->
emit_connection_stop_event(start_time, module, attempt, queue, exchange, routing_key)
Telemetry.emit_connection_stop_event(start_time, module, attempt, queue, exchange, routing_key)
Process.monitor(conn.pid)
Map.put(state, :conn, conn)

Expand All @@ -580,7 +586,7 @@ defmodule GenRMQ.Consumer do
"#{inspect(strip_key(config, :connection))}, reason #{inspect(e)}"
)

emit_connection_error_event(start_time, module, attempt, queue, exchange, routing_key, e)
Telemetry.emit_connection_stop_event(start_time, module, attempt, queue, exchange, routing_key, e)

retry_delay_fn = config[:retry_delay_function] || (&linear_delay/1)
next_attempt = attempt + 1
Expand Down Expand Up @@ -632,98 +638,6 @@ defmodule GenRMQ.Consumer do
GenRMQ.Binding.bind_exchange_and_queue(chan, exchange, name, routing_key)
end

defp emit_message_ack_event(message) do
start_time = System.monotonic_time()
measurements = %{time: start_time}
metadata = %{message: message}

:telemetry.execute([:gen_rmq, :consumer, :message, :ack], measurements, metadata)
end

defp emit_message_reject_event(message, requeue) do
start_time = System.monotonic_time()
measurements = %{time: start_time}
metadata = %{message: message, requeue: requeue}

:telemetry.execute([:gen_rmq, :consumer, :message, :reject], measurements, metadata)
end

defp emit_message_start_event(start_time, message, module) do
measurements = %{time: start_time}
metadata = %{message: message, module: module}

:telemetry.execute([:gen_rmq, :consumer, :message, :start], measurements, metadata)
end

defp emit_message_stop_event(start_time, message, module) do
stop_time = System.monotonic_time()
measurements = %{time: stop_time, duration: stop_time - start_time}
metadata = %{message: message, module: module}

:telemetry.execute([:gen_rmq, :consumer, :message, :stop], measurements, metadata)
end

defp emit_message_error_event(module, reason, message, start_time) do
stop_time = System.monotonic_time()
measurements = %{time: stop_time, duration: stop_time - start_time}
metadata = %{module: module, reason: reason, message: message}

:telemetry.execute([:gen_rmq, :consumer, :message, :error], measurements, metadata)
end

defp emit_connection_down_event(module, reason) do
start_time = System.monotonic_time()
measurements = %{time: start_time}
metadata = %{module: module, reason: reason}

:telemetry.execute([:gen_rmq, :consumer, :connection, :down], measurements, metadata)
end

defp emit_connection_start_event(start_time, module, attempt, queue, exchange, routing_key) do
measurements = %{time: start_time}

metadata = %{
module: module,
attempt: attempt,
queue: queue,
exchange: exchange,
routing_key: routing_key
}

:telemetry.execute([:gen_rmq, :consumer, :connection, :start], measurements, metadata)
end

defp emit_connection_stop_event(start_time, module, attempt, queue, exchange, routing_key) do
stop_time = System.monotonic_time()
measurements = %{time: stop_time, duration: stop_time - start_time}

metadata = %{
module: module,
attempt: attempt,
queue: queue,
exchange: exchange,
routing_key: routing_key
}

:telemetry.execute([:gen_rmq, :consumer, :connection, :stop], measurements, metadata)
end

defp emit_connection_error_event(start_time, module, attempt, queue, exchange, routing_key, error) do
stop_time = System.monotonic_time()
measurements = %{time: stop_time, duration: stop_time - start_time}

metadata = %{
module: module,
attempt: attempt,
queue: queue,
exchange: exchange,
routing_key: routing_key,
error: error
}

:telemetry.execute([:gen_rmq, :consumer, :connection, :error], measurements, metadata)
end

defp strip_key(keyword_list, key) do
keyword_list
|> Keyword.delete(key)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule GenRMQ.MessageTask do
defmodule GenRMQ.Consumer.MessageTask do
@moduledoc """
Struct wrapping details of a Task that is executing the configured
`handle_message` callback
Expand Down
Loading