Skip to content

Global counters #3045

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion deps/rabbit/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ APPS_DIR := $(CURDIR)/apps
LOCAL_DEPS = sasl rabbitmq_prelaunch os_mon inets compiler public_key crypto ssl syntax_tools xmerl

BUILD_DEPS = rabbitmq_cli
DEPS = ranch rabbit_common ra sysmon_handler stdout_formatter recon observer_cli osiris amqp10_common syslog systemd
DEPS = ranch rabbit_common ra sysmon_handler stdout_formatter recon observer_cli osiris amqp10_common syslog systemd seshat
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client meck proper

PLT_APPS += mnesia
Expand All @@ -150,6 +150,7 @@ dep_syslog = git https://github.com/schlagert/syslog 4.0.0
dep_osiris = git https://github.com/rabbitmq/osiris master
# TODO: Use systemd from Hex.pm, once there is a new post-0.6.0 release.
dep_systemd = git https://github.com/hauleth/erlang-systemd e732727b0b637eb29e8adc77a4eb46d7ebc0f41a
dep_seshat = git https://github.com/rabbitmq/seshat main

define usage_xml_to_erl
$(subst __,_,$(patsubst $(DOCS_DIR)/rabbitmq%.1.xml, src/rabbit_%_usage.erl, $(subst -,_,$(1))))
Expand Down
7 changes: 7 additions & 0 deletions deps/rabbit/src/rabbit.erl
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,13 @@
{requires, pre_boot},
{enables, external_infrastructure}]}).

-rabbit_boot_step({rabbit_messages_counters,
[{description, "messages metrics storage"},
{mfa, {rabbit_messages_counters, init,
[]}},
{requires, pre_boot},
{enables, external_infrastructure}]}).

%% -rabbit_boot_step({rabbit_stream_coordinator,
%% [{description, "stream queues coordinator"},
%% {mfa, {rabbit_stream_coordinator, start,
Expand Down
7 changes: 0 additions & 7 deletions deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2105,13 +2105,6 @@ notify_limiter(Limiter, Acked) ->
end
end.

deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName},
confirm = false,
mandatory = false},
_RoutedToQs = []}, State) -> %% optimisation
?INCR_STATS(exchange_stats, XName, 1, publish, State),
?INCR_STATS(exchange_stats, XName, 1, drop_unroutable, State),
State;
deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
exchange_name = XName},
mandatory = Mandatory,
Expand Down
207 changes: 207 additions & 0 deletions deps/rabbit/src/rabbit_messages_counters.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
%%

-module(rabbit_messages_counters).
-on_load(init/0).

-export([
init/0,
new/3,
messages_published/3,
messages_routed/3,
messages_delivered_consume_ack/3,
messages_delivered_consume_autoack/3,
messages_delivered_get_ack/3,
messages_delivered_get_autoack/3,
messages_redelivered/3,
basic_get_empty/3,
messages_unroutable_dropped/3,
messages_unroutable_returned/3,
messages_confirmed/3
]).


-define(MESSAGES_PUBLISHED, 1).
-define(MESSAGES_ROUTED, 2).
-define(MESSAGES_DELIVERED_CONSUME_ACK, 3).
-define(MESSAGES_DELIVERED_CONSUME_AUTOACK, 4).
-define(MESSAGES_DELIVERED_GET_ACK, 5).
-define(MESSAGES_DELIVERED_GET_AUTOACK, 6).
-define(MESSAGES_REDELIVERED, 7).
-define(BASIC_GET_EMPTY, 8).
-define(MESSAGES_UNROUTABLE_DROPPED, 9).
-define(MESSAGES_UNROUTABLE_RETURNED, 10).
-define(MESSAGES_CONFIRMED, 11).

