Skip to content

Commit f39c27f

Browse files
committed
rabbitmq-diagnostics message_size_stats
This command displays cluster-wide message size statistics. It's less detailed than what can be retrieved from the Prometheus endpoint, but it'll be available to all users, regardless of their monitoring setup, or lack thereof.
1 parent 18e4bf6 commit f39c27f

File tree

4 files changed

+306
-38
lines changed

4 files changed

+306
-38
lines changed

deps/rabbit/src/rabbit_msg_size_metrics.erl

Lines changed: 110 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@
1111

1212
-export([init/1,
1313
observe/2,
14-
prometheus_format/0]).
14+
prometheus_format/0,
15+
local_summary/0,
16+
cluster_summary/0,
17+
cluster_summary_for_cli/0 ]).
1518

1619
%% Integration tests.
1720
-export([raw_buckets/1,
@@ -52,6 +55,9 @@
5255
-type raw_buckets() :: [{BucketUpperBound :: non_neg_integer(),
5356
NumObservations :: non_neg_integer()}].
5457

58+
-type summary_entry() :: {{non_neg_integer(), non_neg_integer() | infinity}, {non_neg_integer(), float()}}.
59+
-type summary() :: [summary_entry()].
60+
5561
-spec init(atom()) -> ok.
5662
init(Protocol) ->
5763
Size = ?POS_MSG_SIZE_SUM,
@@ -133,6 +139,109 @@ get_labels_counters() ->
133139
[{[{protocol, Protocol}], Counters}
134140
|| {{?MODULE, Protocol}, Counters} <- persistent_term:get()].
135141

142+
get_protocols() ->
143+
[Protocol
144+
|| {{?MODULE, Protocol}, _} <- persistent_term:get()].
145+
146+
%% Aggregates data for all protocols on the local node
147+
-spec local_summary() -> summary().
148+
local_summary() ->
149+
PerProtocolBuckets = lists:map(fun(Protocol) ->
150+
raw_buckets(Protocol)
151+
end, get_protocols()),
152+
153+
%% Sum buckets for all protocols
154+
Buckets0 = [{?BUCKET_1, 0}, {?BUCKET_2, 0}, {?BUCKET_3, 0}, {?BUCKET_4, 0},
155+
{?BUCKET_5, 0}, {?BUCKET_6, 0}, {?BUCKET_7, 0}, {?BUCKET_8, 0}, {?BUCKET_9, 0}],
156+
Buckets = lists:foldl(fun sum_protocol_buckets/2,
157+
Buckets0,
158+
PerProtocolBuckets),
159+
160+
Total = lists:sum([Count || {_UpperBound, Count} <- Buckets]),
161+
162+
Ranges = lists:map(fun({UpperBound, Count}) ->
163+
Percentage = case Total of
164+
0 -> 0.0;
165+
_ -> (Count / Total) * 100
166+
end,
167+
{bucket_range(UpperBound), {Count, Percentage}}
168+
end, Buckets),
169+
170+
Ranges.
171+
172+
sum_protocol_buckets(ProtocolBuckets, Acc) ->
173+
lists:map(fun({UpperBound, AccCount}) ->
174+
ProtocolCount = proplists:get_value(UpperBound, ProtocolBuckets, 0),
175+
{UpperBound, AccCount + ProtocolCount}
176+
end, Acc).
177+
178+
%% Aggregates sumamries from all nodes
179+
-spec cluster_summary() -> summary().
180+
cluster_summary() ->
181+
RemoteNodes = [Node || Node <- rabbit_nodes:list_running(), Node =/= node()],
182+
RemoteSummaries = [ Summary || {ok, Summary} <- erpc:multicall(RemoteNodes,
183+
?MODULE,
184+
local_summary,
185+
[],
186+
5000)],
187+
lists:foldl(fun merge_summaries/2, local_summary(), RemoteSummaries).
188+
189+
bucket_name({_, ?BUCKET_1}) -> <<"below 100B">>;
190+
bucket_name({_, ?BUCKET_2}) -> <<"between 100B and 1KB">>;
191+
bucket_name({_, ?BUCKET_3}) -> <<"between 1KB and 10KB">>;
192+
bucket_name({_, ?BUCKET_4}) -> <<"between 10KB and 100KB">>;
193+
bucket_name({_, ?BUCKET_5}) -> <<"between 100KB and 1MB">>;
194+
bucket_name({_, ?BUCKET_6}) -> <<"between 1MB and 10MB">>;
195+
bucket_name({_, ?BUCKET_7}) -> <<"between 10MB and 50MB">>;
196+
bucket_name({_, ?BUCKET_8}) -> <<"between 50MB and 100MB">>;
197+
bucket_name({_, ?BUCKET_9}) -> <<"above 100MB">>.
198+
199+
cluster_summary_for_cli() ->
200+
[[{<<"Message Size">>, bucket_name(Range)},
201+
{<<"Count">>, Count},
202+
{<<"Percentage">>, iolist_to_binary(io_lib:format("~.2f", [Percentage]))}]
203+
|| {Range, {Count, Percentage}} <- cluster_summary()].
204+
205+
get_count_for_range(Range, SummaryList) ->
206+
case proplists:get_value(Range, SummaryList) of
207+
{Count, _} -> Count;
208+
undefined -> 0
209+
end.
210+
211+
%% Merges two summary lists by adding their counts and recalculating percentages
212+
merge_summaries(Summary1, Summary2) ->
213+
%% Get all bucket ranges
214+
AllRanges = lists:usort([Range || {Range, _} <- Summary1] ++ [Range || {Range, _} <- Summary2]),
215+
216+
MergedRanges = lists:map(fun(Range) ->
217+
Count1 = get_count_for_range(Range, Summary1),
218+
Count2 = get_count_for_range(Range, Summary2),
219+
NewCount = Count1 + Count2,
220+
{Range, NewCount}
221+
end, AllRanges),
222+
223+
%% Calculate total and percentages
224+
NewTotal = lists:sum([Count || {_, Count} <- MergedRanges]),
225+
FinalRanges = lists:map(fun({Range, Count}) ->
226+
NewPercentage = case NewTotal of
227+
0 -> 0.0;
228+
_ -> (Count / NewTotal) * 100
229+
end,
230+
{Range, {Count, NewPercentage}}
231+
end, MergedRanges),
232+
233+
FinalRanges.
234+
235+
bucket_range(?BUCKET_1) -> {0, 100};
236+
bucket_range(?BUCKET_2) -> {101, 1000};
237+
bucket_range(?BUCKET_3) -> {1001, 10000};
238+
bucket_range(?BUCKET_4) -> {10001, 100000};
239+
bucket_range(?BUCKET_5) -> {100001, 1000000};
240+
bucket_range(?BUCKET_6) -> {1000001, 10000000};
241+
bucket_range(?BUCKET_7) -> {10000001, 50000000};
242+
bucket_range(?BUCKET_8) -> {50000001, 100000000};
243+
bucket_range(?BUCKET_9) -> {100000001, infinity}.
244+
136245
-ifdef(TEST).
137246
%% "Counters are not tied to the current process and are automatically
138247
%% garbage collected when they are no longer referenced."

deps/rabbit/test/msg_size_metrics_SUITE.erl

Lines changed: 85 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
-include_lib("amqp_client/include/amqp_client.hrl").
1414

1515
-import(rabbit_ct_broker_helpers,
16-
[rpc/4]).
16+
[rpc/4, rpc/5]).
1717

1818
all() ->
1919
[
@@ -22,9 +22,11 @@ all() ->
2222

2323
groups() ->
2424
[
25-
{tests, [shuffle],
26-
[message_size,
27-
over_max_message_size]}
25+
{tests, [],
26+
[summary, %% needs to run first
27+
message_size,
28+
over_max_message_size]
29+
}
2830
].
2931

3032
%% -------------------------------------------------------------------
@@ -34,14 +36,18 @@ groups() ->
3436
init_per_suite(Config) ->
3537
{ok, _} = application:ensure_all_started(amqp10_client),
3638
rabbit_ct_helpers:log_environment(),
37-
rabbit_ct_helpers:run_setup_steps(Config).
39+
Config.
3840

3941
end_per_suite(Config) ->
4042
rabbit_ct_helpers:run_teardown_steps(Config).
4143

4244
init_per_group(_Group, Config) ->
43-
rabbit_ct_helpers:run_steps(
44-
Config,
45+
Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
46+
Config1 = rabbit_ct_helpers:set_config(
47+
Config, [{rmq_nodes_count, 3},
48+
{rmq_nodename_suffix, Suffix}]),
49+
rabbit_ct_helpers:run_setup_steps(
50+
Config1,
4551
rabbit_ct_broker_helpers:setup_steps() ++
4652
rabbit_ct_client_helpers:setup_steps()).
4753

@@ -51,6 +57,13 @@ end_per_group(_Group, Config) ->
5157
rabbit_ct_client_helpers:teardown_steps() ++
5258
rabbit_ct_broker_helpers:teardown_steps()).
5359

60+
init_per_testcase(summary, Config) ->
61+
case rabbit_ct_broker_helpers:enable_feature_flag(Config, 'rabbitmq_4.2.0') of
62+
ok ->
63+
rabbit_ct_helpers:testcase_started(Config, sumary);
64+
{skip, _} = Skip ->
65+
Skip
66+
end;
5467
init_per_testcase(Testcase, Config) ->
5568
rabbit_ct_helpers:testcase_started(Config, Testcase).
5669

@@ -65,32 +78,7 @@ message_size(Config) ->
6578
AmqplBefore = get_msg_size_metrics(amqp091, Config),
6679
AmqpBefore = get_msg_size_metrics(amqp10, Config),
6780

68-
Binary2B = <<"12">>,
69-
Binary200K = binary:copy(<<"x">>, 200_000),
70-
Payloads = [Binary2B, Binary200K, Binary2B],
71-
72-
{AmqplConn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
73-
[amqp_channel:call(Ch,
74-
#'basic.publish'{routing_key = <<"nowhere">>},
75-
#amqp_msg{payload = Payload})
76-
|| Payload <- Payloads],
77-
78-
OpnConf = connection_config(Config),
79-
{ok, Connection} = amqp10_client:open_connection(OpnConf),
80-
{ok, Session} = amqp10_client:begin_session_sync(Connection),
81-
Address = rabbitmq_amqp_address:exchange(<<"amq.fanout">>),
82-
{ok, Sender} = amqp10_client:attach_sender_link_sync(Session, <<"sender">>, Address),
83-
receive {amqp10_event, {link, Sender, credited}} -> ok
84-
after 30_000 -> ct:fail(credited_timeout)
85-
end,
86-
87-
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag1">>, Binary2B)),
88-
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag2">>, Binary200K)),
89-
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag3">>, Binary2B)),
90-
91-
ok = wait_for_settlement(released, <<"tag1">>),
92-
ok = wait_for_settlement(released, <<"tag2">>),
93-
ok = wait_for_settlement(released, <<"tag3">>),
81+
publish_messages(Config),
9482

9583
AmqplAfter = get_msg_size_metrics(amqp091, Config),
9684
AmqpAfter = get_msg_size_metrics(amqp10, Config),
@@ -100,10 +88,7 @@ message_size(Config) ->
10088
?assertEqual(ExpectedDiff,
10189
rabbit_msg_size_metrics:diff_raw_buckets(AmqplAfter, AmqplBefore)),
10290
?assertEqual(ExpectedDiff,
103-
rabbit_msg_size_metrics:diff_raw_buckets(AmqpAfter, AmqpBefore)),
104-
105-
ok = amqp10_client:close_connection(Connection),
106-
ok = rabbit_ct_client_helpers:close_connection_and_channel(AmqplConn, Ch).
91+
rabbit_msg_size_metrics:diff_raw_buckets(AmqpAfter, AmqpBefore)).
10792

10893
over_max_message_size(Config) ->
10994
DefaultMaxMessageSize = rpc(Config, persistent_term, get, [max_message_size]),
@@ -134,6 +119,39 @@ over_max_message_size(Config) ->
134119
ok = rabbit_ct_client_helpers:close_connection(Conn),
135120
ok = rpc(Config, persistent_term, put, [max_message_size, DefaultMaxMessageSize]).
136121

122+
summary(Config) ->
123+
ZeroSummary = [{{0, 100}, {0, 0.0}},
124+
{{101, 1000}, {0, 0.0}},
125+
{{1001, 10000}, {0, 0.0}},
126+
{{10001, 100000}, {0, 0.0}},
127+
{{100001, 1000000}, {0, 0.0}},
128+
{{1000001, 10000000}, {0, 0.0}},
129+
{{10000001, 50000000}, {0, 0.0}},
130+
{{50000001, 100000000}, {0, 0.0}},
131+
{{100000001, infinity}, {0, 0.0}}],
132+
133+
?assertEqual(ZeroSummary, rpc(Config, 0, rabbit_msg_size_metrics, local_summary, [])),
134+
?assertEqual(ZeroSummary, rpc(Config, 1, rabbit_msg_size_metrics, cluster_summary, [])),
135+
?assertEqual(ZeroSummary, rpc(Config, 0, rabbit_msg_size_metrics, local_summary, [])),
136+
?assertEqual(ZeroSummary, rpc(Config, 1, rabbit_msg_size_metrics, cluster_summary, [])),
137+
138+
publish_messages(Config),
139+
140+
ExpectedSummary = [{{0, 100}, {4, 66.66666666666666}},
141+
{{101, 1000}, {0, 0.0}},
142+
{{1001, 10000}, {0, 0.0}},
143+
{{10001, 100000}, {0, 0.0}},
144+
{{100001, 1000000}, {2, 33.33333333333333}},
145+
{{1000001, 10000000}, {0, 0.0}},
146+
{{10000001, 50000000}, {0, 0.0}},
147+
{{50000001, 100000000}, {0, 0.0}},
148+
{{100000001, infinity}, {0, 0.0}}],
149+
150+
?assertEqual(ExpectedSummary, rpc(Config, 0, rabbit_msg_size_metrics, local_summary, [])),
151+
?assertEqual(ExpectedSummary, rpc(Config, 0, rabbit_msg_size_metrics, cluster_summary, [])),
152+
?assertEqual(ExpectedSummary, rpc(Config, 1, rabbit_msg_size_metrics, cluster_summary, [])),
153+
?assertEqual(ZeroSummary, rpc(Config, 1, rabbit_msg_size_metrics, local_summary, [])).
154+
137155
get_msg_size_metrics(Protocol, Config) ->
138156
rpc(Config, rabbit_msg_size_metrics, raw_buckets, [Protocol]).
139157

@@ -145,6 +163,36 @@ connection_config(Config) ->
145163
container_id => <<"my container">>,
146164
sasl => anon}.
147165

166+
publish_messages(Config) ->
167+
Binary2B = <<"12">>,
168+
Binary200K = binary:copy(<<"x">>, 200_000),
169+
Payloads = [Binary2B, Binary200K, Binary2B],
170+
171+
{AmqplConn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
172+
[amqp_channel:call(Ch,
173+
#'basic.publish'{routing_key = <<"nowhere">>},
174+
#amqp_msg{payload = Payload})
175+
|| Payload <- Payloads],
176+
177+
OpnConf = connection_config(Config),
178+
{ok, Connection} = amqp10_client:open_connection(OpnConf),
179+
{ok, Session} = amqp10_client:begin_session_sync(Connection),
180+
Address = rabbitmq_amqp_address:exchange(<<"amq.fanout">>),
181+
{ok, Sender} = amqp10_client:attach_sender_link_sync(Session, <<"sender">>, Address),
182+
receive {amqp10_event, {link, Sender, credited}} -> ok
183+
after 30_000 -> ct:fail(credited_timeout)
184+
end,
185+
186+
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag1">>, Binary2B)),
187+
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag2">>, Binary200K)),
188+
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag3">>, Binary2B)),
189+
190+
ok = wait_for_settlement(released, <<"tag1">>),
191+
ok = wait_for_settlement(released, <<"tag2">>),
192+
ok = wait_for_settlement(released, <<"tag3">>),
193+
ok = amqp10_client:close_connection(Connection),
194+
ok = rabbit_ct_client_helpers:close_connection_and_channel(AmqplConn, Ch).
195+
148196
wait_for_settlement(State, Tag) ->
149197
receive
150198
{amqp10_disposition, {State, Tag}} ->
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
## This Source Code Form is subject to the terms of the Mozilla Public
2+
## License, v. 2.0. If a copy of the MPL was not distributed with this
3+
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
##
5+
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
7+
defmodule RabbitMQ.CLI.Diagnostics.Commands.MessageSizeStatsCommand do
8+
alias RabbitMQ.CLI.Core.DocGuide
9+
10+
@behaviour RabbitMQ.CLI.CommandBehaviour
11+
12+
@default_timeout 60_000
13+
14+
def scopes(), do: [:ctl, :diagnostics]
15+
16+
def switches(), do: [timeout: :integer]
17+
def aliases(), do: [t: :timeout]
18+
19+
def merge_defaults(args, opts) do
20+
timeout =
21+
case opts[:timeout] do
22+
nil -> @default_timeout
23+
:infinity -> @default_timeout
24+
other -> other
25+
end
26+
27+
{args, Map.merge(%{timeout: timeout}, opts)}
28+
end
29+
30+
use RabbitMQ.CLI.Core.AcceptsNoPositionalArguments
31+
use RabbitMQ.CLI.Core.RequiresRabbitAppRunning
32+
33+
def run([], %{node: node_name, timeout: timeout}) do
34+
:rabbit_misc.rpc_call(node_name, :rabbit_msg_size_metrics, :cluster_summary_for_cli, [], timeout)
35+
end
36+
37+
use RabbitMQ.CLI.DefaultOutput
38+
39+
def formatter(), do: RabbitMQ.CLI.Formatters.PrettyTable
40+
41+
def usage, do: "message_size_stats"
42+
43+
def usage_doc_guides() do
44+
[
45+
DocGuide.monitoring()
46+
]
47+
end
48+
49+
def help_section(), do: :observability_and_health_checks
50+
51+
def description(),
52+
do: "Displays message size distribution statistics aggregated across all cluster nodes"
53+
54+
def banner(_, %{node: node_name}), do: "Gathering message size statistics from cluster via #{node_name} ..."
55+
56+
end

0 commit comments

Comments
 (0)