Skip to content

Commit 9721487

Browse files
ansdgomoripeti
andcommitted
Emit histogram metric for received message sizes per protocol (rabbitmq#12342)
* Add global histogram metrics for received message sizes per-protocol fixup: add new files to bazel fixup: expose message_size_bytes as prometheus classic histogram type `rabbit_msg_size_metrics` does not use `seshat` any more, but `counters` directly. fixup: add msg_size_metrics unit test * Improve message size histogram 1. Avoid unnecessary time series emitted for stream protocol The stream protocol cannot observe message sizes. This commit ensures that the following time series are omitted: ``` rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="64"} 0 rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="256"} 0 rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="1024"} 0 rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="4096"} 0 rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="16384"} 0 rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="65536"} 0 rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="262144"} 0 rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="1048576"} 0 rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="4194304"} 0 rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="16777216"} 0 rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="67108864"} 0 rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="268435456"} 0 rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="+Inf"} 0 rabbitmq_global_message_size_bytes_count{protocol="stream"} 0 rabbitmq_global_message_size_bytes_sum{protocol="stream"} 0 ``` This reduces the number of time series by 15. 2. Further reduce the number of time series by reducing the number of buckets. Instead of 13 bucktes, emit only 9 buckets. Buckets are not free, each is an extra time series stored. Prior to this commit: ``` curl -s -u guest:guest localhost:15692/metrics | ag message_size | wc -l 92 ``` After this commit: ``` curl -s -u guest:guest localhost:15692/metrics | ag message_size | wc -l 57 ``` 3. The emitted metric should be called `rabbitmq_message_size_bytes_bucket` instead of `rabbitmq_global_message_size_bytes_bucket`. The latter is poor naming. There is no need to use `global` in the metric name given that this metric doesn't exist in the old flawed aggregated metrics. 4. This commit simplies module `rabbit_global_counters`. 5. Avoid garbage collecting the 10-elements list of buckets per message being received. --------- Co-authored-by: Péter Gömöri <peter@84codes.com>
1 parent 118a614 commit 9721487

16 files changed

+509
-25
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,13 @@ rabbitmq_integration_suite(
495495
],
496496
)
497497

498+
rabbitmq_integration_suite(
499+
name = "msg_size_metrics_SUITE",
500+
runtime_deps = [
501+
"//deps/rabbitmq_amqp_client:erlang_app",
502+
],
503+
)
504+
498505
rabbitmq_integration_suite(
499506
name = "list_consumers_sanity_check_SUITE",
500507
size = "medium",
@@ -1050,6 +1057,11 @@ rabbitmq_integration_suite(
10501057
size = "medium",
10511058
)
10521059

1060+
rabbitmq_suite(
1061+
name = "unit_msg_size_metrics_SUITE",
1062+
size = "small",
1063+
)
1064+
10531065
rabbitmq_suite(
10541066
name = "unit_operator_policy_SUITE",
10551067
size = "small",

deps/rabbit/app.bzl

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ def all_beam_files(name = "all_beam_files"):
170170
"src/rabbit_mirror_queue_sync.erl",
171171
"src/rabbit_mnesia.erl",
172172
"src/rabbit_msg_record.erl",
173+
"src/rabbit_msg_size_metrics.erl",
173174
"src/rabbit_msg_store.erl",
174175
"src/rabbit_msg_store_ets_index.erl",
175176
"src/rabbit_msg_store_gc.erl",
@@ -433,6 +434,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
433434
"src/rabbit_mirror_queue_sync.erl",
434435
"src/rabbit_mnesia.erl",
435436
"src/rabbit_msg_record.erl",
437+
"src/rabbit_msg_size_metrics.erl",
436438
"src/rabbit_msg_store.erl",
437439
"src/rabbit_msg_store_ets_index.erl",
438440
"src/rabbit_msg_store_gc.erl",
@@ -714,6 +716,7 @@ def all_srcs(name = "all_srcs"):
714716
"src/rabbit_mirror_queue_sync.erl",
715717
"src/rabbit_mnesia.erl",
716718
"src/rabbit_msg_record.erl",
719+
"src/rabbit_msg_size_metrics.erl",
717720
"src/rabbit_msg_store.erl",
718721
"src/rabbit_msg_store_ets_index.erl",
719722
"src/rabbit_msg_store_gc.erl",
@@ -1830,6 +1833,14 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
18301833
erlc_opts = "//:test_erlc_opts",
18311834
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
18321835
)
1836+
erlang_bytecode(
1837+
name = "unit_msg_size_metrics_SUITE_beam_files",
1838+
testonly = True,
1839+
srcs = ["test/unit_msg_size_metrics_SUITE.erl"],
1840+
outs = ["test/unit_msg_size_metrics_SUITE.beam"],
1841+
app_name = "rabbit",
1842+
erlc_opts = "//:test_erlc_opts",
1843+
)
18331844
erlang_bytecode(
18341845
name = "unit_operator_policy_SUITE_beam_files",
18351846
testonly = True,
@@ -2193,3 +2204,12 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
21932204
erlc_opts = "//:test_erlc_opts",
21942205
deps = ["//deps/amqp_client:erlang_app"],
21952206
)
2207+
erlang_bytecode(
2208+
name = "msg_size_metrics_SUITE_beam_files",
2209+
testonly = True,
2210+
srcs = ["test/msg_size_metrics_SUITE.erl"],
2211+
outs = ["test/msg_size_metrics_SUITE.beam"],
2212+
app_name = "rabbit",
2213+
erlc_opts = "//:test_erlc_opts",
2214+
deps = ["//deps/amqp_client:erlang_app"],
2215+
)

deps/rabbit/src/rabbit_channel.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1063,7 +1063,8 @@ check_msg_size(Content, MaxMessageSize, GCThreshold) ->
10631063
end,
10641064
rabbit_misc:precondition_failed(ErrorMessage,
10651065
[Size, MaxMessageSize]);
1066-
_ -> ok
1066+
_ ->
1067+
rabbit_msg_size_metrics:observe(amqp091, Size)
10671068
end.
10681069

10691070
qbin_to_resource(QueueNameBin, VHostPath) ->

deps/rabbit/src/rabbit_global_counters.erl

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
boot_step/0,
1414
init/1,
1515
init/2,
16-
overview/0,
1716
prometheus_format/0,
1817
increase_protocol_counter/3,
1918
messages_received/2,
@@ -38,6 +37,10 @@
3837
messages_dead_lettered_confirmed/3
3938
]).
4039