-define(COUNTERS,
[
{
messages_published_total, ?MESSAGES_PUBLISHED, counter,
"Total number of messages published to queues and streams"
},
{
messages_routed_total, ?MESSAGES_ROUTED, counter,
"Total number of messages routed to queues"
},
{
messages_delivered_consume_ack_total, ?MESSAGES_DELIVERED_CONSUME_ACK, counter,
"Total number of messages consumed using basic.consume with manual acknowledgment"
},
{
messages_delivered_consume_autoack_total, ?MESSAGES_DELIVERED_CONSUME_AUTOACK, counter,
"Total number of messages consumed using basic.consume with automatic acknowledgment"
},
{
messages_delivered_get_ack_total, ?MESSAGES_DELIVERED_GET_ACK, counter,
"Total number of messages consumed using basic.get with manual acknowledgment"
},
{
messages_delivered_get_autoack_total, ?MESSAGES_DELIVERED_GET_AUTOACK, counter,
"Total number of messages consumed using basic.get with automatic acknowledgment"
},
{
messages_redelivered_total, ?MESSAGES_REDELIVERED, counter,
"Total number of messages redelivered to consumers"
},
{
basic_get_empty_total, ?BASIC_GET_EMPTY, counter,
"Total number of times basic.get operations fetched no message"
},
{
messages_unroutable_dropped_total, ?MESSAGES_UNROUTABLE_DROPPED, counter,
"Total number of messages published as non-mandatory into an exchange and dropped as unroutable"
},
{
messages_unroutable_returned_total, ?MESSAGES_UNROUTABLE_RETURNED, counter,
"Total number of messages published as mandatory into an exchange and returned to the publisher as unroutable"
},
{
messages_confirmed_total, ?MESSAGES_CONFIRMED, counter,
"Total number of messages confirmed to publishers"
}
]).

init() ->
seshat_counters:new_group(queue),
seshat_counters:new_group(global),
persistent_term:put({?MODULE, global}, new(global, global, [])),
ok.

new(Group, Object, Fields) ->
%% Some object could have extra metrics, i.e. queue types might have their own counters
seshat_counters:new(Group, Object, ?COUNTERS ++ Fields).

