Skip to content

WIP Expose ra counters #13895

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 19 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 17 additions & 17 deletions deps/rabbit/src/rabbit_global_counters.erl
Original file line number Diff line number Diff line change
Expand Up @@ -132,46 +132,46 @@
boot_step() ->
[begin
%% Protocol counters
Protocol = {protocol, Proto},
init([Protocol]),
Protocol = #{protocol => Proto},
init(Protocol),
rabbit_msg_size_metrics:init(Proto),

%% Protocol & Queue Type counters
init([Protocol, {queue_type, rabbit_classic_queue}]),
init([Protocol, {queue_type, rabbit_quorum_queue}]),
init([Protocol, {queue_type, rabbit_stream_queue}])
init(Protocol#{queue_type => rabbit_classic_queue}),
init(Protocol#{queue_type => rabbit_quorum_queue}),
init(Protocol#{queue_type => rabbit_stream_queue})
end || Proto <- [amqp091, amqp10]],

%% Dead Letter counters
%%
%% Streams never dead letter.
%%
%% Source classic queue dead letters.
init([{queue_type, rabbit_classic_queue}, {dead_letter_strategy, disabled}],
init(#{queue_type => rabbit_classic_queue, dead_letter_strategy => disabled},
[?MESSAGES_DEAD_LETTERED_MAXLEN_COUNTER,
?MESSAGES_DEAD_LETTERED_EXPIRED_COUNTER,
?MESSAGES_DEAD_LETTERED_REJECTED_COUNTER]),
init([{queue_type, rabbit_classic_queue}, {dead_letter_strategy, at_most_once}],
init(#{queue_type => rabbit_classic_queue, dead_letter_strategy => at_most_once},
[?MESSAGES_DEAD_LETTERED_MAXLEN_COUNTER,
?MESSAGES_DEAD_LETTERED_EXPIRED_COUNTER,
?MESSAGES_DEAD_LETTERED_REJECTED_COUNTER]),
%%
%% Source quorum queue dead letters.
%% Only quorum queues can dead letter due to delivery-limit exceeded.
%% Only quorum queues support dead letter strategy at-least-once.
init([{queue_type, rabbit_quorum_queue}, {dead_letter_strategy, disabled}],
init(#{queue_type => rabbit_quorum_queue, dead_letter_strategy => disabled},
[?MESSAGES_DEAD_LETTERED_MAXLEN_COUNTER,
?MESSAGES_DEAD_LETTERED_EXPIRED_COUNTER,
?MESSAGES_DEAD_LETTERED_REJECTED_COUNTER,
?MESSAGES_DEAD_LETTERED_DELIVERY_LIMIT_COUNTER
]),
init([{queue_type, rabbit_quorum_queue}, {dead_letter_strategy, at_most_once}],
init(#{queue_type => rabbit_quorum_queue, dead_letter_strategy => at_most_once},
[?MESSAGES_DEAD_LETTERED_MAXLEN_COUNTER,
?MESSAGES_DEAD_LETTERED_EXPIRED_COUNTER,
?MESSAGES_DEAD_LETTERED_REJECTED_COUNTER,
?MESSAGES_DEAD_LETTERED_DELIVERY_LIMIT_COUNTER
]),
init([{queue_type, rabbit_quorum_queue}, {dead_letter_strategy, at_least_once}],
init(#{queue_type => rabbit_quorum_queue, dead_letter_strategy => at_least_once},
[?MESSAGES_DEAD_LETTERED_CONFIRMED_COUNTER,
?MESSAGES_DEAD_LETTERED_EXPIRED_COUNTER,
?MESSAGES_DEAD_LETTERED_REJECTED_COUNTER,
Expand All @@ -181,21 +181,21 @@ boot_step() ->
%% init/1: registers a counter set for Labels with no extra counters.
init(Labels) ->
init(Labels, []).

%% init/2: ensures the seshat group for this module exists (the result of
%% seshat:new_group/1 is deliberately ignored), creates a counter set for
%% Labels, and stores the value returned by seshat:new under a
%% persistent_term key built from ?MODULE and the label values.
%%
%% Which clause matches decides both the counter list and the key:
%%   protocol + queue_type             -> ?PROTOCOL_QUEUE_TYPE_COUNTERS ++ Extra,
%%                                        key {?MODULE, Protocol, QueueType}
%%   protocol only                     -> ?PROTOCOL_COUNTERS ++ Extra,
%%                                        key {?MODULE, Protocol}
%%   queue_type + dead_letter_strategy -> only the counters passed in,
%%                                        key {?MODULE, QueueType, DLS}
%%
%% This listing shows each head and each seshat:new call twice: first in the
%% proplist form, then in the map form.
%%
%% Clause order matters for the map form: a map pattern matches any map that
%% contains the listed keys, so the protocol + queue_type head must stay ahead
%% of the protocol-only head, otherwise the latter would also capture
%% protocol + queue_type label sets.
init(Labels = [{protocol, Protocol}, {queue_type, QueueType}], Extra) ->
init(Labels = #{protocol := Protocol, queue_type := QueueType}, Extra) ->
_ = seshat:new_group(?MODULE),
Counters = seshat:new(?MODULE, Labels, ?PROTOCOL_QUEUE_TYPE_COUNTERS ++ Extra),
Counters = seshat:new(?MODULE, Labels, ?PROTOCOL_QUEUE_TYPE_COUNTERS ++ Extra, Labels),
persistent_term:put({?MODULE, Protocol, QueueType}, Counters);
init(Labels = [{protocol, Protocol}], Extra) ->
init(Labels = #{protocol := Protocol}, Extra) ->
_ = seshat:new_group(?MODULE),
Counters = seshat:new(?MODULE, Labels, ?PROTOCOL_COUNTERS ++ Extra),
Counters = seshat:new(?MODULE, Labels, ?PROTOCOL_COUNTERS ++ Extra, Labels),
persistent_term:put({?MODULE, Protocol}, Counters);
init(Labels = [{queue_type, QueueType}, {dead_letter_strategy, DLS}], DeadLetterCounters) ->
init(Labels = #{queue_type := QueueType, dead_letter_strategy := DLS}, DeadLetterCounters) ->
_ = seshat:new_group(?MODULE),
Counters = seshat:new(?MODULE, Labels, DeadLetterCounters),
Counters = seshat:new(?MODULE, Labels, DeadLetterCounters, Labels),
persistent_term:put({?MODULE, QueueType, DLS}, Counters).

%% Returns what seshat reports for this module's group. The test suites in
%% this listing read the result as a map keyed by label set (for example
%% #{protocol => amqp091}) whose values are maps from counter name to value.
%% NOTE(review): the call appears in two forms here (seshat:overview/1 and
%% seshat:counters/1); presumably both return that same shape - confirm.
overview() ->
seshat:overview(?MODULE).
seshat:counters(?MODULE).

%% Returns the output of seshat:format/1 for this module's group. Judging by
%% the function name this is the Prometheus-oriented representation of the
%% counters - TODO confirm against seshat.
prometheus_format() ->
seshat:format(?MODULE).
Expand Down
1 change: 1 addition & 0 deletions deps/rabbit/src/rabbit_khepri.erl
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ setup(_Context) ->
{default_ra_system, ?RA_SYSTEM}]}],
[{persistent, true}]),
RaServerConfig = #{cluster_name => ?RA_CLUSTER_NAME,
metrics_labels => #{ra_system => ?RA_SYSTEM, module => ?MODULE},
friendly_name => ?RA_FRIENDLY_NAME},
case khepri:start(?RA_SYSTEM, RaServerConfig) of
{ok, ?STORE_ID} ->
Expand Down
11 changes: 8 additions & 3 deletions deps/rabbit/src/rabbit_observer_cli_quorum_queues.erl
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,11 @@ sheet_header() ->
sheet_body(PrevState) ->
{_, RaStates} = rabbit_quorum_queue:all_replica_states(),
Body = [begin
#resource{name = Name, virtual_host = Vhost} = R = amqqueue:get_name(Q),
#resource{name = Name, virtual_host = Vhost} = amqqueue:get_name(Q),
case rabbit_amqqueue:pid_of(Q) of
none ->
empty_row(Name);
{QName, _QNode} = _QQ ->
{QName, _QNode} = ServerId ->
case whereis(QName) of
undefined ->
empty_row(Name);
Expand All @@ -139,7 +139,12 @@ sheet_body(PrevState) ->
_ ->
QQCounters = maps:get({QName, node()}, ra_counters:overview()),
{ok, InternalName} = rabbit_queue_type_util:qname_to_internal_name(#resource{virtual_host = Vhost, name= Name}),
[{_, CT, SnapIdx, LA, CI, LW, CL}] = ets:lookup(ra_metrics, R),
#{snapshot_index := SnapIdx,
last_written_index := LW,
term := CT,
commit_latency := CL,
commit_index := CI,
last_applied := LA} = ra:key_metrics(ServerId),
[
Pid,
QName,
Expand Down
12 changes: 6 additions & 6 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1598,12 +1598,10 @@ transfer_leadership(Q, Destination) ->
end.

%% Queue length computed as "last written index - snapshot index".
%%
%% This listing shows two ways of obtaining the two indexes: a lookup in the
%% ra_metrics ETS table keyed by queue name, and a map returned by
%% key_metrics_rpc/1 for the queue's server id.
%%
%% NOTE(review): the ETS form returns 0 when no row exists for the queue. The
%% key_metrics_rpc/1 form has no such fallback: the map match raises badmatch
%% if the returned map lacks snapshot_index or last_written_index.
%% key_metrics_rpc/1 is not visible here - confirm it always returns both
%% keys (e.g. when the Ra server is not running), or keep a 0 fallback.
queue_length(Q) ->
Name = amqqueue:get_name(Q),
case ets:lookup(ra_metrics, Name) of
[] -> 0;
[{_, _, SnapIdx, _, _, LastIdx, _}] ->
LastIdx - SnapIdx
end.
ServerId = amqqueue:get_pid(Q),
#{snapshot_index := SnapIdx,
last_written_index := LastIdx} = key_metrics_rpc(ServerId),
LastIdx - SnapIdx.

%% Thin alias: returns get_nodes(Q) unchanged.
get_replicas(Q) ->
get_nodes(Q).
Expand Down Expand Up @@ -1985,6 +1983,7 @@ make_ra_conf(Q, ServerId, TickTimeout,
SnapshotInterval, CheckpointInterval,
Membership, MacVersion) ->
QName = amqqueue:get_name(Q),
#resource{name = QNameBin} = QName,
RaMachine = ra_machine(Q),
[{ClusterName, _} | _] = Members = members(Q),
UId = ra:new_uid(ra_lib:to_binary(ClusterName)),
Expand All @@ -2000,6 +1999,7 @@ make_ra_conf(Q, ServerId, TickTimeout,
uid => UId,
friendly_name => FName,
metrics_key => QName,
metrics_labels => #{vhost => amqqueue:get_vhost(Q), queue => QNameBin},
initial_members => Members,
log_init_args => LogCfg,
tick_timeout => TickTimeout,
Expand Down
1 change: 1 addition & 0 deletions deps/rabbit/src/rabbit_stream_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1274,6 +1274,7 @@ make_ra_conf(Node, Nodes, MinMacVersion) ->
uid => UId,
friendly_name => atom_to_list(?MODULE),
metrics_key => ?MODULE,
metrics_labels => #{ra_system => ?RA_SYSTEM, module=>?MODULE},
initial_members => Members,
log_init_args => #{uid => UId},
tick_timeout => TickTimeout,
Expand Down
6 changes: 3 additions & 3 deletions deps/rabbit/test/amqp_client_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -6903,11 +6903,11 @@ formatted_state(Pid) ->
proplists:get_value("State", L2).

%% Global counters for the amqp10 protocol label set. The label set is handed
%% to get_global_counters0/2 as the lookup key; it is shown here in both its
%% proplist and its map form.
get_global_counters(Config) ->
get_global_counters0(Config, [{protocol, amqp10}]).
get_global_counters0(Config, #{protocol => amqp10}).

%% Same as get_global_counters/1, but for the amqp10 protocol combined with
%% the given queue type.
get_global_counters(Config, QType) ->
get_global_counters0(Config, [{protocol, amqp10},
{queue_type, QType}]).
get_global_counters0(Config, #{protocol => amqp10,
queue_type => QType}).

get_global_counters0(Config, Key) ->
Overview = rpc(Config, rabbit_global_counters, overview, []),
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/test/dead_lettering_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1928,7 +1928,7 @@ counted(Metric, Config) ->
metric(QueueType, Strategy, Metric, OldCounters).

%% Two-step lookup of one dead-letter counter in Counters: first the entry for
%% the queue type / dead-letter strategy label set, then Metric inside it.
%% maps:get/2 raises {badkey, Key} if either key is missing, so the label set
%% has to be the exact term used as key in Counters (shown here in both its
%% proplist and its map form).
metric(QueueType, Strategy, Metric, Counters) ->
Metrics = maps:get([{queue_type, QueueType}, {dead_letter_strategy, Strategy}], Counters),
Metrics = maps:get(#{queue_type => QueueType, dead_letter_strategy => Strategy}, Counters),
maps:get(Metric, Metrics).

group_name(Config) ->
Expand Down
6 changes: 3 additions & 3 deletions deps/rabbit/test/queue_type_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ smoke(Config) ->
ok = publish_and_confirm(Ch, <<"non-existent_queue">>, <<"msg4">>),
ConsumerTag3 = <<"ctag3">>,
ok = subscribe(Ch, QName, ConsumerTag3),
ProtocolCounters = maps:get([{protocol, amqp091}], get_global_counters(Config)),
ProtocolCounters = maps:get(#{protocol => amqp091}, get_global_counters(Config)),
?assertEqual(#{
messages_confirmed_total => 4,
messages_received_confirm_total => 4,
Expand All @@ -177,7 +177,7 @@ smoke(Config) ->
"rabbit_" ++
binary_to_list(?config(queue_type, Config)) ++
"_queue"),
ProtocolQueueTypeCounters = maps:get([{protocol, amqp091}, {queue_type, QueueType}],
ProtocolQueueTypeCounters = maps:get(#{protocol => amqp091, queue_type => QueueType},
get_global_counters(Config)),
?assertEqual(#{
messages_acknowledged_total => 3,
Expand All @@ -196,7 +196,7 @@ smoke(Config) ->
?assertMatch(
#{consumers := 0,
publishers := 0},
maps:get([{protocol, amqp091}], get_global_counters(Config))),
maps:get(#{protocol => amqp091}, get_global_counters(Config))),

ok.

Expand Down
3 changes: 3 additions & 0 deletions deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3002,6 +3002,9 @@ reconnect_consumer_and_wait_channel_down(Config) ->
{#'basic.deliver'{redelivered = false}, _} ->
wait_for_messages_ready(Servers, RaName, 0),
wait_for_messages_pending_ack(Servers, RaName, 1)
after 30000 ->
flush(1),
exit(basic_deliver_timeout)
end,
Up = [Leader, F2],
rabbit_ct_broker_helpers:block_traffic_between(F1, Leader),
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/test/rabbit_fifo_dlx_integration_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -991,5 +991,5 @@ counted(Metric, Config) ->
metric(Metric, OldCounters).

%% Reads Metric from the Counters entry for quorum queues with dead-letter
%% strategy at_least_once. Both lookups use maps:get/2 and therefore raise
%% {badkey, Key} when the entry or the metric is absent. The label-set key is
%% shown here in both its proplist and its map form.
metric(Metric, Counters) ->
Metrics = maps:get([{queue_type, rabbit_quorum_queue}, {dead_letter_strategy, at_least_once}], Counters),
Metrics = maps:get(#{queue_type => rabbit_quorum_queue, dead_letter_strategy => at_least_once}, Counters),
maps:get(Metric, Metrics).
34 changes: 17 additions & 17 deletions deps/rabbitmq_management_agent/src/rabbit_mgmt_external_stats.erl
Original file line number Diff line number Diff line change
Expand Up @@ -264,20 +264,8 @@ i(context_switches, State) ->
{Sw, 0} = erlang:statistics(context_switches),
{State, Sw};
i(ra_open_file_metrics, State) ->
{State, [{ra_log_wal, ra_metrics(ra_log_wal)},
{ra_log_segment_writer, ra_metrics(ra_log_segment_writer)}]}.

ra_metrics(K) ->
try
case ets:lookup(ra_open_file_metrics, whereis(K)) of
[] -> 0;
[{_, C}] -> C
end
catch
error:badarg ->
%% On startup the mgmt might start before ra does
0
end.
{State, [{ra_log_wal, 0},
{ra_log_segment_writer, 0}]}.

resource_alarm_set(Source) ->
lists:member({{resource_limit, Source, node()},[]},
Expand Down Expand Up @@ -418,7 +406,7 @@ update_state(State0) ->
%% Merges two {Key, Value} lists into one: zero_fhc_stats() and a list of Ra
%% I/O entries. Values of keys present in both lists are added together, and
%% the merged dict is returned as a list.
%%
%% With get_zero_ra_io_metrics/0 as the second source every added value is 0,
%% so that source can only contribute keys, never non-zero numbers.
get_fhc_stats() ->
dict:to_list(dict:merge(fun(_, V1, V2) -> V1 + V2 end,
dict:from_list(zero_fhc_stats()),
dict:from_list(get_ra_io_metrics()))).
dict:from_list(get_zero_ra_io_metrics()))).

zero_fhc_stats() ->
[{{Op, Counter}, 0} || Op <- [io_read, io_write],
Expand All @@ -432,5 +420,17 @@ zero_fhc_stats() ->
queue_index_write, queue_index_read],
Counter <- [count]].

get_ra_io_metrics() ->
lists:sort(ets:tab2list(ra_io_metrics)).
%% Placeholder for the Ra I/O metrics, which are not tracked anymore.
%% Returns one {{Operation, Counter}, 0} entry per operation/counter pair
%% listed below, in the order listed.
get_zero_ra_io_metrics() ->
    Operations = [{io_file_handle_open_attempt, [count, time]},
                  {io_read, [bytes, count, time]},
                  {io_seek, [count, time]},
                  {io_sync, [count, time]},
                  {io_write, [bytes, count, time]}],
    %% Expand every operation into its counters; each value is a constant 0.
    [{{Operation, Counter}, 0} || {Operation, Counters} <- Operations,
                                  Counter <- Counters].
12 changes: 6 additions & 6 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt.erl
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,15 @@ init_global_counters() ->
lists:foreach(fun init_global_counters/1, [?MQTT_PROTO_V3,
?MQTT_PROTO_V4,
?MQTT_PROTO_V5]),
rabbit_global_counters:init([{queue_type, ?QUEUE_TYPE_QOS_0}, {dead_letter_strategy, disabled}],
rabbit_global_counters:init(#{queue_type => ?QUEUE_TYPE_QOS_0, dead_letter_strategy => disabled},
[?MESSAGES_DEAD_LETTERED_MAXLEN_COUNTER]).

%% Registers the global counters for one MQTT protocol version: one counter
%% set for the protocol alone and one per queue type (classic, quorum and
%% ?QUEUE_TYPE_QOS_0), then initialises the message size metrics for that
%% protocol version.
%%
%% The label sets are shown here in both their proplist and their map form;
%% in the map form each queue-type label set is built by adding queue_type to
%% the protocol-only map.
init_global_counters(ProtoVer) ->
Proto = {protocol, ProtoVer},
rabbit_global_counters:init([Proto]),
rabbit_global_counters:init([Proto, {queue_type, rabbit_classic_queue}]),
rabbit_global_counters:init([Proto, {queue_type, rabbit_quorum_queue}]),
rabbit_global_counters:init([Proto, {queue_type, ?QUEUE_TYPE_QOS_0}]),
Proto = #{protocol => ProtoVer},
rabbit_global_counters:init(Proto),
rabbit_global_counters:init(Proto#{queue_type => rabbit_classic_queue}),
rabbit_global_counters:init(Proto#{queue_type => rabbit_quorum_queue}),
rabbit_global_counters:init(Proto#{queue_type => ?QUEUE_TYPE_QOS_0}),
rabbit_msg_size_metrics:init(ProtoVer).

persist_static_configuration() ->
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,7 @@ global_counters(Config) ->
messages_delivered_get_manual_ack_total => 0,
messages_get_empty_total => 0,
messages_redelivered_total => 0},
get_global_counters(Config, ProtoVer, 0, [{queue_type, rabbit_classic_queue}])),
get_global_counters(Config, ProtoVer, 0, #{queue_type => rabbit_classic_queue})),
?assertEqual(#{messages_delivered_total => 1,
messages_acknowledged_total => 0,
messages_delivered_consume_auto_ack_total => 1,
Expand All @@ -711,7 +711,7 @@ global_counters(Config) ->
messages_delivered_get_manual_ack_total => 0,
messages_get_empty_total => 0,
messages_redelivered_total => 0},
get_global_counters(Config, ProtoVer, 0, [{queue_type, rabbit_mqtt_qos0_queue}])),
get_global_counters(Config, ProtoVer, 0, #{queue_type => rabbit_mqtt_qos0_queue})),

{ok, _, _} = emqtt:unsubscribe(C, Topic1),
?assertEqual(1, maps:get(consumers, get_global_counters(Config, ProtoVer))),
Expand Down
8 changes: 4 additions & 4 deletions deps/rabbitmq_mqtt/test/reader_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -255,11 +255,11 @@ rabbit_mqtt_qos0_queue_overflow(Config) ->
QType = rabbit_mqtt_qos0_queue,

#{
[{protocol, ProtoVer}, {queue_type, QType}] :=
#{protocol => ProtoVer, queue_type => QType} :=
#{messages_delivered_total := 0,
messages_delivered_consume_auto_ack_total := 0},

[{queue_type, QType}, {dead_letter_strategy, disabled}] :=
#{queue_type => QType, dead_letter_strategy => disabled} :=
#{messages_dead_lettered_maxlen_total := NumDeadLettered}
} = rabbit_ct_broker_helpers:rpc(Config, rabbit_global_counters, overview, []),

Expand Down Expand Up @@ -314,11 +314,11 @@ rabbit_mqtt_qos0_queue_overflow(Config) ->
ExpectedNumDeadLettered = NumDeadLettered + NumDropped,
?assertMatch(
#{
[{protocol, ProtoVer}, {queue_type, QType}] :=
#{protocol => ProtoVer, queue_type => QType} :=
#{messages_delivered_total := NumReceived,
messages_delivered_consume_auto_ack_total := NumReceived},

[{queue_type, QType}, {dead_letter_strategy, disabled}] :=
#{queue_type => QType, dead_letter_strategy => disabled} :=
#{messages_dead_lettered_maxlen_total := ExpectedNumDeadLettered}
},
rabbit_ct_broker_helpers:rpc(Config, rabbit_global_counters, overview, [])),
Expand Down
20 changes: 10 additions & 10 deletions deps/rabbitmq_mqtt/test/util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,16 @@ get_global_counters(Config, ProtoVer) ->
get_global_counters(Config, ProtoVer, 0).

%% get_global_counters/3: protocol-only lookup on Node, i.e. the /4 variant
%% with an empty set of additional labels.
get_global_counters(Config, ProtoVer, Node) ->
get_global_counters(Config, ProtoVer, Node, []).

%% get_global_counters/4: the shorthands v3, v4 and v5 are first translated
%% to the corresponding ?MQTT_PROTO_* macro. The final clause fetches
%% rabbit_global_counters:overview() from Node via rpc and returns the entry
%% whose key is the additional labels plus the protocol label. maps:get/2
%% raises {badkey, Key} when no such entry exists.
%%
%% The clauses are shown here in both the proplist form (QType) and the map
%% form (Labels).
get_global_counters(Config, v3, Node, QType) ->
get_global_counters(Config, ?MQTT_PROTO_V3, Node, QType);
get_global_counters(Config, v4, Node, QType) ->
get_global_counters(Config, ?MQTT_PROTO_V4, Node, QType);
get_global_counters(Config, v5, Node, QType) ->
get_global_counters(Config, ?MQTT_PROTO_V5, Node, QType);
get_global_counters(Config, Proto, Node, QType) ->
maps:get([{protocol, Proto}] ++ QType,
get_global_counters(Config, ProtoVer, Node, #{}).

get_global_counters(Config, v3, Node, Labels) ->
get_global_counters(Config, ?MQTT_PROTO_V3, Node, Labels);
get_global_counters(Config, v4, Node, Labels) ->
get_global_counters(Config, ?MQTT_PROTO_V4, Node, Labels);
get_global_counters(Config, v5, Node, Labels) ->
get_global_counters(Config, ?MQTT_PROTO_V5, Node, Labels);
get_global_counters(Config, Proto, Node, Labels) ->
maps:get(Labels#{protocol => Proto},
rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_global_counters, overview, [])).

get_events(Node) ->
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbitmq_mqtt/test/v5_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2189,7 +2189,7 @@ dead_letter_metric(Metric, Config) ->

%% Fetches rabbit_global_counters:overview() via rpc and returns Metric from
%% the entry for classic queues with the given dead-letter strategy. Both
%% lookups use maps:get/2 and raise {badkey, Key} if the entry or the metric
%% is missing. The label-set key is shown here in both its proplist and its
%% map form.
dead_letter_metric(Metric, Config, Strategy) ->
Counters = rpc(Config, rabbit_global_counters, overview, []),
Map = maps:get([{queue_type, rabbit_classic_queue}, {dead_letter_strategy, Strategy}], Counters),
Map = maps:get(#{queue_type => rabbit_classic_queue, dead_letter_strategy => Strategy}, Counters),
maps:get(Metric, Map).

assert_nothing_received() ->
Expand Down
Loading
Loading