Improve message size histogram
1. Avoid unnecessary time series emitted for the stream protocol.
The stream protocol cannot observe message sizes.
This commit ensures that the following time series are omitted:
```
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="64"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="256"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="1024"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="4096"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="16384"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="65536"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="262144"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="1048576"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="4194304"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="16777216"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="67108864"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="268435456"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="+Inf"} 0
rabbitmq_global_message_size_bytes_count{protocol="stream"} 0
rabbitmq_global_message_size_bytes_sum{protocol="stream"} 0
```

This reduces the number of time series by 15.
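
As a rough sketch of how the omission can work (the module name and boot loop below are made up for illustration; only `rabbit_msg_size_metrics:init/1` is taken from the diff further down): the histogram is initialised only for protocols that actually observe message sizes, so no zero-valued stream series ever reach the scrape output.

```erlang
-module(msg_size_boot_sketch).
-export([boot/0]).

%% Sketch: initialise the message size histogram only for protocols that
%% can observe message sizes. The stream protocol is not in the list, so
%% none of its zero-valued series are emitted.
boot() ->
    [rabbit_msg_size_metrics:init(Proto) || Proto <- [amqp091, amqp10]],
    ok.
```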

2. Further reduce the number of time series by reducing the number of buckets:
emit only 9 buckets instead of 13. Buckets are not free; each one is an extra
time series that has to be stored.

Prior to this commit:
```
curl -s -u guest:guest localhost:15692/metrics | ag message_size | wc -l
      92
```

After this commit:
```
curl -s -u guest:guest localhost:15692/metrics | ag message_size | wc -l
      57
```
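
To make the series accounting concrete, here is a minimal sketch (the bucket counts are taken from the commit message; the module and function names are made up for illustration): a histogram with N buckets emits N `_bucket` series plus one `_count` and one `_sum` series per label set.

```erlang
-module(series_count_sketch).
-export([series_per_protocol/1]).

%% N buckets (the +Inf bucket included in N) produce N *_bucket series,
%% plus one *_count and one *_sum series per protocol label.
series_per_protocol(NumBuckets) ->
    NumBuckets + 2.

%% series_per_protocol(13) =:= 15   (before this commit)
%% series_per_protocol(9)  =:= 11   (after this commit)
```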

3. The emitted metric should be called `rabbitmq_message_size_bytes_bucket` instead of
`rabbitmq_global_message_size_bytes_bucket`. The latter is poor naming: there is no need
for `global` in the metric name, given that this metric does not exist in the old,
flawed aggregated metrics.

4. This commit simplifies module `rabbit_global_counters`.

5. Avoid garbage collecting the 10-element list of buckets for every message received.
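
One allocation-free way to achieve this (a sketch only, not necessarily what `rabbit_msg_size_metrics` does internally): pick the bucket index with guard clauses and bump a `counters` reference stored in `persistent_term`, so no intermediate list is built per observation. The bucket boundaries and the persistent_term key below are illustrative.

```erlang
-module(msg_size_observe_sketch).
-export([observe/2]).

%% Sketch: no list of buckets is constructed per message, so there is
%% nothing for the garbage collector to reclaim on the hot path.
observe(Protocol, Size) ->
    Counters = persistent_term:get({msg_size_counters, Protocol}),
    counters:add(Counters, bucket_index(Size), 1).

%% Illustrative boundaries; the commit's exact buckets may differ.
bucket_index(Size) when Size =< 100 -> 1;
bucket_index(Size) when Size =< 1_000 -> 2;
bucket_index(Size) when Size =< 10_000 -> 3;
bucket_index(Size) when Size =< 100_000 -> 4;
bucket_index(Size) when Size =< 1_000_000 -> 5;
bucket_index(Size) when Size =< 10_000_000 -> 6;
bucket_index(Size) when Size =< 50_000_000 -> 7;
bucket_index(Size) when Size =< 100_000_000 -> 8;
bucket_index(_Size) -> 9.
```
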
ansd committed Sep 23, 2024
1 parent c222f58 commit f775310
Showing 20 changed files with 457 additions and 390 deletions.
6 changes: 4 additions & 2 deletions deps/rabbit/BUILD.bazel
@@ -479,8 +479,10 @@ rabbitmq_integration_suite(
)

rabbitmq_integration_suite(
name = "global_metrics_SUITE",
size = "small",
name = "msg_size_metrics_SUITE",
runtime_deps = [
"//deps/rabbitmq_amqp_client:erlang_app",
],
)

rabbitmq_integration_suite(
15 changes: 12 additions & 3 deletions deps/rabbit/app.bzl
@@ -169,7 +169,7 @@ def all_beam_files(name = "all_beam_files"):
"src/rabbit_metrics.erl",
"src/rabbit_mirror_queue_misc.erl",
"src/rabbit_mnesia.erl",
"src/rabbit_msg_size_metrics.erl",
"src/rabbit_msg_size_metrics.erl",
"src/rabbit_msg_store.erl",
"src/rabbit_msg_store_gc.erl",
"src/rabbit_networking.erl",
@@ -426,7 +426,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/rabbit_metrics.erl",
"src/rabbit_mirror_queue_misc.erl",
"src/rabbit_mnesia.erl",
"src/rabbit_msg_size_metrics.erl",
"src/rabbit_msg_size_metrics.erl",
"src/rabbit_msg_store.erl",
"src/rabbit_msg_store_gc.erl",
"src/rabbit_networking.erl",
@@ -705,6 +705,7 @@ def all_srcs(name = "all_srcs"):
"src/rabbit_metrics.erl",
"src/rabbit_mirror_queue_misc.erl",
"src/rabbit_mnesia.erl",
"src/rabbit_msg_size_metrics.erl",
"src/rabbit_msg_store.erl",
"src/rabbit_msg_store_gc.erl",
"src/rabbit_networking.erl",
@@ -1723,7 +1724,6 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
outs = ["test/unit_msg_size_metrics_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = [],
)
erlang_bytecode(
name = "unit_operator_policy_SUITE_beam_files",
@@ -2194,3 +2194,12 @@
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
)
erlang_bytecode(
name = "msg_size_metrics_SUITE_beam_files",
testonly = True,
srcs = ["test/msg_size_metrics_SUITE.erl"],
outs = ["test/msg_size_metrics_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app"],
)
14 changes: 8 additions & 6 deletions deps/rabbit/src/rabbit_amqp_session.erl
@@ -2336,8 +2336,9 @@ incoming_link_transfer(
{MsgBin0, FirstDeliveryId, FirstSettled}
end,
validate_transfer_rcv_settle_mode(RcvSettleMode, Settled),
validate_message_size(PayloadBin, MaxMessageSize),
rabbit_global_counters:message_size(?PROTOCOL, byte_size(PayloadBin)),
PayloadSize = iolist_size(PayloadBin),
validate_message_size(PayloadSize, MaxMessageSize),
rabbit_msg_size_metrics:observe(?PROTOCOL, PayloadSize),

Mc0 = mc:init(mc_amqp, PayloadBin, #{}),
case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of
@@ -3067,9 +3068,8 @@ validate_transfer_rcv_settle_mode(_, _) ->

validate_message_size(_, unlimited) ->
ok;
validate_message_size(Message, MaxMsgSize)
when is_integer(MaxMsgSize) ->
MsgSize = iolist_size(Message),
validate_message_size(MsgSize, MaxMsgSize)
when is_integer(MsgSize) ->
case MsgSize =< MaxMsgSize of
true ->
ok;
@@ -3083,7 +3083,9 @@ validate_message_size(Message, MaxMsgSize)
?V_1_0_LINK_ERROR_MESSAGE_SIZE_EXCEEDED,
"message size (~b bytes) > maximum message size (~b bytes)",
[MsgSize, MaxMsgSize])
end.
end;
validate_message_size(Msg, MaxMsgSize) ->
validate_message_size(iolist_size(Msg), MaxMsgSize).

-spec ensure_terminus(source | target,
term(),
3 changes: 1 addition & 2 deletions deps/rabbit/src/rabbit_channel.erl
@@ -985,8 +985,7 @@ check_msg_size(Content, GCThreshold) ->
Size = rabbit_basic:maybe_gc_large_msg(Content, GCThreshold),
case Size =< MaxMessageSize of
true ->
rabbit_global_counters:message_size(amqp091, Size),
ok;
rabbit_msg_size_metrics:observe(amqp091, Size);
false ->
Fmt = case MaxMessageSize of
?MAX_MSG_SIZE ->
35 changes: 17 additions & 18 deletions deps/rabbit/src/rabbit_global_counters.erl
@@ -13,7 +13,6 @@
boot_step/0,
init/1,
init/2,
overview/0,
prometheus_format/0,
increase_protocol_counter/3,
messages_received/2,
@@ -34,11 +33,14 @@
publisher_deleted/1,
consumer_created/1,
consumer_deleted/1,
message_size/2,
messages_dead_lettered/4,
messages_dead_lettered_confirmed/3
]).

-ifdef(TEST).
-export([overview/0]).
-endif.

%% PROTOCOL COUNTERS:
-define(MESSAGES_RECEIVED, 1).
-define(MESSAGES_RECEIVED_CONFIRM, 2).
@@ -133,12 +135,14 @@
boot_step() ->
[begin
%% Protocol counters
init([{protocol, Proto}]),
Protocol = {protocol, Proto},
init([Protocol]),
rabbit_msg_size_metrics:init(Proto),

%% Protocol & Queue Type counters
init([{protocol, Proto}, {queue_type, rabbit_classic_queue}]),
init([{protocol, Proto}, {queue_type, rabbit_quorum_queue}]),
init([{protocol, Proto}, {queue_type, rabbit_stream_queue}])
init([Protocol, {queue_type, rabbit_classic_queue}]),
init([Protocol, {queue_type, rabbit_quorum_queue}]),
init([Protocol, {queue_type, rabbit_stream_queue}])
end || Proto <- [amqp091, amqp10]],

%% Dead Letter counters
@@ -187,21 +191,19 @@ init(Labels = [{protocol, Protocol}, {queue_type, QueueType}], Extra) ->
init(Labels = [{protocol, Protocol}], Extra) ->
_ = seshat:new_group(?MODULE),
Counters = seshat:new(?MODULE, Labels, ?PROTOCOL_COUNTERS ++ Extra),
persistent_term:put({?MODULE, Protocol}, Counters),
rabbit_msg_size_metrics:init(Labels);
persistent_term:put({?MODULE, Protocol}, Counters);
init(Labels = [{queue_type, QueueType}, {dead_letter_strategy, DLS}], DeadLetterCounters) ->
_ = seshat:new_group(?MODULE),
Counters = seshat:new(?MODULE, Labels, DeadLetterCounters),
persistent_term:put({?MODULE, QueueType, DLS}, Counters).

-ifdef(TEST).
overview() ->
maps:merge_with(
fun(_Key, Value1, Value2) -> maps:merge(Value1, Value2) end,
rabbit_msg_size_metrics:overview(),
seshat:overview(?MODULE)).
seshat:overview(?MODULE).
-endif.

prometheus_format() ->
maps:merge(seshat:format(?MODULE), rabbit_msg_size_metrics:prometheus_format()).
seshat:format(?MODULE).

increase_protocol_counter(Protocol, Counter, Num) ->
counters:add(fetch(Protocol), Counter, Num).
@@ -252,16 +254,13 @@ publisher_created(Protocol) ->
counters:add(fetch(Protocol), ?PUBLISHERS, 1).

publisher_deleted(Protocol) ->
counters:add(fetch(Protocol), ?PUBLISHERS, -1).
counters:sub(fetch(Protocol), ?PUBLISHERS, 1).

consumer_created(Protocol) ->
counters:add(fetch(Protocol), ?CONSUMERS, 1).

consumer_deleted(Protocol) ->
counters:add(fetch(Protocol), ?CONSUMERS, -1).

message_size(Protocol, MessageSize) ->
rabbit_msg_size_metrics:update(Protocol, MessageSize).
counters:sub(fetch(Protocol), ?CONSUMERS, 1).

messages_dead_lettered(Reason, QueueType, DeadLetterStrategy, Num) ->
Index = case Reason of