Skip to content

Commit 8ae2405

Browse files
committed
rabbitmq-diagnostics message_size_stats
This command displays cluster-wide message size statistics. It's less detailed than what can be retrieved from the Prometheus endpoint, but it'll be available to all users, regardless of their monitoring setup, or lack thereof.
1 parent 18e4bf6 commit 8ae2405

File tree

4 files changed

+307
-36
lines changed

4 files changed

+307
-36
lines changed

deps/rabbit/src/rabbit_msg_size_metrics.erl

Lines changed: 110 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@
1111

1212
-export([init/1,
1313
observe/2,
14-
prometheus_format/0]).
14+
prometheus_format/0,
15+
local_summary/0,
16+
cluster_summary/0,
17+
cluster_summary_for_cli/0 ]).
1518

1619
%% Integration tests.
1720
-export([raw_buckets/1,
@@ -52,6 +55,9 @@
5255
-type raw_buckets() :: [{BucketUpperBound :: non_neg_integer(),
5356
NumObservations :: non_neg_integer()}].
5457

58+
-type summary_entry() :: {{non_neg_integer(), non_neg_integer() | infinity}, {non_neg_integer(), float()}}.
59+
-type summary() :: [summary_entry()].
60+
5561
-spec init(atom()) -> ok.
5662
init(Protocol) ->
5763
Size = ?POS_MSG_SIZE_SUM,
@@ -133,6 +139,109 @@ get_labels_counters() ->
133139
[{[{protocol, Protocol}], Counters}
134140
|| {{?MODULE, Protocol}, Counters} <- persistent_term:get()].
135141

142+
%% Lists the protocols that have an entry stored in persistent_term under
%% a {?MODULE, Protocol} key. All other persistent terms are ignored.
get_protocols() ->
    lists:filtermap(fun({{?MODULE, Protocol}, _Counters}) -> {true, Protocol};
                       (_OtherTerm) -> false
                    end, persistent_term:get()).
145+
146+
%% Builds the message size summary for the local node.
%% The raw per-bucket observation counts of every known protocol are added
%% up, then each bucket is reported as {Range, {Count, Percentage}}.
%% Percentage is the bucket's share of all observations and is 0.0 for
%% every bucket while nothing has been observed.
-spec local_summary() -> summary().
local_summary() ->
    PerProtocol = [raw_buckets(Protocol) || Protocol <- get_protocols()],

    %% Start from an all-zero histogram so that every bucket is always
    %% present in the result, in ascending order.
    Empty = [{UpperBound, 0}
             || UpperBound <- [?BUCKET_1, ?BUCKET_2, ?BUCKET_3,
                               ?BUCKET_4, ?BUCKET_5, ?BUCKET_6,
                               ?BUCKET_7, ?BUCKET_8, ?BUCKET_9]],
    Summed = lists:foldl(fun sum_protocol_buckets/2, Empty, PerProtocol),

    Total = lists:sum([N || {_UpperBound, N} <- Summed]),
    %% Guard against division by zero when there are no observations.
    Share = fun(_N) when Total =:= 0 -> 0.0;
               (N) -> (N / Total) * 100
            end,

    lists:map(fun({UpperBound, N}) ->
                      {bucket_range(UpperBound), {N, Share(N)}}
              end, Summed).
171+
172+
%% Adds one protocol's bucket counts to the running totals.
%% The result has the same buckets, in the same order, as the accumulator;
%% a bucket missing from ProtocolBuckets contributes 0.
sum_protocol_buckets(_ProtocolBuckets, []) ->
    [];
sum_protocol_buckets(ProtocolBuckets, [{UpperBound, Sum} | Rest]) ->
    Extra = proplists:get_value(UpperBound, ProtocolBuckets, 0),
    [{UpperBound, Sum + Extra} | sum_protocol_buckets(ProtocolBuckets, Rest)].
177+
178+
%% Aggregates summaries from all running cluster nodes.
%% Every other running node is asked for its local_summary/0 with a
%% 5 second timeout. Only {ok, Summary} replies are merged into this
%% node's own summary; any other reply is left out of the result.
-spec cluster_summary() -> summary().
cluster_summary() ->
    OtherNodes = lists:filter(fun(Node) -> Node =/= node() end,
                              rabbit_nodes:list_running()),
    Replies = erpc:multicall(OtherNodes, ?MODULE, local_summary, [], 5000),
    lists:foldl(fun({ok, RemoteSummary}, Acc) ->
                        merge_summaries(RemoteSummary, Acc);
                   (_NoSummary, Acc) ->
                        Acc
                end, local_summary(), Replies).
188+
189+
%% Returns the human-readable label for a bucket range.
%% Only the range's upper bound is used to select the label.
bucket_name({_LowerBound, UpperBound}) ->
    case UpperBound of
        ?BUCKET_1 -> <<"below 100B">>;
        ?BUCKET_2 -> <<"between 100B and 1KB">>;
        ?BUCKET_3 -> <<"between 1KB and 10KB">>;
        ?BUCKET_4 -> <<"between 10KB and 100KB">>;
        ?BUCKET_5 -> <<"between 100KB and 1MB">>;
        ?BUCKET_6 -> <<"between 1MB and 10MB">>;
        ?BUCKET_7 -> <<"between 10MB and 50MB">>;
        ?BUCKET_8 -> <<"between 50MB and 100MB">>;
        ?BUCKET_9 -> <<"above 100MB">>
    end.
198+
199+
%% Formats cluster_summary/0 as one row per bucket for CLI output.
%% Each row is a list of {ColumnName, Value} pairs; the percentage is
%% rendered as a binary with two decimal places.
cluster_summary_for_cli() ->
    ToRow = fun(Range, Count, Percentage) ->
                    Formatted = iolist_to_binary(io_lib:format("~.2f", [Percentage])),
                    [{<<"Message Size">>, bucket_name(Range)},
                     {<<"Count">>, Count},
                     {<<"Percentage">>, Formatted}]
            end,
    [ToRow(Range, Count, Percentage)
     || {Range, {Count, Percentage}} <- cluster_summary()].
204+
205+
%% Returns the observation count stored for Range in a summary,
%% or 0 when the summary has no entry for that range.
get_count_for_range(Range, SummaryList) ->
    count_of(proplists:get_value(Range, SummaryList)).

%% Extracts the count from a {Count, Percentage} value; a missing entry
%% (undefined) counts as zero.
count_of(undefined) -> 0;
count_of({Count, _Percentage}) -> Count.
210+
211+
%% Merges two summaries into one.
%% Counts of matching ranges are added together (a range missing from one
%% side counts as 0) and percentages are recomputed from the merged counts.
%% The result is sorted by range.
merge_summaries(Summary1, Summary2) ->
    %% Union of the ranges present on either side, de-duplicated and sorted.
    Ranges = lists:usort([Range || {Range, _} <- Summary1 ++ Summary2]),

    Counts = [{Range,
               get_count_for_range(Range, Summary1) +
               get_count_for_range(Range, Summary2)}
              || Range <- Ranges],

    Total = lists:sum([Count || {_, Count} <- Counts]),

    %% With no observations at all every share is reported as 0.0, which
    %% also avoids dividing by zero.
    lists:map(fun({Range, Count}) when Total =:= 0 ->
                      {Range, {Count, 0.0}};
                 ({Range, Count}) ->
                      {Range, {Count, (Count / Total) * 100}}
              end, Counts).
234+
235+
%% Translates a bucket's upper bound into the {Lower, Upper} size range
%% used as the key of a summary entry. The last bucket is open-ended.
bucket_range(UpperBound) ->
    case UpperBound of
        ?BUCKET_1 -> {0, 100};
        ?BUCKET_2 -> {101, 1000};
        ?BUCKET_3 -> {1001, 10000};
        ?BUCKET_4 -> {10001, 100000};
        ?BUCKET_5 -> {100001, 1000000};
        ?BUCKET_6 -> {1000001, 10000000};
        ?BUCKET_7 -> {10000001, 50000000};
        ?BUCKET_8 -> {50000001, 100000000};
        ?BUCKET_9 -> {100000001, infinity}
    end.
244+
136245
-ifdef(TEST).
137246
%% "Counters are not tied to the current process and are automatically
138247
%% garbage collected when they are no longer referenced."

deps/rabbit/test/msg_size_metrics_SUITE.erl

Lines changed: 86 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,22 @@
1313
-include_lib("amqp_client/include/amqp_client.hrl").
1414

1515
-import(rabbit_ct_broker_helpers,
16-
[rpc/4]).
16+
[rpc/4, rpc/5]).
1717

1818
all() ->
1919
[
20-
{group, tests}
20+
{group, cluster_size_1},
21+
{group, cluster_size_2}
2122
].
2223

2324
groups() ->
2425
[
25-
{tests, [shuffle],
26+
{cluster_size_1, [],
2627
[message_size,
27-
over_max_message_size]}
28+
over_max_message_size]},
29+
{cluster_size_2, [],
30+
[summary]
31+
}
2832
].
2933