% TODO - these are received by queues, not from clients (doesn't account for unroutable)
messages_published(Group, Object, Num) ->
counters:add(fetch(Group, Object), ?MESSAGES_PUBLISHED, Num).

% formerly known as queue_messages_published_total
messages_routed(Group, Object, Num) ->
counters:add(fetch(Group, Object), ?MESSAGES_ROUTED, Num).

messages_delivered_consume_ack(Group, Object, Num) ->
counters:add(fetch(Group, Object), ?MESSAGES_DELIVERED_CONSUME_ACK, Num).

messages_delivered_consume_autoack(Group, Object, Num) ->
counters:add(fetch(Group, Object), ?MESSAGES_DELIVERED_CONSUME_AUTOACK, Num).

messages_delivered_get_ack(Group, Object, Num) ->
counters:add(fetch(Group, Object), ?MESSAGES_DELIVERED_GET_ACK, Num).

messages_delivered_get_autoack(Group, Object, Num) ->
counters:add(fetch(Group, Object), ?MESSAGES_DELIVERED_GET_AUTOACK, Num).

% not implemented yet
messages_redelivered(Group, Object, Num) ->
counters:add(fetch(Group, Object), ?MESSAGES_REDELIVERED, Num).

basic_get_empty(Group, Object, Num) ->
counters:add(fetch(Group, Object), ?BASIC_GET_EMPTY, Num).

% implemented in rabbit_core_metrics (it doesn't reach a queue)
messages_unroutable_returned(Group, Object, Num) ->
counters:add(fetch(Group, Object), ?MESSAGES_UNROUTABLE_RETURNED, Num).

% implemented in rabbit_core_metrics (it doesn't reach a queue)
messages_unroutable_dropped(Group, Object, Num) ->
counters:add(fetch(Group, Object), ?MESSAGES_UNROUTABLE_DROPPED, Num).

messages_confirmed(Group, Object, Num) ->
counters:add(fetch(Group, Object), ?MESSAGES_CONFIRMED, Num).

fetch(global, global) ->
persistent_term:get({?MODULE, global});
fetch(Group, Object) ->
seshat_counters:fetch(Group, Object).

% TODO
% channel_messages_redelivered_total "Total number of messages redelivered to consumers"
%
% connection_incoming_bytes_total "Total number of bytes received on a connection"
% connection_outgoing_bytes_total "Total number of bytes sent on a connection"
% connection_process_reductions_total "Total number of connection process reductions"
% connection_incoming_packets_total "Total number of packets received on a connection"
% connection_outgoing_packets_total "Total number of packets sent on a connection"
%
% io_read_ops_total "Total number of I/O read operations"
% io_read_bytes_total "Total number of I/O bytes read"
% io_write_ops_total "Total number of I/O write operations"
% io_write_bytes_total "Total number of I/O bytes written"
% io_sync_ops_total "Total number of I/O sync operations"
% io_seek_ops_total "Total number of I/O seek operations"
% io_open_attempt_ops_total "Total number of file open attempts"
% io_reopen_ops_total "Total number of times files have been reopened"
%
% schema_db_ram_tx_total "Total number of Schema DB memory transactions"
% schema_db_disk_tx_total "Total number of Schema DB disk transactions"
% msg_store_read_total "Total number of Message Store read operations"
% msg_store_write_total "Total number of Message Store write operations"
% queue_index_read_ops_total "Total number of Queue Index read operations"
% queue_index_write_ops_total "Total number of Queue Index write operations"
% queue_index_journal_write_ops_total "Total number of Queue Index Journal write operations"
% io_read_time_seconds_total "Total I/O read time"
% io_write_time_seconds_total "Total I/O write time"
% io_sync_time_seconds_total "Total I/O sync time"
% io_seek_time_seconds_total "Total I/O seek time"
% io_open_attempt_time_seconds_total "Total file open attempts time"
% raft_term_total "Current Raft term number"
% queue_disk_reads_total "Total number of times queue read messages from disk"
% queue_disk_writes_total "Total number of times queue wrote messages to disk"

% DONE
% channel_messages_published_total "Total number of messages published into an exchange on a channel"
% channel_messages_confirmed_total "Total number of messages published into an exchange and confirmed on the channel"
% channel_messages_unroutable_returned_total "Total number of messages published as mandatory into an exchange and returned to the publisher as unroutable"
% channel_messages_unroutable_dropped_total "Total number of messages published as non-mandatory into an exchange and dropped as unroutable"
% channel_get_empty_total "Total number of times basic.get operations fetched no message"
% channel_get_ack_total "Total number of messages fetched with basic.get in manual acknowledgement mode"
% channel_get_total "Total number of messages fetched with basic.get in automatic acknowledgement mode"
% channel_messages_delivered_ack_total "Total number of messages delivered to consumers in manual acknowledgement mode"
% channel_messages_delivered_total "Total number of messages delivered to consumers in automatic acknowledgement mode"
% queue_messages_published_total "Total number of messages published to queues"

% IGNORED (IS THIS USEFUL?)
% channel_process_reductions_total "Total number of channel process reductions"
% queue_process_reductions_total "Total number of queue process reductions"

% NOT NECESSARY (DON'T GO TO ZERO)
% erlang_gc_runs_total "Total number of Erlang garbage collector runs"
% erlang_gc_reclaimed_bytes_total "Total number of bytes of memory reclaimed by Erlang garbage collector"
% erlang_scheduler_context_switches_total "Total number of Erlang scheduler context switches"
% connections_opened_total "Total number of connections opened"
% connections_closed_total "Total number of connections closed or terminated"
% channels_opened_total "Total number of channels opened"
% channels_closed_total "Total number of channels closed"
% queues_declared_total "Total number of queues declared"
% queues_created_total "Total number of queues created"
% queues_deleted_total "Total number of queues deleted"
% auth_attempts_total "Total number of authorization attempts"
% auth_attempts_succeeded_total "Total number of successful authentication attempts"
% auth_attempts_failed_total "Total number of failed authentication attempts"
% auth_attempts_detailed_total "Total number of authorization attempts with source info"
% auth_attempts_detailed_succeeded_total "Total number of successful authorization attempts with source info"
% auth_attempts_detailed_failed_total "Total number of failed authorization attempts with source info"
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_osiris_metrics.erl
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ handle_cast(_Request, State) ->
{noreply, State}.

handle_info(tick, #state{timeout = Timeout} = State) ->
Data = osiris_counters:overview(),
Data = seshat_counters:overview(osiris),
maps:map(
fun ({osiris_writer, QName}, #{offset := Offs,
first_offset := FstOffs}) ->
Expand Down
23 changes: 19 additions & 4 deletions deps/rabbit/src/rabbit_queue_type.erl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
-module(rabbit_queue_type).
-include("amqqueue.hrl").
-include_lib("rabbit_common/include/resource.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").

-export([
init/0,
Expand Down Expand Up @@ -448,13 +448,22 @@ deliver(Qs, Delivery, State) ->
{error, Reason}
end.

deliver0([], #delivery{mandatory = false} = Delivery, State0) ->
rabbit_messages_counters:messages_unroutable_dropped(global, global, 1),
{ok, State0, []};
deliver0([], #delivery{mandatory = true} = Delivery, State0) ->
rabbit_messages_counters:messages_unroutable_returned(global, global, 1),
{ok, State0, []};
deliver0(Qs, Delivery, stateless) ->
rabbit_messages_counters:messages_routed(global, global, 1),
_ = lists:map(fun(Q) ->
Mod = amqqueue:get_type(Q),
_ = Mod:deliver([{Q, stateless}], Delivery)
end, Qs),
rabbit_messages_counters:messages_published(global, global, length(Qs)),
{ok, stateless, []};
deliver0(Qs, Delivery, #?STATE{} = State0) ->
rabbit_messages_counters:messages_routed(global, global, 1),
%% TODO: optimise single queue case?
%% sort by queue type - then dispatch each group
ByType = lists:foldl(
Expand All @@ -466,9 +475,10 @@ deliver0(Qs, Delivery, #?STATE{} = State0) ->
[{Q, Ctx#ctx.state} | A]
end, [{Q, Ctx#ctx.state}], Acc)
end, #{}, Qs),
%%% dispatch each group to queue type interface?
{Xs, Actions} = maps:fold(fun(Mod, QSs, {X0, A0}) ->
{X, A} = Mod:deliver(QSs, Delivery),
%% dispatch each group to queue type interface
{Xs, Actions} = maps:fold(fun(Mod, QTSs, {X0, A0}) ->
{X, A} = Mod:deliver(QTSs, Delivery),
rabbit_messages_counters:messages_published(global, global, length(Qs)),
{X0 ++ X, A0 ++ A}
end, {[], []}, ByType),
State = lists:foldl(
Expand Down Expand Up @@ -518,8 +528,13 @@ dequeue(Q, NoAck, LimiterPid, CTag, Ctxs) ->
Mod = amqqueue:get_type(Q),
case Mod:dequeue(NoAck, LimiterPid, CTag, State0) of
{ok, Num, Msg, State} ->
case NoAck of
false -> rabbit_messages_counters:messages_delivered_get_ack(global, global, 1);
true -> rabbit_messages_counters:messages_delivered_get_autoack(global, global, 1)
end,
{ok, Num, Msg, set_ctx(Q, Ctx#ctx{state = State}, Ctxs)};
{empty, State} ->
rabbit_messages_counters:basic_get_empty(global, global, 1),
{empty, set_ctx(Q, Ctx#ctx{state = State}, Ctxs)};
{error, _} = Err ->
Err;
Expand Down
7 changes: 4 additions & 3 deletions deps/rabbit/src/rabbit_stream_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ create_stream(Q0, Node) ->
{arguments, Arguments},
{user_who_performed_action,
ActingUser}]),
rabbit_messages_counters:new(queue, QName, []),
{new, Q};
Error ->

Expand Down Expand Up @@ -499,7 +500,7 @@ i(committed_offset, Q) ->
%% TODO should it be on a metrics table?
%% The queue could be removed between the list() and this call
%% to retrieve the overview. Let's default to '' if it's gone.
Data = osiris_counters:overview(),
Data = seshat_counters:overview(osiris),
maps:get(committed_offset,
maps:get({osiris_writer, amqqueue:get_name(Q)}, Data, #{}), '');
i(policy, Q) ->
Expand Down Expand Up @@ -568,7 +569,7 @@ get_counters(Q) ->
lists:filter(fun (X) -> X =/= undefined end, Counters).

safe_get_overview(Node) ->
case rpc:call(Node, osiris_counters, overview, []) of
case rpc:call(Node, seshat_counters, overview, [osiris]) of
{badrpc, _} ->
#{node => Node};
Data ->
Expand Down Expand Up @@ -613,7 +614,7 @@ tracking_status(Vhost, QueueName) ->

readers(QName) ->
try
Data = osiris_counters:overview(),
Data = seshat_counters:overview(osiris),
Readers = case maps:get({osiris_writer, QName}, Data, not_found) of
not_found ->
maps:get(readers, maps:get({osiris_replica, QName}, Data, #{}), 0);
Expand Down
Loading