Skip to content

Commit 7cfd3df

Browse files
committed
Restart global counters with our fresh understanding
This has gone through a few cycles, with @mkuratczyk & @dcorbacho covering most of the ground. @dcorbacho had most of this in #3045, but the main branch went through a few changes in the meantime. Rather than resolving all the conflicts, and then making the necessary changes, we (@gerhard + @kjnilsson) took all learnings and started re-applying a lot of the existing code from #3045. We are confident in this approach and would like to see it through. Expose global metrics in rabbitmq_prometheus. We don't want to keep modifying the existing collector, but start with a new namespace, rabbitmq_global_, and continue building on top of it. The idea is to build in parallel, and slowly transition to the new metrics, because semantically the changes are too big since streams, and we have been discussing protocol-specific metrics with @kjnilsson, which makes me think that this approach is least disruptive and... simple. While at this, we removed redundant empty return value handling in the channel. The function called no longer returns this. Also removed all DONE / TODO & other comments - we'll handle them when the time comes, no need to leave TODO reminders. Pair @kjnilsson Signed-off-by: Gerhard Lazu <gerhard@lazu.co.uk>
1 parent 5e55948 commit 7cfd3df

11 files changed

+330
-50
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ RUNTIME_DEPS = [
154154
"@observer_cli//:bazel_erlang_lib",
155155
"@osiris//:bazel_erlang_lib",
156156
"@recon//:bazel_erlang_lib",
157+
"@seshat//:bazel_erlang_lib",
157158
"@sysmon_handler//:bazel_erlang_lib",
158159
"@systemd//:bazel_erlang_lib",
159160
]

deps/rabbit/Makefile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,14 +136,15 @@ APPS_DIR := $(CURDIR)/apps
136136
LOCAL_DEPS = sasl rabbitmq_prelaunch os_mon inets compiler public_key crypto ssl syntax_tools xmerl
137137

138138
BUILD_DEPS = rabbitmq_cli
139-
DEPS = ranch rabbit_common ra sysmon_handler stdout_formatter recon observer_cli osiris amqp10_common syslog systemd
139+
DEPS = ranch rabbit_common ra sysmon_handler stdout_formatter recon observer_cli osiris amqp10_common syslog systemd seshat
140140
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client meck proper
141141

142142
PLT_APPS += mnesia
143143

144144
dep_syslog = git https://github.com/schlagert/syslog 4.0.0
145145
dep_osiris = git https://github.com/rabbitmq/osiris master
146146
dep_systemd = hex 0.6.1
147+
dep_seshat = git https://github.com/rabbitmq/seshat main
147148

148149
define usage_xml_to_erl
149150
$(subst __,_,$(patsubst $(DOCS_DIR)/rabbitmq%.1.xml, src/rabbit_%_usage.erl, $(subst -,_,$(1))))

deps/rabbit/src/rabbit.erl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,13 @@
127127
{requires, pre_boot},
128128
{enables, external_infrastructure}]}).
129129

