diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel index 2c59ca6cf288..65fc3200c4ac 100644 --- a/deps/rabbit/BUILD.bazel +++ b/deps/rabbit/BUILD.bazel @@ -479,8 +479,10 @@ rabbitmq_integration_suite( ) rabbitmq_integration_suite( - name = "global_metrics_SUITE", - size = "small", + name = "msg_size_metrics_SUITE", + runtime_deps = [ + "//deps/rabbitmq_amqp_client:erlang_app", + ], ) rabbitmq_integration_suite( diff --git a/deps/rabbit/app.bzl b/deps/rabbit/app.bzl index a87f911862d9..bc0ad2830a5b 100644 --- a/deps/rabbit/app.bzl +++ b/deps/rabbit/app.bzl @@ -169,7 +169,7 @@ def all_beam_files(name = "all_beam_files"): "src/rabbit_metrics.erl", "src/rabbit_mirror_queue_misc.erl", "src/rabbit_mnesia.erl", - "src/rabbit_msg_size_metrics.erl", + "src/rabbit_msg_size_metrics.erl", "src/rabbit_msg_store.erl", "src/rabbit_msg_store_gc.erl", "src/rabbit_networking.erl", @@ -426,7 +426,7 @@ def all_test_beam_files(name = "all_test_beam_files"): "src/rabbit_metrics.erl", "src/rabbit_mirror_queue_misc.erl", "src/rabbit_mnesia.erl", - "src/rabbit_msg_size_metrics.erl", + "src/rabbit_msg_size_metrics.erl", "src/rabbit_msg_store.erl", "src/rabbit_msg_store_gc.erl", "src/rabbit_networking.erl", @@ -705,6 +705,7 @@ def all_srcs(name = "all_srcs"): "src/rabbit_metrics.erl", "src/rabbit_mirror_queue_misc.erl", "src/rabbit_mnesia.erl", + "src/rabbit_msg_size_metrics.erl", "src/rabbit_msg_store.erl", "src/rabbit_msg_store_gc.erl", "src/rabbit_networking.erl", @@ -1723,7 +1724,6 @@ def test_suite_beam_files(name = "test_suite_beam_files"): outs = ["test/unit_msg_size_metrics_SUITE.beam"], app_name = "rabbit", erlc_opts = "//:test_erlc_opts", - deps = [], ) erlang_bytecode( name = "unit_operator_policy_SUITE_beam_files", @@ -2194,3 +2194,12 @@ def test_suite_beam_files(name = "test_suite_beam_files"): app_name = "rabbit", erlc_opts = "//:test_erlc_opts", ) + erlang_bytecode( + name = "msg_size_metrics_SUITE_beam_files", + testonly = True, + srcs = ["test/msg_size_metrics_SUITE.erl"], + outs = ["test/msg_size_metrics_SUITE.beam"], + app_name = "rabbit", + erlc_opts = "//:test_erlc_opts", + deps = ["//deps/amqp_client:erlang_app"], + ) diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index c7b7beee449b..2885dd2b79fc 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -2336,8 +2336,9 @@ incoming_link_transfer( {MsgBin0, FirstDeliveryId, FirstSettled} end, validate_transfer_rcv_settle_mode(RcvSettleMode, Settled), - validate_message_size(PayloadBin, MaxMessageSize), - rabbit_global_counters:message_size(?PROTOCOL, byte_size(PayloadBin)), + PayloadSize = iolist_size(PayloadBin), + validate_message_size(PayloadSize, MaxMessageSize), + rabbit_msg_size_metrics:observe(?PROTOCOL, PayloadSize), Mc0 = mc:init(mc_amqp, PayloadBin, #{}), case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of @@ -3067,9 +3068,8 @@ validate_transfer_rcv_settle_mode(_, _) -> validate_message_size(_, unlimited) -> ok; -validate_message_size(Message, MaxMsgSize) - when is_integer(MaxMsgSize) -> - MsgSize = iolist_size(Message), +validate_message_size(MsgSize, MaxMsgSize) + when is_integer(MsgSize) -> case MsgSize =< MaxMsgSize of true -> ok; @@ -3083,7 +3083,9 @@ validate_message_size(Message, MaxMsgSize) ?V_1_0_LINK_ERROR_MESSAGE_SIZE_EXCEEDED, "message size (~b bytes) > maximum message size (~b bytes)", [MsgSize, MaxMsgSize]) - end. + end; +validate_message_size(Msg, MaxMsgSize) -> + validate_message_size(iolist_size(Msg), MaxMsgSize). -spec ensure_terminus(source | target, term(), diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 9fdb1a0c8f69..4be86370c390 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -985,8 +985,7 @@ check_msg_size(Content, GCThreshold) -> Size = rabbit_basic:maybe_gc_large_msg(Content, GCThreshold), case Size =< MaxMessageSize of true -> - rabbit_global_counters:message_size(amqp091, Size), - ok; + rabbit_msg_size_metrics:observe(amqp091, Size); false -> Fmt = case MaxMessageSize of ?MAX_MSG_SIZE -> diff --git a/deps/rabbit/src/rabbit_global_counters.erl b/deps/rabbit/src/rabbit_global_counters.erl index 5dfa77053d54..7b480c91d6cf 100644 --- a/deps/rabbit/src/rabbit_global_counters.erl +++ b/deps/rabbit/src/rabbit_global_counters.erl @@ -13,7 +13,6 @@ boot_step/0, init/1, init/2, - overview/0, prometheus_format/0, increase_protocol_counter/3, messages_received/2, @@ -34,11 +33,14 @@ publisher_deleted/1, consumer_created/1, consumer_deleted/1, - message_size/2, messages_dead_lettered/4, messages_dead_lettered_confirmed/3 ]). +-ifdef(TEST). +-export([overview/0]). +-endif. + %% PROTOCOL COUNTERS: -define(MESSAGES_RECEIVED, 1). -define(MESSAGES_RECEIVED_CONFIRM, 2). @@ -133,12 +135,14 @@ boot_step() -> [begin %% Protocol counters - init([{protocol, Proto}]), + Protocol = {protocol, Proto}, + init([Protocol]), + rabbit_msg_size_metrics:init(Proto), %% Protocol & Queue Type counters - init([{protocol, Proto}, {queue_type, rabbit_classic_queue}]), - init([{protocol, Proto}, {queue_type, rabbit_quorum_queue}]), - init([{protocol, Proto}, {queue_type, rabbit_stream_queue}]) + init([Protocol, {queue_type, rabbit_classic_queue}]), + init([Protocol, {queue_type, rabbit_quorum_queue}]), + init([Protocol, {queue_type, rabbit_stream_queue}]) end || Proto <- [amqp091, amqp10]], %% Dead Letter counters @@ -187,21 +191,19 @@ init(Labels = [{protocol, Protocol}, {queue_type, QueueType}], Extra) -> init(Labels = [{protocol, Protocol}], Extra) -> _ = seshat:new_group(?MODULE), Counters = seshat:new(?MODULE, Labels, ?PROTOCOL_COUNTERS ++ Extra), - persistent_term:put({?MODULE, Protocol}, Counters), - rabbit_msg_size_metrics:init(Labels); + persistent_term:put({?MODULE, Protocol}, Counters); init(Labels = [{queue_type, QueueType}, {dead_letter_strategy, DLS}], DeadLetterCounters) -> _ = seshat:new_group(?MODULE), Counters = seshat:new(?MODULE, Labels, DeadLetterCounters), persistent_term:put({?MODULE, QueueType, DLS}, Counters). +-ifdef(TEST). overview() -> - maps:merge_with( - fun(_Key, Value1, Value2) -> maps:merge(Value1, Value2) end, - rabbit_msg_size_metrics:overview(), - seshat:overview(?MODULE)). + seshat:overview(?MODULE). +-endif. prometheus_format() -> - maps:merge(seshat:format(?MODULE), rabbit_msg_size_metrics:prometheus_format()). + seshat:format(?MODULE). increase_protocol_counter(Protocol, Counter, Num) -> counters:add(fetch(Protocol), Counter, Num). @@ -252,16 +254,13 @@ publisher_created(Protocol) -> counters:add(fetch(Protocol), ?PUBLISHERS, 1). publisher_deleted(Protocol) -> - counters:add(fetch(Protocol), ?PUBLISHERS, -1). + counters:sub(fetch(Protocol), ?PUBLISHERS, 1). consumer_created(Protocol) -> counters:add(fetch(Protocol), ?CONSUMERS, 1). consumer_deleted(Protocol) -> - counters:add(fetch(Protocol), ?CONSUMERS, -1). - -message_size(Protocol, MessageSize) -> - rabbit_msg_size_metrics:update(Protocol, MessageSize). + counters:sub(fetch(Protocol), ?CONSUMERS, 1). messages_dead_lettered(Reason, QueueType, DeadLetterStrategy, Num) -> Index = case Reason of diff --git a/deps/rabbit/src/rabbit_msg_size_metrics.erl b/deps/rabbit/src/rabbit_msg_size_metrics.erl index a03ecd60de05..d884d8fda998 100644 --- a/deps/rabbit/src/rabbit_msg_size_metrics.erl +++ b/deps/rabbit/src/rabbit_msg_size_metrics.erl @@ -5,127 +5,140 @@ %% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. %% -%% This module tracks received message size distribution as histograms. -%% (One histogram is represented by a set of counters, one for each -%% bucket.) +%% This module tracks received message size distribution as histogram. +%% (A histogram is represented by a set of counters, one for each bucket.) -module(rabbit_msg_size_metrics). -export([init/1, - overview/0, - prometheus_format/0, - update/2]). - -%% Useful for testing --export([overview/1, - changed_buckets/2, - cleanup/1]). + observe/2, + prometheus_format/0]). + +%% Integration tests. +-export([raw_buckets/1, + diff_raw_buckets/2]). + +-ifdef(TEST). +-export([cleanup/1]). +-endif. + +-define(BUCKET_1, 100). +-define(BUCKET_2, 1_000). +-define(BUCKET_3, 10_000). +-define(BUCKET_4, 100_000). +-define(BUCKET_5, 1_000_000). +-define(BUCKET_6, 10_000_000). +%% rabbit.max_message_size up to RabbitMQ 3.13 was 128 MiB. +%% rabbit.max_message_size since RabbitMQ 4.0 is 16 MiB. +%% To help finding an appropriate rabbit.max_message_size we also add a bucket for 50 MB. +-define(BUCKET_7, 50_000_000). +-define(BUCKET_8, 100_000_000). +%% 'infinity' means practically 512 MiB as hard limited in +%% https://github.com/rabbitmq/rabbitmq-server/blob/v4.0.2/deps/rabbit_common/include/rabbit.hrl#L254-L257 +-define(BUCKET_9, 'infinity'). -define(MSG_SIZE_BUCKETS, - [{1, 64}, - {2, 256}, - {3, 1024}, - {4, 4 * 1024}, - {5, 16 * 1024}, - {6, 64 * 1024}, - {7, 256 * 1024}, - {8, 1024 * 1024}, - {9, 4 * 1024 * 1024}, - {10, 16 * 1024 * 1024}, - {11, 64 * 1024 * 1024}, - {12, 256 * 1024 * 1024}, - {13, infinity}]). --define(MSG_SIZE_SUM_POS, 14). - --type labels() :: [{protocol, atom()}]. --type hist_values() :: #{BucketUpperBound :: integer() => integer(), sum => integer()}. - --spec init(labels()) -> any(). -init([{protocol, Protocol}]) -> - Size = ?MSG_SIZE_SUM_POS, + [{1, ?BUCKET_1}, + {2, ?BUCKET_2}, + {3, ?BUCKET_3}, + {4, ?BUCKET_4}, + {5, ?BUCKET_5}, + {6, ?BUCKET_6}, + {7, ?BUCKET_7}, + {8, ?BUCKET_8}, + {9, ?BUCKET_9}]). + +-define(POS_MSG_SIZE_SUM, 10). + +-type raw_buckets() :: [{BucketUpperBound :: non_neg_integer(), + NumObservations :: non_neg_integer()}]. + +-spec init(atom()) -> ok. +init(Protocol) -> + Size = ?POS_MSG_SIZE_SUM, Counters = counters:new(Size, [write_concurrency]), put_counters(Protocol, Counters). --spec cleanup(labels()) -> any(). -cleanup([{protocol, Protocol}]) -> - delete_counters(Protocol). - --spec overview() -> #{labels() => #{atom() => hist_values()}}. -overview() -> - LabelsList = fetch_labels(), - maps:from_list([{Labels, #{message_size_bytes => overview(Labels)}} || Labels <- LabelsList]). - --spec overview(labels()) -> hist_values(). -overview(Labels) -> - {BucketValues, Sum} = values(Labels), - BucketMap = maps:from_list(BucketValues), - BucketMap#{sum => Sum}. +-spec observe(atom(), non_neg_integer()) -> ok. +observe(Protocol, MessageSize) -> + BucketPos = find_bucket_pos(MessageSize), + Counters = get_counters(Protocol), + counters:add(Counters, BucketPos, 1), + counters:add(Counters, ?POS_MSG_SIZE_SUM, MessageSize). -spec prometheus_format() -> #{atom() => map()}. prometheus_format() -> - LabelsList = fetch_labels(), + LabelsList = get_labels(), Values = [prometheus_values(Labels) || Labels <- LabelsList], - #{message_size_bytes => #{type => histogram, help => "Size of messages received from publishers", values => Values}}. --spec update(atom(), integer()) -> any(). -update(Protocol, MessageSize) -> - BucketPos = find_hist_bucket(?MSG_SIZE_BUCKETS, MessageSize), - Counters = fetch_counters(Protocol), - counters:add(Counters, BucketPos, 1), - counters:add(Counters, ?MSG_SIZE_SUM_POS, MessageSize). - --spec changed_buckets(hist_values(), hist_values()) -> hist_values(). -changed_buckets(After, Before) -> - maps:filtermap( - fun(Key, ValueAfter) -> - case ValueAfter - maps:get(Key, Before) of - 0 -> false; - Diff -> {true, Diff} - end - end, After). - -%% -%% Helper functions -%% - -find_hist_bucket([{BucketPos, UpperBound}|_], MessageSize) when MessageSize =< UpperBound -> - BucketPos; -find_hist_bucket([{BucketPos, _Infinity}], _) -> - BucketPos; -find_hist_bucket([_|T], MessageSize) -> - find_hist_bucket(T, MessageSize). - -%% Returned bucket values are count in the range (UpperBound[N-1]-UpperBound[N]] -values(_Labels = [{protocol, Protocol}]) -> - Counters = fetch_counters(Protocol), - Sum = counters:get(Counters, ?MSG_SIZE_SUM_POS), - BucketValues = - [{UpperBound, counters:get(Counters, Pos)} - || {Pos, UpperBound} <- ?MSG_SIZE_BUCKETS], - {BucketValues, Sum}. - -%% Returned bucket values are cumulative counts, ie in the range 0-UpperBound[N], -%% as defined by Prometheus classic histogram format -prometheus_values(Labels) -> - {BucketValues, Sum} = values(Labels), - {CumulatedValues, Count} = - lists:mapfoldl( - fun({UpperBound, V}, Count0) -> - CumulatedValue = Count0 + V, - {{UpperBound, CumulatedValue}, CumulatedValue} - end, 0, BucketValues), - {Labels, CumulatedValues, Count, Sum}. +find_bucket_pos(Size) when Size =< ?BUCKET_1 -> 1; +find_bucket_pos(Size) when Size =< ?BUCKET_2 -> 2; +find_bucket_pos(Size) when Size =< ?BUCKET_3 -> 3; +find_bucket_pos(Size) when Size =< ?BUCKET_4 -> 4; +find_bucket_pos(Size) when Size =< ?BUCKET_5 -> 5; +find_bucket_pos(Size) when Size =< ?BUCKET_6 -> 6; +find_bucket_pos(Size) when Size =< ?BUCKET_7 -> 7; +find_bucket_pos(Size) when Size =< ?BUCKET_8 -> 8; +find_bucket_pos(_Size) -> 9. + +raw_buckets(Protocol) + when is_atom(Protocol) -> + Counters = get_counters(Protocol), + raw_buckets(Counters); +raw_buckets(Counters) -> + [{UpperBound, counters:get(Counters, Pos)} + || {Pos, UpperBound} <- ?MSG_SIZE_BUCKETS]. + +-spec diff_raw_buckets(raw_buckets(), raw_buckets()) -> raw_buckets(). +diff_raw_buckets(After, Before) -> + diff_raw_buckets(After, Before, []). + +diff_raw_buckets([], [], Acc) -> + lists:reverse(Acc); +diff_raw_buckets([{UpperBound, CounterAfter} | After], + [{UpperBound, CounterBefore} | Before], + Acc) -> + case CounterAfter - CounterBefore of + 0 -> + diff_raw_buckets(After, Before, Acc); + Diff -> + diff_raw_buckets(After, Before, [{UpperBound, Diff} | Acc]) + end. + +%% "If you have looked at a /metrics for a histogram, you probably noticed that the buckets +%% aren’t just a count of events that fall into them. The buckets also include a count of +%% events in all the smaller buckets, all the way up to the +Inf, bucket which is the total +%% number of events. This is known as a cumulative histogram, and why the bucket label +%% is called le, standing for less than or equal to. +%% This is in addition to buckets being counters, so Prometheus histograms are cumula‐ +%% tive in two different ways." +%% [Prometheus: Up & Running] +prometheus_values(Labels = [{protocol, Protocol}]) -> + Counters = get_counters(Protocol), + {Buckets, Count} = lists:mapfoldl( + fun({UpperBound, NumObservations}, Acc0) -> + Acc = Acc0 + NumObservations, + {{UpperBound, Acc}, Acc} + end, 0, raw_buckets(Counters)), + Sum = counters:get(Counters, ?POS_MSG_SIZE_SUM), + {Labels, Buckets, Count, Sum}. put_counters(Protocol, Counters) -> persistent_term:put({?MODULE, Protocol}, Counters). -fetch_counters(Protocol) -> +get_counters(Protocol) -> persistent_term:get({?MODULE, Protocol}). -fetch_labels() -> +get_labels() -> [[{protocol, Protocol}] || {{?MODULE, Protocol}, _} <- persistent_term:get()]. -delete_counters(Protocol) -> - persistent_term:erase({?MODULE, Protocol}). +-ifdef(TEST). +%% "Counters are not tied to the current process and are automatically +%% garbage collected when they are no longer referenced." +-spec cleanup(atom()) -> ok. +cleanup(Protocol) -> + persistent_term:erase({?MODULE, Protocol}), + ok. +-endif. diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 9387717178d9..8feba06c4803 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -3739,8 +3739,7 @@ global_counters(Config) -> messages_confirmed_total := Confirmed0, messages_routed_total := Routed0, messages_unroutable_dropped_total := UnroutableDropped0, - messages_unroutable_returned_total := UnroutableReturned0, - message_size_bytes := MsgSizeBytes0} = get_global_counters(Config), + messages_unroutable_returned_total := UnroutableReturned0} = get_global_counters(Config), #{messages_delivered_total := CQDelivered0, messages_redelivered_total := CQRedelivered0, @@ -3793,19 +3792,13 @@ global_counters(Config) -> messages_confirmed_total := Confirmed1, messages_routed_total := Routed1, messages_unroutable_dropped_total := UnroutableDropped1, - messages_unroutable_returned_total := UnroutableReturned1, - message_size_bytes := MsgSizeBytes} = get_global_counters(Config), + messages_unroutable_returned_total := UnroutableReturned1} = get_global_counters(Config), ?assertEqual(Received0 + 2, Received1), ?assertEqual(ReceivedConfirm0 + 1, ReceivedConfirm1), ?assertEqual(Confirmed0 + 1, Confirmed1), ?assertEqual(Routed0 + 2, Routed1), ?assertEqual(UnroutableDropped0, UnroutableDropped1), ?assertEqual(UnroutableReturned0, UnroutableReturned1), - %% the 2 byte message body is encapsulated in an #'v1_0.data'{} - %% structure, which takes 7 bytes encoded - ?assertEqual(#{64 => 2, - sum => (2 * 7)}, - rabbit_msg_size_metrics:changed_buckets(MsgSizeBytes, MsgSizeBytes0)), #{messages_delivered_total := CQDelivered1, messages_redelivered_total := CQRedelivered1, diff --git a/deps/rabbit/test/global_metrics_SUITE.erl b/deps/rabbit/test/global_metrics_SUITE.erl deleted file mode 100644 index 571f3976b607..000000000000 --- a/deps/rabbit/test/global_metrics_SUITE.erl +++ /dev/null @@ -1,101 +0,0 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public -%% License, v. 2.0. If a copy of the MPL was not distributed with this -%% file, You can obtain one at https://mozilla.org/MPL/2.0/. -%% -%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. -%% - --module(global_metrics_SUITE). - --compile([export_all, nowarn_export_all]). --include_lib("amqp_client/include/amqp_client.hrl"). --include_lib("eunit/include/eunit.hrl"). - -all() -> - [ - {group, tests} - ]. - -groups() -> - [ - {tests, [], [ - message_size, - over_max_message_size - ]} - ]. - -%% ------------------------------------------------------------------- -%% Testsuite setup/teardown. -%% ------------------------------------------------------------------- - -init_per_suite(Config) -> - rabbit_ct_helpers:log_environment(), - rabbit_ct_helpers:run_setup_steps(Config). - -end_per_suite(Config) -> - rabbit_ct_helpers:run_teardown_steps(Config). - -init_per_group(Group, Config) -> - Config1 = rabbit_ct_helpers:set_config(Config, [ - {rmq_nodename_suffix, Group}, - {rmq_nodes_count, 1} - ]), - rabbit_ct_helpers:run_steps(Config1, - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()). - -end_per_group(_Group, Config) -> - rabbit_ct_helpers:run_steps(Config, - rabbit_ct_client_helpers:teardown_steps() ++ - rabbit_ct_broker_helpers:teardown_steps()). - -init_per_testcase(Testcase, Config) -> - rabbit_ct_helpers:testcase_started(Config, Testcase). - -end_per_testcase(Testcase, Config) -> - rabbit_ct_helpers:testcase_finished(Config, Testcase). - -%% ------------------------------------------------------------------- -%% Test cases -%% ------------------------------------------------------------------- - -message_size(Config) -> - Binary2B = <<"12">>, - Binary2M = binary:copy(<<"x">>, 2_000_000), - - {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - - Before = get_msg_size_metrics(Config), - - [amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Payload}) - || Payload <- [Binary2B, Binary2B, Binary2M]], - - After = get_msg_size_metrics(Config), - - ?assertEqual(#{64 => 2, - 4 * 1024 * 1024 => 1, - sum => 2_000_004}, - rabbit_msg_size_metrics:changed_buckets(After, Before)). - -over_max_message_size(Config) -> - Binary4M = binary:copy(<<"x">>, 4_000_000), - - ok = rabbit_ct_broker_helpers:rpc(Config, persistent_term, put, [max_message_size, 3_000_000]), - - {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - - Before = get_msg_size_metrics(Config), - - amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary4M}), - - After = get_msg_size_metrics(Config), - - %% No metrics are bumped if over max message size - ?assertEqual(Before, After). - -%% ------------------------------------------------------------------- -%% Implementation -%% ------------------------------------------------------------------- - -get_msg_size_metrics(Config) -> - rabbit_ct_broker_helpers:rpc(Config, rabbit_msg_size_metrics, overview, [[{protocol, amqp091}]]). diff --git a/deps/rabbit/test/msg_size_metrics_SUITE.erl b/deps/rabbit/test/msg_size_metrics_SUITE.erl new file mode 100644 index 000000000000..0b33ecf1a36b --- /dev/null +++ b/deps/rabbit/test/msg_size_metrics_SUITE.erl @@ -0,0 +1,154 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(msg_size_metrics_SUITE). + +-compile([export_all, nowarn_export_all]). +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-import(rabbit_ct_broker_helpers, + [rpc/4]). + +all() -> + [ + {group, tests} + ]. + +groups() -> + [ + {tests, [shuffle], + [message_size, + over_max_message_size]} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + {ok, _} = application:ensure_all_started(amqp10_client), + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(_Group, Config) -> + rabbit_ct_helpers:run_steps( + Config, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_group(_Group, Config) -> + rabbit_ct_helpers:run_steps( + Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +%% ------------------------------------------------------------------- +%% Test cases +%% ------------------------------------------------------------------- + +message_size(Config) -> + AmqplBefore = get_msg_size_metrics(amqp091, Config), + AmqpBefore = get_msg_size_metrics(amqp10, Config), + + Binary2B = <<"12">>, + Binary200K = binary:copy(<<"x">>, 200_000), + Payloads = [Binary2B, Binary200K, Binary2B], + + {AmqplConn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), + [amqp_channel:call(Ch, + #'basic.publish'{routing_key = <<"nowhere">>}, + #amqp_msg{payload = Payload}) + || Payload <- Payloads], + + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + Address = rabbitmq_amqp_address:exchange(<<"amq.fanout">>), + {ok, Sender} = amqp10_client:attach_sender_link_sync(Session, <<"sender">>, Address), + receive {amqp10_event, {link, Sender, credited}} -> ok + after 5000 -> ct:fail(credited_timeout) + end, + + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag1">>, Binary2B)), + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag2">>, Binary200K)), + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag3">>, Binary2B)), + + ok = wait_for_settlement(released, <<"tag1">>), + ok = wait_for_settlement(released, <<"tag2">>), + ok = wait_for_settlement(released, <<"tag3">>), + + AmqplAfter = get_msg_size_metrics(amqp091, Config), + AmqpAfter = get_msg_size_metrics(amqp10, Config), + + ExpectedDiff = [{100, 2}, + {1_000_000, 1}], + ?assertEqual(ExpectedDiff, + rabbit_msg_size_metrics:diff_raw_buckets(AmqplAfter, AmqplBefore)), + ?assertEqual(ExpectedDiff, + rabbit_msg_size_metrics:diff_raw_buckets(AmqpAfter, AmqpBefore)), + + ok = amqp10_client:close_connection(Connection), + ok = rabbit_ct_client_helpers:close_connection_and_channel(AmqplConn, Ch). + +over_max_message_size(Config) -> + DefaultMaxMessageSize = rpc(Config, persistent_term, get, [max_message_size]), + %% Limit the server to only accept messages up to 2KB. + MaxMessageSize = 2_000, + ok = rpc(Config, persistent_term, put, [max_message_size, MaxMessageSize]), + + Before = get_msg_size_metrics(amqp091, Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + MonitorRef = erlang:monitor(process, Ch), + MessageTooLarge = binary:copy(<<"x">>, MaxMessageSize + 1), + amqp_channel:call(Ch, + #'basic.publish'{routing_key = <<"none">>}, + #amqp_msg{payload = MessageTooLarge}), + receive {'DOWN', MonitorRef, process, Ch, Info} -> + ?assertEqual({shutdown, + {server_initiated_close, + 406, + <<"PRECONDITION_FAILED - message size 2001 is larger than configured max size 2000">>}}, + Info) + after 2000 -> ct:fail(expected_channel_closed) + end, + + After = get_msg_size_metrics(amqp091, Config), + %% No metrics should be increased if client sent message that is too large. + ?assertEqual(Before, After), + + ok = rabbit_ct_client_helpers:close_connection(Conn), + ok = rpc(Config, persistent_term, put, [max_message_size, DefaultMaxMessageSize]). + +get_msg_size_metrics(Protocol, Config) -> + rpc(Config, rabbit_msg_size_metrics, raw_buckets, [Protocol]). + +connection_config(Config) -> + Host = ?config(rmq_hostname, Config), + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + #{address => Host, + port => Port, + container_id => <<"my container">>, + sasl => anon}. + +wait_for_settlement(State, Tag) -> + receive + {amqp10_disposition, {State, Tag}} -> + ok + after 5000 -> + ct:fail({disposition_timeout, Tag}) + end. diff --git a/deps/rabbit/test/unit_msg_size_metrics_SUITE.erl b/deps/rabbit/test/unit_msg_size_metrics_SUITE.erl index b46436a4666d..cd496932cd92 100644 --- a/deps/rabbit/test/unit_msg_size_metrics_SUITE.erl +++ b/deps/rabbit/test/unit_msg_size_metrics_SUITE.erl @@ -9,19 +9,19 @@ -include_lib("stdlib/include/assert.hrl"). --compile(export_all). +-compile([nowarn_export_all, export_all]). all() -> [ - {group, tests} + {group, tests} ]. groups() -> [ - {tests, [], - [ - smoketest - ]} + {tests, [], + [ + prometheus_format + ]} ]. %% ------------------------------------------------------------------- @@ -29,96 +29,36 @@ groups() -> %% ------------------------------------------------------------------- init_per_suite(Config) -> - _ = rabbit_msg_size_metrics:init([{protocol, proto1}]), + ok = rabbit_msg_size_metrics:init(fake_protocol), Config. end_per_suite(Config) -> - _ = rabbit_msg_size_metrics:cleanup([{protocol, proto1}]), - Config. - -init_per_group(_, Config) -> - Config. - -end_per_group(_, Config) -> - Config. - -init_per_testcase(_Testcase, Config) -> - Config. - -end_per_testcase(_Testcase, Config) -> + ok = rabbit_msg_size_metrics:cleanup(fake_protocol), Config. %% ------------------------------------------------------------------- %% Testcases. %% ------------------------------------------------------------------- -smoketest(_Config) -> - - OverviewBefore = rabbit_msg_size_metrics:overview([{protocol, proto1}]), - - _ = rabbit_msg_size_metrics:update(proto1, 100), - _ = rabbit_msg_size_metrics:update(proto1, 60_000), - - OverviewAfter = rabbit_msg_size_metrics:overview([{protocol, proto1}]), - - ?assertEqual( - #{256 => 1, - 65536 => 1, - sum => 60_100}, - rabbit_msg_size_metrics:changed_buckets(OverviewAfter, OverviewBefore)), - - ExpectedHistValues = - #{64 => 0, - 256 => 1, - 1024 => 0, - 4096 => 0, - 16384 => 0, - 65536 => 1, - 262144 => 0, - 1048576 => 0, - 4194304 => 0, - 16777216 => 0, - 67108864 => 0, - 268435456 => 0, - infinity => 0, - sum => 60_100}, - - ?assertEqual(ExpectedHistValues, OverviewAfter), - ?assertEqual( - #{[{protocol, proto1}] => #{message_size_bytes => ExpectedHistValues}}, - rabbit_msg_size_metrics:overview()), +prometheus_format(_Config) -> + MsgSizes = [1, 100, 1_000_000_000, 99_000_000, 15_000, 15_000], + [ok = rabbit_msg_size_metrics:observe(fake_protocol, MsgSize) || MsgSize <- MsgSizes], ?assertEqual( #{message_size_bytes => - #{type => histogram, - values => - [{[{protocol,proto1}], - [{64,0}, - {256,1}, - {1024,1}, - {4096,1}, - {16384,1}, - {65536,2}, - {262144,2}, - {1048576,2}, - {4194304,2}, - {16777216,2}, - {67108864,2}, - {268435456,2}, - {infinity,2}], - 2, - 60_100}], - help => - "Size of messages received from publishers"}}, - rabbit_msg_size_metrics:prometheus_format()), - - %% Value larger than largest limit - _ = rabbit_msg_size_metrics:update(proto1, 1_000_000_000_000), - OverviewInf = rabbit_msg_size_metrics:overview([{protocol, proto1}]), - - ?assertEqual( - #{infinity => 1, - sum => 1_000_000_000_000}, - rabbit_msg_size_metrics:changed_buckets(OverviewInf, OverviewAfter)), - - ok. + #{type => histogram, + help => "Size of messages received from publishers", + values => [{ + [{protocol, fake_protocol}], + [{100, 2}, + {1_000, 2}, + {10_000, 2}, + {100_000, 4}, + {1_000_000, 4}, + {10_000_000, 4}, + {50_000_000, 4}, + {100_000_000, 5}, + {infinity, 6}], + length(MsgSizes), + lists:sum(MsgSizes)}]}}, + rabbit_msg_size_metrics:prometheus_format()). diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl index 4cf28db804d5..694b31687262 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl @@ -87,7 +87,8 @@ init_global_counters(ProtoVer) -> rabbit_global_counters:init([Proto]), rabbit_global_counters:init([Proto, {queue_type, rabbit_classic_queue}]), rabbit_global_counters:init([Proto, {queue_type, rabbit_quorum_queue}]), - rabbit_global_counters:init([Proto, {queue_type, ?QUEUE_TYPE_QOS_0}]). + rabbit_global_counters:init([Proto, {queue_type, ?QUEUE_TYPE_QOS_0}]), + rabbit_msg_size_metrics:init(ProtoVer). persist_static_configuration() -> rabbit_mqtt_util:init_sparkplug(), diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index 79b0950f2668..939d82b0d9e8 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -391,7 +391,7 @@ process_request(?PUBLISH, {ok, Topic, Props, State1} -> EffectiveQos = maybe_downgrade_qos(Qos), rabbit_global_counters:messages_received(ProtoVer, 1), - rabbit_global_counters:message_size(ProtoVer, iolist_size(Payload)), + rabbit_msg_size_metrics:observe(ProtoVer, iolist_size(Payload)), State = maybe_increment_publisher(State1), Msg = #mqtt_msg{retain = Retain, qos = EffectiveQos, diff --git a/deps/rabbitmq_mqtt/test/shared_SUITE.erl b/deps/rabbitmq_mqtt/test/shared_SUITE.erl index e265243d9c99..16afac557d82 100644 --- a/deps/rabbitmq_mqtt/test/shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/shared_SUITE.erl @@ -88,6 +88,7 @@ cluster_size_1_tests_v3() -> cluster_size_1_tests() -> [ global_counters %% must be the 1st test case + ,message_size_metrics ,block_only_publisher ,many_qos1_messages ,session_expiry @@ -691,6 +692,34 @@ global_counters(Config) -> messages_unroutable_returned_total => 1}, get_global_counters(Config, ProtoVer))). +message_size_metrics(Config) -> + Protocol = case ?config(mqtt_version, Config) of + v4 -> mqtt311; + v5 -> mqtt50 + end, + BucketsBefore = rpc(Config, rabbit_msg_size_metrics, raw_buckets, [Protocol]), + + Topic = ClientId = atom_to_binary(?FUNCTION_NAME), + C = connect(ClientId, Config), + {ok, _, [0]} = emqtt:subscribe(C, Topic, qos0), + Payload1B = <<255>>, + Payload500B = binary:copy(Payload1B, 500), + Payload5KB = binary:copy(Payload1B, 5_000), + Payload2MB = binary:copy(Payload1B, 2_000_000), + Payloads = [Payload2MB, Payload5KB, Payload500B, Payload1B, Payload500B], + [ok = emqtt:publish(C, Topic, P, qos0) || P <- Payloads], + ok = expect_publishes(C, Topic, Payloads), + + BucketsAfter = rpc(Config, rabbit_msg_size_metrics, raw_buckets, [Protocol]), + ?assertEqual( + [{100, 1}, + {1000, 2}, + {10_000, 1}, + {10_000_000, 1}], + rabbit_msg_size_metrics:diff_raw_buckets(BucketsAfter, BucketsBefore)), + + ok = emqtt:disconnect(C). + pubsub(Config) -> Topic0 = <<"t/0">>, Topic1 = <<"t/1">>, diff --git a/deps/rabbitmq_mqtt/test/v5_SUITE.erl b/deps/rabbitmq_mqtt/test/v5_SUITE.erl index 6495551d0ecb..475b9450af9a 100644 --- a/deps/rabbitmq_mqtt/test/v5_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/v5_SUITE.erl @@ -128,8 +128,7 @@ cluster_size_1_tests() -> topic_alias_disallowed_retained_message, extended_auth, headers_exchange, - consistent_hash_exchange, - global_counters + consistent_hash_exchange ]. cluster_size_3_tests() -> @@ -2102,27 +2101,6 @@ consistent_hash_exchange(Config) -> [#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = Q}) || Q <- Qs], ok = rabbit_ct_client_helpers:close_channels_and_connection(Config, 0). -global_counters(Config) -> - Topic = ClientId = atom_to_binary(?FUNCTION_NAME), - PacketSize1 = 10, - PacketSize2 = 200, - C = connect(ClientId, Config, []), - {ok, _, [1]} = emqtt:subscribe(C, Topic, qos1), - Payload1 = binary:copy(<<"x">>, PacketSize1), - Payload2 = binary:copy(<<"x">>, PacketSize2), - Before = msg_size_metrics(mqtt50, Config), - %% We expect the PUBLISH from client to server to succeed. - ?assertMatch({ok, _}, emqtt:publish(C, Topic, Payload1, [{qos, 1}])), - ?assertMatch({ok, _}, emqtt:publish(C, Topic, Payload2, [{qos, 1}])), - ?assertMatch({ok, _}, emqtt:publish(C, Topic, Payload2, [{qos, 1}])), - After = msg_size_metrics(mqtt50, Config), - ?assertEqual(#{64 => 1, - 256 => 2, - sum => 410}, - rabbit_msg_size_metrics:changed_buckets(After, Before)), - ok = emqtt:disconnect(C), - ok. - %% ------------------------------------------------------------------- %% Helpers %% ------------------------------------------------------------------- @@ -2159,9 +2137,6 @@ dead_letter_metric(Metric, Config, Strategy) -> Map = maps:get([{queue_type, rabbit_classic_queue}, {dead_letter_strategy, Strategy}], Counters), maps:get(Metric, Map). -msg_size_metrics(ProtoVer, Config) -> - rpc(Config, rabbit_msg_size_metrics, overview, [[{protocol, ProtoVer}]]). - assert_nothing_received() -> assert_nothing_received(500). diff --git a/deps/rabbitmq_prometheus/app.bzl b/deps/rabbitmq_prometheus/app.bzl index a77dcbb9bb09..3084d1ced302 100644 --- a/deps/rabbitmq_prometheus/app.bzl +++ b/deps/rabbitmq_prometheus/app.bzl @@ -14,6 +14,7 @@ def all_beam_files(name = "all_beam_files"): "src/collectors/prometheus_rabbitmq_core_metrics_collector.erl", "src/collectors/prometheus_rabbitmq_dynamic_collector.erl", "src/collectors/prometheus_rabbitmq_global_metrics_collector.erl", + "src/collectors/prometheus_rabbitmq_message_size_metrics_collector.erl", "src/rabbit_prometheus_app.erl", "src/rabbit_prometheus_dispatcher.erl", "src/rabbit_prometheus_handler.erl", @@ -44,6 +45,7 @@ def all_test_beam_files(name = "all_test_beam_files"): "src/collectors/prometheus_rabbitmq_core_metrics_collector.erl", "src/collectors/prometheus_rabbitmq_dynamic_collector.erl", "src/collectors/prometheus_rabbitmq_global_metrics_collector.erl", + "src/collectors/prometheus_rabbitmq_message_size_metrics_collector.erl", "src/rabbit_prometheus_app.erl", "src/rabbit_prometheus_dispatcher.erl", "src/rabbit_prometheus_handler.erl", @@ -85,6 +87,7 @@ def all_srcs(name = "all_srcs"): "src/collectors/prometheus_rabbitmq_core_metrics_collector.erl", "src/collectors/prometheus_rabbitmq_dynamic_collector.erl", "src/collectors/prometheus_rabbitmq_global_metrics_collector.erl", + "src/collectors/prometheus_rabbitmq_message_size_metrics_collector.erl", "src/rabbit_prometheus_app.erl", "src/rabbit_prometheus_dispatcher.erl", "src/rabbit_prometheus_handler.erl", diff --git a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_global_metrics_collector.erl b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_global_metrics_collector.erl index 143325098f06..ffb21aa276e0 100644 --- a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_global_metrics_collector.erl +++ b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_global_metrics_collector.erl @@ -29,21 +29,19 @@ register() -> ok = prometheus_registry:register_collector(?MODULE). -deregister_cleanup(_) -> ok. +deregister_cleanup(_) -> + ok. collect_mf(_Registry, Callback) -> - _ = maps:fold( - fun (Name, #{type := Type, help := Help, values := Values}, Acc) -> - Callback( - create_mf(?METRIC_NAME(Name), - Help, - Type, - ensure_list(Values))), - Acc - end, - ok, - rabbit_global_counters:prometheus_format() - ). + maps:foreach( + fun(Name, #{type := Type, help := Help, values := Values}) -> + Callback( + create_mf(?METRIC_NAME(Name), + Help, + Type, + ensure_list(Values))) + end, + rabbit_global_counters:prometheus_format()). %% =================================================================== %% Private functions diff --git a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_message_size_metrics_collector.erl b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_message_size_metrics_collector.erl new file mode 100644 index 000000000000..54a349547744 --- /dev/null +++ b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_message_size_metrics_collector.erl @@ -0,0 +1,33 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% +-module(prometheus_rabbitmq_message_size_metrics_collector). + +-behaviour(prometheus_collector). +-include_lib("prometheus/include/prometheus.hrl"). + +-export([register/0, + deregister_cleanup/1, + collect_mf/2]). + +-define(METRIC_NAME_PREFIX, "rabbitmq_"). + +register() -> + ok = prometheus_registry:register_collector(?MODULE). + +deregister_cleanup(_) -> + ok. + +collect_mf(_Registry, Callback) -> + maps:foreach( + fun(Name, #{type := Type, + help := Help, + values := Values}) -> + MetricsFamily = prometheus_model_helpers:create_mf( + ?METRIC_NAME(Name), Help, Type, Values), + Callback(MetricsFamily) + end, + rabbit_msg_size_metrics:prometheus_format()). diff --git a/deps/rabbitmq_prometheus/src/rabbit_prometheus_dispatcher.erl b/deps/rabbitmq_prometheus/src/rabbit_prometheus_dispatcher.erl index 850494e00666..2b07be760098 100644 --- a/deps/rabbitmq_prometheus/src/rabbit_prometheus_dispatcher.erl +++ b/deps/rabbitmq_prometheus/src/rabbit_prometheus_dispatcher.erl @@ -16,6 +16,7 @@ build_dispatcher() -> prometheus_registry:register_collectors([ prometheus_rabbitmq_core_metrics_collector, prometheus_rabbitmq_global_metrics_collector, + prometheus_rabbitmq_message_size_metrics_collector, prometheus_rabbitmq_alarm_metrics_collector, prometheus_rabbitmq_dynamic_collector, prometheus_process_collector]), @@ -27,7 +28,8 @@ build_dispatcher() -> prometheus_vm_statistics_collector, prometheus_vm_msacc_collector, prometheus_rabbitmq_core_metrics_collector, - prometheus_rabbitmq_global_metrics_collector + prometheus_rabbitmq_global_metrics_collector, + prometheus_rabbitmq_message_size_metrics_collector ]), prometheus_registry:register_collectors('detailed', [ prometheus_rabbitmq_core_metrics_collector diff --git a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl index 578a9691be82..a0c64ebc6c5d 100644 --- a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl +++ b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl @@ -38,13 +38,15 @@ groups() -> aggregated_metrics_test, specific_erlang_metrics_present_test, global_metrics_present_test, - global_metrics_single_metric_family_test + global_metrics_single_metric_family_test, + message_size_metrics_present ]}, {per_object_metrics, [], [ globally_configure_per_object_metrics_test, specific_erlang_metrics_present_test, global_metrics_present_test, - global_metrics_single_metric_family_test + global_metrics_single_metric_family_test, + message_size_metrics_present ]}, {per_object_endpoint_metrics, [], [ endpoint_per_object_metrics, @@ -488,24 +490,36 @@ global_metrics_present_test(Config) -> ?assertEqual(match, re:run(Body, "^rabbitmq_global_messages_redelivered_total{", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "^rabbitmq_global_messages_acknowledged_total{", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "^rabbitmq_global_publishers{", [{capture, none}, multiline])), - ?assertEqual(match, re:run(Body, "^rabbitmq_global_consumers{", [{capture, none}, multiline])), - - ?assertEqual(match, re:run(Body, "^rabbitmq_global_message_size_bytes_bucket{protocol=\"amqp091\",le=\"64\"}", [{capture, none}, multiline])), - ?assertEqual(match, re:run(Body, "^rabbitmq_global_message_size_bytes_bucket{protocol=\"amqp091\",le=\"256\"}", [{capture, none}, multiline])), - ?assertEqual(match, re:run(Body, "^rabbitmq_global_message_size_bytes_bucket{protocol=\"amqp091\",le=\"1024\"}", [{capture, none}, multiline])), - ?assertEqual(match, re:run(Body, "^rabbitmq_global_message_size_bytes_bucket{protocol=\"amqp091\",le=\"4096\"}", [{capture, none}, multiline])), - ?assertEqual(match, re:run(Body, "^rabbitmq_global_message_size_bytes_bucket{protocol=\"amqp091\",le=\"16384\"}", [{capture, none}, multiline])), - ?assertEqual(match, re:run(Body, "^rabbitmq_global_message_size_bytes_bucket{protocol=\"amqp091\",le=\"65536\"}", [{capture, none}, multiline])), - ?assertEqual(match, re:run(Body, "^rabbitmq_global_message_size_bytes_bucket{protocol=\"amqp091\",le=\"262144\"}", [{capture, none}, multiline])), - ?assertEqual(match, re:run(Body, "^rabbitmq_global_message_size_bytes_bucket{protocol=\"amqp091\",le=\"1048576\"}", [{capture, none}, multiline])), - ?assertEqual(match, re:run(Body, "^rabbitmq_global_message_size_bytes_bucket{protocol=\"amqp091\",le=\"4194304\"}", [{capture, none}, multiline])), - ?assertEqual(match, re:run(Body, "^rabbitmq_global_message_size_bytes_bucket{protocol=\"amqp091\",le=\"16777216\"}", [{capture, none}, multiline])), - ?assertEqual(match, re:run(Body, "^rabbitmq_global_message_size_bytes_bucket{protocol=\"amqp091\",le=\"67108864\"}", [{capture, none}, multiline])), - ?assertEqual(match, re:run(Body, "^rabbitmq_global_message_size_bytes_bucket{protocol=\"amqp091\",le=\"268435456\"}", [{capture, none}, multiline])), - ?assertEqual(match, re:run(Body, "^rabbitmq_global_message_size_bytes_bucket{protocol=\"amqp091\",le=\"\\+Inf\"}", [{capture, none}, multiline])), - ?assertEqual(match, re:run(Body, "^rabbitmq_global_message_size_bytes_count{protocol=\"amqp091\"}", [{capture, none}, multiline])), - ?assertEqual(match, re:run(Body, "^rabbitmq_global_message_size_bytes_sum{protocol=\"amqp091\"}", [{capture, none}, multiline])), - ok. + ?assertEqual(match, re:run(Body, "^rabbitmq_global_consumers{", [{capture, none}, multiline])). + +message_size_metrics_present(Config) -> + {_Headers, Body} = http_get_with_pal(Config, [], 200), + + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp091\",le=\"100\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp091\",le=\"1000\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp091\",le=\"10000\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp091\",le=\"100000\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp091\",le=\"1000000\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp091\",le=\"10000000\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp091\",le=\"10000000\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp091\",le=\"50000000\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp091\",le=\"100000000\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp091\",le=\"\\+Inf\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_count{protocol=\"amqp091\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_sum{protocol=\"amqp091\"}", [{capture, none}, multiline])), + + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp10\",le=\"100\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp10\",le=\"1000\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp10\",le=\"10000\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp10\",le=\"100000\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp10\",le=\"1000000\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp10\",le=\"10000000\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp10\",le=\"10000000\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp10\",le=\"50000000\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp10\",le=\"100000000\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp10\",le=\"\\+Inf\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_count{protocol=\"amqp10\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_sum{protocol=\"amqp10\"}", [{capture, none}, multiline])). global_metrics_single_metric_family_test(Config) -> {_Headers, Body} = http_get_with_pal(Config, [], 200), diff --git a/moduleindex.yaml b/moduleindex.yaml index 02f800fcd252..ebadcd41d644 100755 --- a/moduleindex.yaml +++ b/moduleindex.yaml @@ -670,6 +670,7 @@ rabbit: - rabbit_metrics - rabbit_mirror_queue_misc - rabbit_mnesia +- rabbit_msg_size_metrics - rabbit_msg_store - rabbit_msg_store_gc - rabbit_networking @@ -1097,6 +1098,7 @@ rabbitmq_prometheus: - prometheus_rabbitmq_core_metrics_collector - prometheus_rabbitmq_dynamic_collector - prometheus_rabbitmq_global_metrics_collector +- prometheus_rabbitmq_message_size_metrics_collector - rabbit_prometheus_app - rabbit_prometheus_dispatcher - rabbit_prometheus_handler