40+
-ifdef(TEST).
41+
-export([overview/0]).
42+
-endif.
43+
4144
%% PROTOCOL COUNTERS:
4245
-define(MESSAGES_RECEIVED, 1).
4346
-define(MESSAGES_RECEIVED_CONFIRM, 2).
@@ -133,6 +136,7 @@
133136
boot_step() ->
134137
%% Protocol counters
135138
init([{protocol, amqp091}]),
139+
rabbit_msg_size_metrics:init(amqp091),
136140

137141
%% Protocol & Queue Type counters
138142
init([{protocol, amqp091}, {queue_type, rabbit_classic_queue}]),
@@ -191,8 +195,10 @@ init(Labels = [{queue_type, QueueType}, {dead_letter_strategy, DLS}], DeadLetter
191195
Counters = seshat:new(?MODULE, Labels, DeadLetterCounters),
192196
persistent_term:put({?MODULE, QueueType, DLS}, Counters).
193197

198+
-ifdef(TEST).
194199
overview() ->
195200
seshat:overview(?MODULE).
201+
-endif.
196202

197203
prometheus_format() ->
198204
seshat:format(?MODULE).
@@ -246,13 +252,13 @@ publisher_created(Protocol) ->
246252
counters:add(fetch(Protocol), ?PUBLISHERS, 1).
247253

248254
publisher_deleted(Protocol) ->
249-
counters:add(fetch(Protocol), ?PUBLISHERS, -1).
255+
counters:sub(fetch(Protocol), ?PUBLISHERS, 1).
250256

251257
consumer_created(Protocol) ->
252258
counters:add(fetch(Protocol), ?CONSUMERS, 1).
253259

254260
consumer_deleted(Protocol) ->
255-
counters:add(fetch(Protocol), ?CONSUMERS, -1).
261+
counters:sub(fetch(Protocol), ?CONSUMERS, 1).
256262

257263
messages_dead_lettered(Reason, QueueType, DeadLetterStrategy, Num) ->
258264
Index = case Reason of
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
%% This module tracks received message size distribution as histogram.
9+
%% (A histogram is represented by a set of counters, one for each bucket.)
10+
-module(rabbit_msg_size_metrics).
11+
12+
-export([init/1,
13+
observe/2,
14+
prometheus_format/0]).
15+
16+
%% Integration tests.
17+
-export([raw_buckets/1,
18+
diff_raw_buckets/2]).
19+
20+
-ifdef(TEST).
21+
-export([cleanup/1]).
22+
-endif.
23+
24+
-define(BUCKET_1, 100).
25+
-define(BUCKET_2, 1_000).
26+
-define(BUCKET_3, 10_000).
27+
-define(BUCKET_4, 100_000).
28+
-define(BUCKET_5, 1_000_000).
29+
-define(BUCKET_6, 10_000_000).
30+
%% rabbit.max_message_size up to RabbitMQ 3.13 was 128 MiB.
31+
%% rabbit.max_message_size since RabbitMQ 4.0 is 16 MiB.
32+
%% To help finding an appropriate rabbit.max_message_size we also add a bucket for 50 MB.
33+
-define(BUCKET_7, 50_000_000).
34+
-define(BUCKET_8, 100_000_000).
35+
%% 'infinity' means practically 512 MiB as hard limited in
36+
%% https://github.com/rabbitmq/rabbitmq-server/blob/v4.0.2/deps/rabbit_common/include/rabbit.hrl#L254-L257
37+
-define(BUCKET_9, 'infinity').
38+
39+
-define(MSG_SIZE_BUCKETS,
40+
[{1, ?BUCKET_1},
41+
{2, ?BUCKET_2},
42+
{3, ?BUCKET_3},
43+
{4, ?BUCKET_4},
44+
{5, ?BUCKET_5},
45+
{6, ?BUCKET_6},
46+
{7, ?BUCKET_7},
47+
{8, ?BUCKET_8},
48+
{9, ?BUCKET_9}]).
49+
50+
-define(POS_MSG_SIZE_SUM, 10).
51+
52+
-type raw_buckets() :: [{BucketUpperBound :: non_neg_integer(),
53+
NumObservations :: non_neg_integer()}].
54+
55+
-spec init(atom()) -> ok.
56+
init(Protocol) ->
57+
Size = ?POS_MSG_SIZE_SUM,
58+
Counters = counters:new(Size, [write_concurrency]),
59+
put_counters(Protocol, Counters).
60+
61+
-spec observe(atom(), non_neg_integer()) -> ok.
62+
observe(Protocol, MessageSize) ->
63+
BucketPos = find_bucket_pos(MessageSize),
64+
Counters = get_counters(Protocol),
65+
counters:add(Counters, BucketPos, 1),
66+
counters:add(Counters, ?POS_MSG_SIZE_SUM, MessageSize).
67+
68+
-spec prometheus_format() -> #{atom() => map()}.
69+
prometheus_format() ->
70+
Values = [prometheus_values(Counters) || Counters <- get_labels_counters()],
71+
#{message_size_bytes => #{type => histogram,
72+
help => "Size of messages received from publishers",
73+
values => Values}}.
74+
75+
find_bucket_pos(Size) when Size =< ?BUCKET_1 -> 1;
76+
find_bucket_pos(Size) when Size =< ?BUCKET_2 -> 2;
77+
find_bucket_pos(Size) when Size =< ?BUCKET_3 -> 3;
78+
find_bucket_pos(Size) when Size =< ?BUCKET_4 -> 4;
79+
find_bucket_pos(Size) when Size =< ?BUCKET_5 -> 5;
80+
find_bucket_pos(Size) when Size =< ?BUCKET_6 -> 6;
81+
find_bucket_pos(Size) when Size =< ?BUCKET_7 -> 7;
82+
find_bucket_pos(Size) when Size =< ?BUCKET_8 -> 8;
83+
find_bucket_pos(_Size) -> 9.
84+
85+
raw_buckets(Protocol)
86+
when is_atom(Protocol) ->
87+
Counters = get_counters(Protocol),
88+
raw_buckets(Counters);
89+
raw_buckets(Counters) ->
90+
[{UpperBound, counters:get(Counters, Pos)}
91+
|| {Pos, UpperBound} <- ?MSG_SIZE_BUCKETS].
92+
93+
-spec diff_raw_buckets(raw_buckets(), raw_buckets()) -> raw_buckets().
94+
diff_raw_buckets(After, Before) ->
95+
diff_raw_buckets(After, Before, []).
96+
97+
diff_raw_buckets([], [], Acc) ->
98+
lists:reverse(Acc);
99+
diff_raw_buckets([{UpperBound, CounterAfter} | After],
100+
[{UpperBound, CounterBefore} | Before],
101+
Acc) ->
102+
case CounterAfter - CounterBefore of
103+
0 ->
104+
diff_raw_buckets(After, Before, Acc);
105+
Diff ->
106+
diff_raw_buckets(After, Before, [{UpperBound, Diff} | Acc])
107+
end.
108+
109+
%% "If you have looked at a /metrics for a histogram, you probably noticed that the buckets
110+
%% aren’t just a count of events that fall into them. The buckets also include a count of
111+
%% events in all the smaller buckets, all the way up to the +Inf, bucket which is the total
112+
%% number of events. This is known as a cumulative histogram, and why the bucket label
113+
%% is called le, standing for less than or equal to.
114+
%% This is in addition to buckets being counters, so Prometheus histograms are cumula‐
115+
%% tive in two different ways."
116+
%% [Prometheus: Up & Running]
117+
prometheus_values({Labels, Counters}) ->
118+
{Buckets, Count} = lists:mapfoldl(
119+
fun({UpperBound, NumObservations}, Acc0) ->
120+
Acc = Acc0 + NumObservations,
121+
{{UpperBound, Acc}, Acc}
122+
end, 0, raw_buckets(Counters)),
123+
Sum = counters:get(Counters, ?POS_MSG_SIZE_SUM),
124+
{Labels, Buckets, Count, Sum}.
125+
126+
put_counters(Protocol, Counters) ->
127+
persistent_term:put({?MODULE, Protocol}, Counters).
128+
129+
get_counters(Protocol) ->
130+
persistent_term:get({?MODULE, Protocol}).
131+
132+
get_labels_counters() ->
133+
[{[{protocol, Protocol}], Counters}
134+
|| {{?MODULE, Protocol}, Counters} <- persistent_term:get()].
135+
136+
-ifdef(TEST).
137+
%% "Counters are not tied to the current process and are automatically
138+
%% garbage collected when they are no longer referenced."
139+
-spec cleanup(atom()) -> ok.
140+
cleanup(Protocol) ->
141+
persistent_term:erase({?MODULE, Protocol}),
142+
ok.
143+
-endif.

0 commit comments

Comments
 (0)