130+
-rabbit_boot_step({rabbit_global_counters,
131+
[{description, "global metrics storage"},
132+
{mfa, {rabbit_global_counters, init,
133+
[]}},
134+
{requires, pre_boot},
135+
{enables, external_infrastructure}]}).
136+
130137
-rabbit_boot_step({rabbit_event,
131138
[{description, "statistics event manager"},
132139
{mfa, {rabbit_sup, start_restartable_child,

deps/rabbit/src/rabbit_channel.erl

Lines changed: 43 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1285,6 +1285,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
12851285
confirm_enabled = ConfirmEnabled,
12861286
delivery_flow = Flow
12871287
}) ->
1288+
rabbit_global_counters:messages_received(all, 1),
12881289
check_msg_size(Content, MaxMessageSize, GCThreshold),
12891290
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
12901291
check_write_permitted(ExchangeName, User, AuthzContext),
@@ -1302,7 +1303,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
13021303
{MsgSeqNo, State1} =
13031304
case DoConfirm orelse Mandatory of
13041305
false -> {undefined, State};
1305-
true -> SeqNo = State#ch.publish_seqno,
1306+
true -> rabbit_global_counters:messages_received_confirm(all, 1),
1307+
SeqNo = State#ch.publish_seqno,
13061308
{SeqNo, State#ch{publish_seqno = SeqNo + 1}}
13071309
end,
13081310
case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
@@ -1314,9 +1316,11 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
13141316
Username, TraceState),
13151317
DQ = {Delivery#delivery{flow = Flow}, QNames},
13161318
{noreply, case Tx of
1317-
none -> deliver_to_queues(DQ, State1);
1318-
{Msgs, Acks} -> Msgs1 = ?QUEUE:in(DQ, Msgs),
1319-
State1#ch{tx = {Msgs1, Acks}}
1319+
none ->
1320+
deliver_to_queues(DQ, State1);
1321+
{Msgs, Acks} ->
1322+
Msgs1 = ?QUEUE:in(DQ, Msgs),
1323+
State1#ch{tx = {Msgs1, Acks}}
13201324
end};
13211325
{error, Reason} ->
13221326
precondition_failed("invalid message: ~p", [Reason])
@@ -1363,11 +1367,9 @@ handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck},
13631367
handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount, Msg,
13641368
State#ch{queue_states = QueueStates});
13651369
{empty, QueueStates} ->
1370+
rabbit_global_counters:messages_get_empty(all, 1),
13661371
?INCR_STATS(queue_stats, QueueName, 1, get_empty, State),
13671372
{reply, #'basic.get_empty'{}, State#ch{queue_states = QueueStates}};
1368-
empty ->
1369-
?INCR_STATS(queue_stats, QueueName, 1, get_empty, State),
1370-
{reply, #'basic.get_empty'{}, State};
13711373
{error, {unsupported, single_active_consumer}} ->
13721374
rabbit_misc:protocol_error(
13731375
resource_locked,
@@ -1692,9 +1694,9 @@ handle_method(#'tx.select'{}, _, State) ->
16921694
handle_method(#'tx.commit'{}, _, #ch{tx = none}) ->
16931695
precondition_failed("channel is not transactional");
16941696

1695-
handle_method(#'tx.commit'{}, _, State = #ch{tx = {Msgs, Acks},
1697+
handle_method(#'tx.commit'{}, _, State = #ch{tx = {Deliveries, Acks},
16961698
limiter = Limiter}) ->
1697-
State1 = queue_fold(fun deliver_to_queues/2, State, Msgs),
1699+
State1 = queue_fold(fun deliver_to_queues/2, State, Deliveries),
16981700
Rev = fun (X) -> lists:reverse(lists:sort(X)) end,
16991701
{State2, Actions2} =
17001702
lists:foldl(fun ({ack, A}, {Acc, Actions}) ->
@@ -1964,15 +1966,28 @@ record_sent(Type, Tag, AckRequired,
19641966
unacked_message_q = UAMQ,
19651967
next_tag = DeliveryTag
19661968
}) ->
1967-
?INCR_STATS(queue_stats, QName, 1, case {Type, AckRequired} of
1968-
{get, true} -> get;
1969-
{get, false} -> get_no_ack;
1970-
{deliver, true} -> deliver;
1971-
{deliver, false} -> deliver_no_ack
1972-
end, State),
1969+
rabbit_global_counters:messages_delivered(all, 1),
1970+
?INCR_STATS(queue_stats, QName, 1,
1971+
case {Type, AckRequired} of
1972+
{get, true} ->
1973+
rabbit_global_counters:messages_delivered_get_manual_ack(all, 1),
1974+
get;
1975+
{get, false} ->
1976+
rabbit_global_counters:messages_delivered_get_auto_ack(all, 1),
1977+
get_no_ack;
1978+
{deliver, true} ->
1979+
rabbit_global_counters:messages_delivered_consume_manual_ack(all, 1),
1980+
deliver;
1981+
{deliver, false} ->
1982+
rabbit_global_counters:messages_delivered_consume_auto_ack(all, 1),
1983+
deliver_no_ack
1984+
end, State),
19731985
case Redelivered of
1974-
true -> ?INCR_STATS(queue_stats, QName, 1, redeliver, State);
1975-
false -> ok
1986+
true ->
1987+
rabbit_global_counters:messages_redelivered(all, 1),
1988+
?INCR_STATS(queue_stats, QName, 1, redeliver, State);
1989+
false ->
1990+
ok
19761991
end,
19771992
DeliveredAt = os:system_time(millisecond),
19781993
rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, TraceState),
@@ -2036,6 +2051,7 @@ ack(Acked, State = #ch{queue_states = QueueStates0}) ->
20362051

20372052
incr_queue_stats(QName, MsgIds, State) ->
20382053
Count = length(MsgIds),
2054+
rabbit_global_counters:messages_acknowledged(all, Count),
20392055
?INCR_STATS(queue_stats, QName, Count, ack, State).
20402056

20412057
%% {Msgs, Acks}
@@ -2110,6 +2126,7 @@ deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName},
21102126
mandatory = false},
21112127
_RoutedToQueueNames = []}, State) -> %% optimisation
21122128
?INCR_STATS(exchange_stats, XName, 1, publish, State),
2129+
rabbit_global_counters:messages_unroutable_dropped(all, 1),
21132130
?INCR_STATS(exchange_stats, XName, 1, drop_unroutable, State),
21142131
State;
21152132
deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{exchange_name = XName},
@@ -2128,6 +2145,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
21282145
Qs = rabbit_amqqueue:lookup(AllNames),
21292146
case rabbit_queue_type:deliver(Qs, Delivery, QueueStates0) of
21302147
{ok, QueueStates, Actions} ->
2148+
rabbit_global_counters:messages_routed(all, length(Qs)),
21312149
%% NB: the order here is important since basic.returns must be
21322150
%% sent before confirms.
21332151
ok = process_routing_mandatory(Mandatory, Qs, Message, State0),
@@ -2164,6 +2182,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
21642182
end,
21652183
case rabbit_queue_type:deliver(Qs, Delivery, QueueStates0) of
21662184
{ok, QueueStates, Actions} ->
2185+
rabbit_global_counters:messages_routed(all, length(Qs)),
21672186
%% NB: the order here is important since basic.returns must be
21682187
%% sent before confirms.
21692188
ok = process_routing_mandatory(Mandatory, Qs, Message, State0),
@@ -2213,11 +2232,13 @@ infer_extra_bcc(Qs) ->
22132232
process_routing_mandatory(_Mandatory = true,
22142233
_RoutedToQs = [],
22152234
Msg, State) ->
2235+
rabbit_global_counters:messages_unroutable_returned(all, 1),
22162236
ok = basic_return(Msg, State, no_route),
22172237
ok;
22182238
process_routing_mandatory(_Mandatory = false,
22192239
_RoutedToQs = [],
22202240
#basic_message{exchange_name = ExchangeName}, State) ->
2241+
rabbit_global_counters:messages_unroutable_dropped(all, 1),
22212242
?INCR_STATS(exchange_stats, ExchangeName, 1, drop_unroutable, State),
22222243
ok;
22232244
process_routing_mandatory(_, _, _, _) ->
@@ -2245,6 +2266,7 @@ send_confirms_and_nacks(State = #ch{tx = none, confirmed = C, rejected = R}) ->
22452266
case rabbit_node_monitor:pause_partition_guard() of
22462267
ok ->
22472268
Confirms = lists:append(C),
2269+
rabbit_global_counters:messages_confirmed(all, length(Confirms)),
22482270
Rejects = lists:append(R),
22492271
ConfirmMsgSeqNos =
22502272
lists:foldl(
@@ -2783,10 +2805,10 @@ get_operation_timeout_and_deadline() ->
27832805
Deadline = now_millis() + Timeout,
27842806
{Timeout, Deadline}.
27852807

2786-
queue_fold(Fun, Init, Q) ->
2787-
case ?QUEUE:out(Q) of
2788-
{empty, _Q} -> Init;
2789-
{{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1)
2808+
queue_fold(Fun, Acc, Queue) ->
2809+
case ?QUEUE:out(Queue) of
2810+
{empty, _Queue} -> Acc;
2811+
{{value, Item}, Queue1} -> queue_fold(Fun, Fun(Item, Acc), Queue1)
27902812
end.
27912813

27922814
evaluate_consumer_timeout(State0 = #ch{cfg = #conf{channel = Channel,
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
6+
%%
7+
8+
-module(rabbit_global_counters).
9+
-on_load(init/0).
10+
11+
-export([
12+
init/0,
13+
overview/0,
14+
prometheus_format/0,
15+
messages_received/2,
16+
messages_received_confirm/2,
17+
messages_unroutable_dropped/2,
18+
messages_unroutable_returned/2,
19+
messages_routed/2,
20+
messages_confirmed/2,
21+
messages_delivered/2,
22+
messages_delivered_consume_manual_ack/2,
23+
messages_delivered_consume_auto_ack/2,
24+
messages_delivered_get_manual_ack/2,
25+
messages_delivered_get_auto_ack/2,
26+
messages_get_empty/2,
27+
messages_redelivered/2,
28+
messages_acknowledged/2
29+
]).
30+
31+
-define(MESSAGES_RECEIVED, 1).
32+
-define(MESSAGES_RECEIVED_CONFIRM, 2).
33+
-define(MESSAGES_UNROUTABLE_DROPPED, 3).
34+
-define(MESSAGES_UNROUTABLE_RETURNED, 4).
35+
-define(MESSAGES_ROUTED, 5).
36+
-define(MESSAGES_CONFIRMED, 6).
37+
-define(MESSAGES_DELIVERED, 7).
38+
-define(MESSAGES_DELIVERED_CONSUME_MANUAL_ACK, 8).
39+
-define(MESSAGES_DELIVERED_CONSUME_AUTO_ACK, 9).
40+
-define(MESSAGES_DELIVERED_GET_MANUAL_ACK, 10).
41+
-define(MESSAGES_DELIVERED_GET_AUTO_ACK, 11).
42+
-define(MESSAGES_GET_EMPTY, 12).
43+
-define(MESSAGES_REDELIVERED, 13).
44+
-define(MESSAGES_ACKNOWLEDGED, 14).
45+
46+
-define(COUNTERS,
47+
[
48+
{
49+
messages_received_total, ?MESSAGES_RECEIVED, counter,
50+
"Total number of messages received from publishers"
51+
},
52+
{
53+
messages_received_confirm_total, ?MESSAGES_RECEIVED_CONFIRM, counter,
54+
"Total number of messages received from publishers expecting confirmations"
55+
},
56+
{
57+
messages_unroutable_dropped_total, ?MESSAGES_UNROUTABLE_DROPPED, counter,
58+
"Total number of messages published as non-mandatory into an exchange and dropped as unroutable"
59+
},
60+
{
61+
messages_unroutable_returned_total, ?MESSAGES_UNROUTABLE_RETURNED, counter,
62+
"Total number of messages published as mandatory into an exchange and returned to the publisher as unroutable"
63+
},
64+
{
65+
messages_routed_total, ?MESSAGES_ROUTED, counter,
66+
"Total number of messages routed to queues or streams"
67+
},
68+
{
69+
messages_confirmed_total, ?MESSAGES_CONFIRMED, counter,
70+
"Total number of messages confirmed to publishers"
71+
},
72+
{
73+
messages_delivered_total, ?MESSAGES_DELIVERED, counter,
74+
"Total number of messages delivered to consumers"
75+
},
76+
{
77+
messages_delivered_consume_manual_ack_total, ?MESSAGES_DELIVERED_CONSUME_MANUAL_ACK, counter,
78+
"Total number of messages delivered to consumers using basic.consume with manual acknowledgment"
79+
},
80+
{
81+
messages_delivered_consume_auto_ack_total, ?MESSAGES_DELIVERED_CONSUME_AUTO_ACK, counter,
82+
"Total number of messages delivered to consumers using basic.consume with automatic acknowledgment"
83+
},
84+
{
85+
messages_delivered_get_manual_ack_total, ?MESSAGES_DELIVERED_GET_MANUAL_ACK, counter,
86+
"Total number of messages delivered to consumers using basic.get with manual acknowledgment"
87+
},
88+
{
89+
messages_delivered_get_auto_ack_total, ?MESSAGES_DELIVERED_GET_AUTO_ACK, counter,
90+
"Total number of messages delivered to consumers using basic.get with automatic acknowledgment"
91+
},
92+
{
93+
messages_get_empty_total, ?MESSAGES_GET_EMPTY, counter,
94+
"Total number of times basic.get operations fetched no message"
95+
},
96+
{
97+
messages_redelivered_total, ?MESSAGES_REDELIVERED, counter,
98+
"Total number of messages redelivered to consumers"
99+
},
100+
{
101+
messages_acknowledged_total, ?MESSAGES_ACKNOWLEDGED, counter,
102+
"Total number of messages acknowledged by consumers"
103+
}
104+
]).
105+
106+
init() ->
107+
_ = seshat_counters:new_group(global),
108+
CategoryCounters = seshat_counters:new(global, all, ?COUNTERS),
109+
persistent_term:put({?MODULE, all}, CategoryCounters),
110+
ok.
111+
112+
overview() ->
113+
seshat_counters:overview(global).
114+
115+
prometheus_format() ->
116+
seshat_counters:prometheus_format(global).
117+
118+
messages_received(Category, Num) ->
119+
counters:add(fetch(Category), ?MESSAGES_RECEIVED, Num).
120+
121+
messages_received_confirm(Category, Num) ->
122+
counters:add(fetch(Category), ?MESSAGES_RECEIVED_CONFIRM, Num).
123+
124+
messages_unroutable_dropped(Category, Num) ->
125+
counters:add(fetch(Category), ?MESSAGES_UNROUTABLE_DROPPED, Num).
126+
127+
messages_unroutable_returned(Category, Num) ->
128+
counters:add(fetch(Category), ?MESSAGES_UNROUTABLE_RETURNED, Num).
129+
130+
% Formerly known as queue_messages_published_total
131+
% This is a more accurate representation of what happens with messages that
132+
% arrive on a channel or connection in the case of streams.
133+
messages_routed(Category, Num) ->
134+
counters:add(fetch(Category), ?MESSAGES_ROUTED, Num).
135+
136+
messages_confirmed(Category, Num) ->
137+
counters:add(fetch(Category), ?MESSAGES_CONFIRMED, Num).
138+
139+
messages_delivered(Category, Num) ->
140+
counters:add(fetch(Category), ?MESSAGES_DELIVERED, Num).
141+
142+
messages_delivered_consume_manual_ack(Category, Num) ->
143+
counters:add(fetch(Category), ?MESSAGES_DELIVERED_CONSUME_MANUAL_ACK, Num).
144+
145+
messages_delivered_consume_auto_ack(Category, Num) ->
146+
counters:add(fetch(Category), ?MESSAGES_DELIVERED_CONSUME_AUTO_ACK, Num).
147+
148+
messages_delivered_get_manual_ack(Category, Num) ->
149+
counters:add(fetch(Category), ?MESSAGES_DELIVERED_GET_MANUAL_ACK, Num).
150+
151+
messages_delivered_get_auto_ack(Category, Num) ->
152+
counters:add(fetch(Category), ?MESSAGES_DELIVERED_GET_AUTO_ACK, Num).
153+
154+
messages_get_empty(Category, Num) ->
155+
counters:add(fetch(Category), ?MESSAGES_GET_EMPTY, Num).
156+
157+
messages_redelivered(Category, Num) ->
158+
counters:add(fetch(Category), ?MESSAGES_REDELIVERED, Num).
159+
160+
messages_acknowledged(Category, Num) ->
161+
counters:add(fetch(Category), ?MESSAGES_ACKNOWLEDGED, Num).
162+
163+
fetch(Category) ->
164+
persistent_term:get({?MODULE, Category}).

0 commit comments

Comments
 (0)