3034
%% -------------------------------------------------------------------
@@ -39,7 +43,19 @@ init_per_suite(Config) ->
3943
end_per_suite(Config) ->
4044
rabbit_ct_helpers:run_teardown_steps(Config).
4145

42-
init_per_group(_Group, Config) ->
46+
%% Sets up the cluster_size_2 group: requests two nodes, runs the broker
%% and client setup steps, then enables the 'rabbitmq_4.2.0' feature flag.
%% Returns the Config produced by the setup steps, or {skip, Reason} when
%% either the setup or the feature flag asks for the group to be skipped.
init_per_group(cluster_size_2, Config0) ->
    Config1 = rabbit_ct_helpers:set_config(Config0, [{rmq_nodes_count, 2}]),
    %% The Config returned by run_steps/2 must be used from here on:
    %% the input Config1 is the one from before the setup steps ran.
    %% NOTE(review): assumes run_steps/2 returns either the updated Config
    %% or {skip, Reason} — confirm against rabbit_ct_helpers.
    case rabbit_ct_helpers:run_steps(
           Config1,
           rabbit_ct_broker_helpers:setup_steps() ++
           rabbit_ct_client_helpers:setup_steps()) of
        {skip, _} = SetupSkipped ->
            SetupSkipped;
        Config ->
            case rabbit_ct_broker_helpers:enable_feature_flag(Config, 'rabbitmq_4.2.0') of
                ok ->
                    Config;
                {skip, _} = FlagSkipped ->
                    FlagSkipped
            end
    end;
58+
init_per_group(_Grooup, Config) ->
4359
rabbit_ct_helpers:run_steps(
4460
Config,
4561
rabbit_ct_broker_helpers:setup_steps() ++
@@ -65,32 +81,7 @@ message_size(Config) ->
6581
AmqplBefore = get_msg_size_metrics(amqp091, Config),
6682
AmqpBefore = get_msg_size_metrics(amqp10, Config),
6783

68-
Binary2B = <<"12">>,
69-
Binary200K = binary:copy(<<"x">>, 200_000),
70-
Payloads = [Binary2B, Binary200K, Binary2B],
71-
72-
{AmqplConn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
73-
[amqp_channel:call(Ch,
74-
#'basic.publish'{routing_key = <<"nowhere">>},
75-
#amqp_msg{payload = Payload})
76-
|| Payload <- Payloads],
77-
78-
OpnConf = connection_config(Config),
79-
{ok, Connection} = amqp10_client:open_connection(OpnConf),
80-
{ok, Session} = amqp10_client:begin_session_sync(Connection),
81-
Address = rabbitmq_amqp_address:exchange(<<"amq.fanout">>),
82-
{ok, Sender} = amqp10_client:attach_sender_link_sync(Session, <<"sender">>, Address),
83-
receive {amqp10_event, {link, Sender, credited}} -> ok
84-
after 30_000 -> ct:fail(credited_timeout)
85-
end,
86-
87-
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag1">>, Binary2B)),
88-
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag2">>, Binary200K)),
89-
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag3">>, Binary2B)),
90-
91-
ok = wait_for_settlement(released, <<"tag1">>),
92-
ok = wait_for_settlement(released, <<"tag2">>),
93-
ok = wait_for_settlement(released, <<"tag3">>),
84+
publish_messages(Config),
9485

9586
AmqplAfter = get_msg_size_metrics(amqp091, Config),
9687
AmqpAfter = get_msg_size_metrics(amqp10, Config),
@@ -100,10 +91,7 @@ message_size(Config) ->
10091
?assertEqual(ExpectedDiff,
10192
rabbit_msg_size_metrics:diff_raw_buckets(AmqplAfter, AmqplBefore)),
10293
?assertEqual(ExpectedDiff,
103-
rabbit_msg_size_metrics:diff_raw_buckets(AmqpAfter, AmqpBefore)),
104-
105-
ok = amqp10_client:close_connection(Connection),
106-
ok = rabbit_ct_client_helpers:close_connection_and_channel(AmqplConn, Ch).
94+
rabbit_msg_size_metrics:diff_raw_buckets(AmqpAfter, AmqpBefore)).
10795

10896
over_max_message_size(Config) ->
10997
DefaultMaxMessageSize = rpc(Config, persistent_term, get, [max_message_size]),
@@ -134,6 +122,39 @@ over_max_message_size(Config) ->
134122
ok = rabbit_ct_client_helpers:close_connection(Conn),
135123
ok = rpc(Config, persistent_term, put, [max_message_size, DefaultMaxMessageSize]).
136124

125+
%% Checks local_summary/0 and cluster_summary/0 on a two-node cluster.
%% Before publishing, every view on both nodes must be all zeroes.
%% publish_messages/1 then publishes four 2-byte and two 200,000-byte
%% messages; the test expects them in node 0's local summary and in the
%% cluster summary of both nodes, while node 1's local summary stays empty.
summary(Config) ->
    Ranges = [{0, 100},
              {101, 1000},
              {1001, 10000},
              {10001, 100000},
              {100001, 1000000},
              {1000001, 10000000},
              {10000001, 50000000},
              {50000001, 100000000},
              {100000001, infinity}],
    ZeroSummary = [{Range, {0, 0.0}} || Range <- Ranges],

    %% Cover both the local and the cluster-wide view on each node.
    ?assertEqual(ZeroSummary, rpc(Config, 0, rabbit_msg_size_metrics, local_summary, [])),
    ?assertEqual(ZeroSummary, rpc(Config, 1, rabbit_msg_size_metrics, local_summary, [])),
    ?assertEqual(ZeroSummary, rpc(Config, 0, rabbit_msg_size_metrics, cluster_summary, [])),
    ?assertEqual(ZeroSummary, rpc(Config, 1, rabbit_msg_size_metrics, cluster_summary, [])),

    publish_messages(Config),

    %% 4 of 6 messages fall in the first bucket, 2 of 6 in the fifth.
    ExpectedSummary = [{{0, 100}, {4, 66.66666666666666}},
                       {{101, 1000}, {0, 0.0}},
                       {{1001, 10000}, {0, 0.0}},
                       {{10001, 100000}, {0, 0.0}},
                       {{100001, 1000000}, {2, 33.33333333333333}},
                       {{1000001, 10000000}, {0, 0.0}},
                       {{10000001, 50000000}, {0, 0.0}},
                       {{50000001, 100000000}, {0, 0.0}},
                       {{100000001, infinity}, {0, 0.0}}],

    ?assertEqual(ExpectedSummary, rpc(Config, 0, rabbit_msg_size_metrics, local_summary, [])),
    ?assertEqual(ExpectedSummary, rpc(Config, 0, rabbit_msg_size_metrics, cluster_summary, [])),
    ?assertEqual(ExpectedSummary, rpc(Config, 1, rabbit_msg_size_metrics, cluster_summary, [])),
    ?assertEqual(ZeroSummary, rpc(Config, 1, rabbit_msg_size_metrics, local_summary, [])).
157+
137158
get_msg_size_metrics(Protocol, Config) ->
138159
rpc(Config, rabbit_msg_size_metrics, raw_buckets, [Protocol]).
139160

@@ -145,6 +166,36 @@ connection_config(Config) ->
145166
container_id => <<"my container">>,
146167
sasl => anon}.
147168

169+
%% Publishes the fixed test workload and closes the connections again.
%% Three messages (2 bytes, 200,000 bytes, 2 bytes) are published through
%% amqp_channel, and the same three payloads are sent through
%% amqp10_client, waiting for each of the latter to be settled as released.
publish_messages(Config) ->
    Small = <<"12">>,
    Large = binary:copy(<<"x">>, 200_000),

    {AmqplConn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
    lists:foreach(
      fun(Payload) ->
              amqp_channel:call(Ch,
                                #'basic.publish'{routing_key = <<"nowhere">>},
                                #amqp_msg{payload = Payload})
      end, [Small, Large, Small]),

    {ok, Connection} = amqp10_client:open_connection(connection_config(Config)),
    {ok, Session} = amqp10_client:begin_session_sync(Connection),
    Address = rabbitmq_amqp_address:exchange(<<"amq.fanout">>),
    {ok, Sender} = amqp10_client:attach_sender_link_sync(Session, <<"sender">>, Address),
    %% Do not send before the link has been granted credit.
    receive {amqp10_event, {link, Sender, credited}} -> ok
    after 30_000 -> ct:fail(credited_timeout)
    end,

    Tagged = [{<<"tag1">>, Small}, {<<"tag2">>, Large}, {<<"tag3">>, Small}],
    %% Send everything first, then wait for the settlements in tag order.
    lists:foreach(
      fun({Tag, Payload}) ->
              ok = amqp10_client:send_msg(Sender, amqp10_msg:new(Tag, Payload))
      end, Tagged),
    lists:foreach(
      fun({Tag, _Payload}) ->
              ok = wait_for_settlement(released, Tag)
      end, Tagged),

    ok = amqp10_client:close_connection(Connection),
    ok = rabbit_ct_client_helpers:close_connection_and_channel(AmqplConn, Ch).
198+
148199
wait_for_settlement(State, Tag) ->
149200
receive
150201
{amqp10_disposition, {State, Tag}} ->
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
## This Source Code Form is subject to the terms of the Mozilla Public
2+
## License, v. 2.0. If a copy of the MPL was not distributed with this
3+
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
##
5+
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
7+
# CLI command `message_size_stats`: asks the target node for
# `:rabbit_msg_size_metrics.cluster_summary_for_cli/0` and prints the
# result as a table.
defmodule RabbitMQ.CLI.Diagnostics.Commands.MessageSizeStatsCommand do
  alias RabbitMQ.CLI.Core.DocGuide

  @behaviour RabbitMQ.CLI.CommandBehaviour

  @default_timeout 60_000

  def scopes(), do: [:ctl, :diagnostics]

  def switches(), do: [timeout: :integer]
  def aliases(), do: [t: :timeout]

  # Replaces a missing or `:infinity` timeout with @default_timeout and
  # keeps any other value the caller supplied.
  def merge_defaults(args, opts) do
    timeout =
      case opts[:timeout] do
        nil -> @default_timeout
        :infinity -> @default_timeout
        other -> other
      end

    # Map.merge/2 gives precedence to its second argument, so the
    # normalised timeout has to come last; otherwise a `:timeout` already
    # present in opts (e.g. :infinity) would overwrite it.
    {args, Map.merge(opts, %{timeout: timeout})}
  end

  use RabbitMQ.CLI.Core.AcceptsNoPositionalArguments
  use RabbitMQ.CLI.Core.RequiresRabbitAppRunning

  # Runs the aggregation on the target node; that node collects the
  # summaries of the other cluster members itself.
  def run([], %{node: node_name, timeout: timeout}) do
    :rabbit_misc.rpc_call(
      node_name,
      :rabbit_msg_size_metrics,
      :cluster_summary_for_cli,
      [],
      timeout
    )
  end

  use RabbitMQ.CLI.DefaultOutput

  def formatter(), do: RabbitMQ.CLI.Formatters.PrettyTable

  def usage, do: "message_size_stats"

  def usage_doc_guides() do
    [
      DocGuide.monitoring()
    ]
  end

  def help_section(), do: :observability_and_health_checks

  def description(),
    do: "Displays message size distribution statistics aggregated across all cluster nodes"

  def banner(_, %{node: node_name}),
    do: "Gathering message size statistics from cluster via #{node_name} ..."
end

0 commit comments

Comments
 (0)