Skip to content

Commit 91a34b8

Browse files
committed
Restart global counters with our fresh understanding
This has gone through a few cycles, with @mkuratczyk & @dcorbacho covering most of the ground. @dcorbacho had most of this in #3045, but the main branch went through a few changes in the meantime. Rather than resolving all the conflicts and then making the necessary changes, we (@gerhard + @kjnilsson) took all the learnings and started re-applying a lot of the existing code from #3045. We are confident in this approach and would like to see it through. Pair: @kjnilsson. Signed-off-by: Gerhard Lazu <gerhard@lazu.co.uk>
1 parent e91c918 commit 91a34b8

File tree

5 files changed

+252
-32
lines changed

5 files changed

+252
-32
lines changed

deps/rabbit/Makefile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,14 +136,15 @@ APPS_DIR := $(CURDIR)/apps
136136
LOCAL_DEPS = sasl rabbitmq_prelaunch os_mon inets compiler public_key crypto ssl syntax_tools xmerl
137137

138138
BUILD_DEPS = rabbitmq_cli
139-
DEPS = ranch rabbit_common ra sysmon_handler stdout_formatter recon observer_cli osiris amqp10_common syslog systemd
139+
DEPS = ranch rabbit_common ra sysmon_handler stdout_formatter recon observer_cli osiris amqp10_common syslog systemd seshat
140140
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client meck proper
141141

142142
PLT_APPS += mnesia
143143

144144
dep_syslog = git https://github.com/schlagert/syslog 4.0.0
145145
dep_osiris = git https://github.com/rabbitmq/osiris master
146146
dep_systemd = hex 0.6.1
147+
dep_seshat = git https://github.com/rabbitmq/seshat main
147148

148149
define usage_xml_to_erl
149150
$(subst __,_,$(patsubst $(DOCS_DIR)/rabbitmq%.1.xml, src/rabbit_%_usage.erl, $(subst -,_,$(1))))

deps/rabbit/src/rabbit.erl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,13 @@
127127
{requires, pre_boot},
128128
{enables, external_infrastructure}]}).
129129

130+
-rabbit_boot_step({rabbit_global_counters,
131+
[{description, "global metrics storage"},
132+
{mfa, {rabbit_global_counters, init,
133+
[]}},
134+
{requires, pre_boot},
135+
{enables, external_infrastructure}]}).
136+
130137
-rabbit_boot_step({rabbit_event,
131138
[{description, "statistics event manager"},
132139
{mfa, {rabbit_sup, start_restartable_child,

deps/rabbit/src/rabbit_channel.erl

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1285,6 +1285,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
12851285
confirm_enabled = ConfirmEnabled,
12861286
delivery_flow = Flow
12871287
}) ->
1288+
rabbit_global_counters:messages_received(all, 1),
12881289
check_msg_size(Content, MaxMessageSize, GCThreshold),
12891290
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
12901291
check_write_permitted(ExchangeName, User, AuthzContext),
@@ -1314,9 +1315,11 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
13141315
Username, TraceState),
13151316
DQ = {Delivery#delivery{flow = Flow}, QNames},
13161317
{noreply, case Tx of
1317-
none -> deliver_to_queues(DQ, State1);
1318-
{Msgs, Acks} -> Msgs1 = ?QUEUE:in(DQ, Msgs),
1319-
State1#ch{tx = {Msgs1, Acks}}
1318+
none ->
1319+
deliver_to_queues(DQ, State1);
1320+
{Msgs, Acks} ->
1321+
Msgs1 = ?QUEUE:in(DQ, Msgs),
1322+
State1#ch{tx = {Msgs1, Acks}}
13201323
end};
13211324
{error, Reason} ->
13221325
precondition_failed("invalid message: ~p", [Reason])
@@ -1692,9 +1695,9 @@ handle_method(#'tx.select'{}, _, State) ->
16921695
handle_method(#'tx.commit'{}, _, #ch{tx = none}) ->
16931696
precondition_failed("channel is not transactional");
16941697

1695-
handle_method(#'tx.commit'{}, _, State = #ch{tx = {Msgs, Acks},
1698+
handle_method(#'tx.commit'{}, _, State = #ch{tx = {Deliveries, Acks},
16961699
limiter = Limiter}) ->
1697-
State1 = queue_fold(fun deliver_to_queues/2, State, Msgs),
1700+
State1 = queue_fold(fun deliver_to_queues/2, State, Deliveries),
16981701
Rev = fun (X) -> lists:reverse(lists:sort(X)) end,
16991702
{State2, Actions2} =
17001703
lists:foldl(fun ({ack, A}, {Acc, Actions}) ->
@@ -2128,6 +2131,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
21282131
Qs = rabbit_amqqueue:lookup(AllNames),
21292132
case rabbit_queue_type:deliver(Qs, Delivery, QueueStates0) of
21302133
{ok, QueueStates, Actions} ->
2134+
rabbit_global_counters:messages_routed(all, length(Qs)),
21312135
%% NB: the order here is important since basic.returns must be
21322136
%% sent before confirms.
21332137
ok = process_routing_mandatory(Mandatory, Qs, Message, State0),
@@ -2164,6 +2168,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
21642168
end,
21652169
case rabbit_queue_type:deliver(Qs, Delivery, QueueStates0) of
21662170
{ok, QueueStates, Actions} ->
2171+
rabbit_global_counters:messages_routed(all, length(Qs)),
21672172
%% NB: the order here is important since basic.returns must be
21682173
%% sent before confirms.
21692174
ok = process_routing_mandatory(Mandatory, Qs, Message, State0),
@@ -2780,10 +2785,10 @@ get_operation_timeout_and_deadline() ->
27802785
Deadline = now_millis() + Timeout,
27812786
{Timeout, Deadline}.
27822787

2783-
queue_fold(Fun, Init, Q) ->
2784-
case ?QUEUE:out(Q) of
2785-
{empty, _Q} -> Init;
2786-
{{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1)
2788+
queue_fold(Fun, Acc, Queue) ->
2789+
case ?QUEUE:out(Queue) of
2790+
{empty, _Queue} -> Acc;
2791+
{{value, Item}, Queue1} -> queue_fold(Fun, Fun(Item, Acc), Queue1)
27872792
end.
27882793

27892794
evaluate_consumer_timeout(State0 = #ch{cfg = #conf{channel = Channel,
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
6+
%%
7+
8+
-module(rabbit_global_counters).
9+
-on_load(init/0).
10+
11+
-export([
12+
init/0,
13+
overview/0,
14+
messages_received/2,
15+
messages_routed/2%,
16+
% messages_delivered_consume_ack/3,
17+
% messages_delivered_consume_autoack/3,
18+
% messages_delivered_get_ack/3,
19+
% messages_delivered_get_autoack/3,
20+
% messages_redelivered/3,
21+
% basic_get_empty/3,
22+
% messages_unroutable_dropped/3,
23+
% messages_unroutable_returned/3,
24+
% messages_confirmed/3
25+
]).
26+
27+
28+
-define(MESSAGES_RECEIVED, 1).
29+
-define(MESSAGES_ROUTED, 2).
30+
% -define(MESSAGES_DELIVERED_CONSUME_ACK, 3).
31+
% -define(MESSAGES_DELIVERED_CONSUME_AUTOACK, 4).
32+
% -define(MESSAGES_DELIVERED_GET_ACK, 5).
33+
% -define(MESSAGES_DELIVERED_GET_AUTOACK, 6).
34+
% -define(MESSAGES_REDELIVERED, 7).
35+
% -define(BASIC_GET_EMPTY, 8).
36+
% -define(MESSAGES_UNROUTABLE_DROPPED, 9).
37+
% -define(MESSAGES_UNROUTABLE_RETURNED, 10).
38+
% -define(MESSAGES_CONFIRMED, 11).
39+
40+
-define(COUNTERS,
41+
[
42+
{
43+
messages_received_total, ?MESSAGES_RECEIVED, counter,
44+
"Total number of messages received from publishers"
45+
},
46+
{
47+
messages_routed_total, ?MESSAGES_ROUTED, counter,
48+
"Total number of messages routed to queues or streams"
49+
}%,
50+
% {
51+
% messages_delivered_consume_ack_total, ?MESSAGES_DELIVERED_CONSUME_ACK, counter,
52+
% "Total number of messages consumed using basic.consume with manual acknowledgment"
53+
% },
54+
% {
55+
% messages_delivered_consume_autoack_total, ?MESSAGES_DELIVERED_CONSUME_AUTOACK, counter,
56+
% "Total number of messages consumed using basic.consume with automatic acknowledgment"
57+
% },
58+
% {
59+
% messages_delivered_get_ack_total, ?MESSAGES_DELIVERED_GET_ACK, counter,
60+
% "Total number of messages consumed using basic.get with manual acknowledgment"
61+
% },
62+
% {
63+
% messages_delivered_get_autoack_total, ?MESSAGES_DELIVERED_GET_AUTOACK, counter,
64+
% "Total number of messages consumed using basic.get with automatic acknowledgment"
65+
% },
66+
% {
67+
% messages_redelivered_total, ?MESSAGES_REDELIVERED, counter,
68+
% "Total number of messages redelivered to consumers"
69+
% },
70+
% {
71+
% basic_get_empty_total, ?BASIC_GET_EMPTY, counter,
72+
% "Total number of times basic.get operations fetched no message"
73+
% },
74+
% {
75+
% messages_unroutable_dropped_total, ?MESSAGES_UNROUTABLE_DROPPED, counter,
76+
% "Total number of messages published as non-mandatory into an exchange and dropped as unroutable"
77+
% },
78+
% {
79+
% messages_unroutable_returned_total, ?MESSAGES_UNROUTABLE_RETURNED, counter,
80+
% "Total number of messages published as mandatory into an exchange and returned to the publisher as unroutable"
81+
% },
82+
% {
83+
% messages_confirmed_total, ?MESSAGES_CONFIRMED, counter,
84+
% "Total number of messages confirmed to publishers"
85+
% }
86+
]).
87+
88+
init() ->
89+
_ = seshat_counters:new_group(global),
90+
CategoryCounters = seshat_counters:new(global, all, ?COUNTERS),
91+
persistent_term:put({?MODULE, all}, CategoryCounters),
92+
ok.
93+
94+
overview() ->
95+
seshat_counters:overview(global).
96+
97+
messages_received(Category, Num) ->
98+
counters:add(fetch(Category), ?MESSAGES_RECEIVED, Num).
99+
100+
% formerly known as queue_messages_published_total
101+
messages_routed(Category, Num) ->
102+
counters:add(fetch(Category), ?MESSAGES_ROUTED, Num).
103+
104+
% messages_delivered_consume_ack(Group, Object, Num) ->
105+
% counters:add(fetch(Group, Object), ?MESSAGES_DELIVERED_CONSUME_ACK, Num).
106+
107+
% messages_delivered_consume_autoack(Group, Object, Num) ->
108+
% counters:add(fetch(Group, Object), ?MESSAGES_DELIVERED_CONSUME_AUTOACK, Num).
109+
110+
% messages_delivered_get_ack(Group, Object, Num) ->
111+
% counters:add(fetch(Group, Object), ?MESSAGES_DELIVERED_GET_ACK, Num).
112+
113+
% messages_delivered_get_autoack(Group, Object, Num) ->
114+
% counters:add(fetch(Group, Object), ?MESSAGES_DELIVERED_GET_AUTOACK, Num).
115+
116+
% % not implemented yet
117+
% messages_redelivered(Group, Object, Num) ->
118+
% counters:add(fetch(Group, Object), ?MESSAGES_REDELIVERED, Num).
119+
120+
% basic_get_empty(Group, Object, Num) ->
121+
% counters:add(fetch(Group, Object), ?BASIC_GET_EMPTY, Num).
122+
123+
% % implemented in rabbit_core_metrics (it doesn't reach a queue)
124+
% messages_unroutable_returned(Group, Object, Num) ->
125+
% counters:add(fetch(Group, Object), ?MESSAGES_UNROUTABLE_RETURNED, Num).
126+
127+
% % implemented in rabbit_core_metrics (it doesn't reach a queue)
128+
% messages_unroutable_dropped(Group, Object, Num) ->
129+
% counters:add(fetch(Group, Object), ?MESSAGES_UNROUTABLE_DROPPED, Num).
130+
131+
% messages_confirmed(Group, Object, Num) ->
132+
% counters:add(fetch(Group, Object), ?MESSAGES_CONFIRMED, Num).
133+
134+
fetch(Category) ->
135+
persistent_term:get({?MODULE, Category}).
136+
137+
% TODO
138+
% channel_messages_redelivered_total "Total number of messages redelivered to consumers"
139+
%
140+
% connection_incoming_bytes_total "Total number of bytes received on a connection"
141+
% connection_outgoing_bytes_total "Total number of bytes sent on a connection"
142+
% connection_process_reductions_total "Total number of connection process reductions"
143+
% connection_incoming_packets_total "Total number of packets received on a connection"
144+
% connection_outgoing_packets_total "Total number of packets sent on a connection"
145+
%
146+
% io_read_ops_total "Total number of I/O read operations"
147+
% io_read_bytes_total "Total number of I/O bytes read"
148+
% io_write_ops_total "Total number of I/O write operations"
149+
% io_write_bytes_total "Total number of I/O bytes written"
150+
% io_sync_ops_total "Total number of I/O sync operations"
151+
% io_seek_ops_total "Total number of I/O seek operations"
152+
% io_open_attempt_ops_total "Total number of file open attempts"
153+
% io_reopen_ops_total "Total number of times files have been reopened"
154+
%
155+
% schema_db_ram_tx_total "Total number of Schema DB memory transactions"
156+
% schema_db_disk_tx_total "Total number of Schema DB disk transactions"
157+
% msg_store_read_total "Total number of Message Store read operations"
158+
% msg_store_write_total "Total number of Message Store write operations"
159+
% queue_index_read_ops_total "Total number of Queue Index read operations"
160+
% queue_index_write_ops_total "Total number of Queue Index write operations"
161+
% queue_index_journal_write_ops_total "Total number of Queue Index Journal write operations"
162+
% io_read_time_seconds_total "Total I/O read time"
163+
% io_write_time_seconds_total "Total I/O write time"
164+
% io_sync_time_seconds_total "Total I/O sync time"
165+
% io_seek_time_seconds_total "Total I/O seek time"
166+
% io_open_attempt_time_seconds_total "Total file open attempts time"
167+
% raft_term_total "Current Raft term number"
168+
% queue_disk_reads_total "Total number of times queue read messages from disk"
169+
% queue_disk_writes_total "Total number of times queue wrote messages to disk"
170+
171+
% DONE
172+
% channel_messages_published_total "Total number of messages published into an exchange on a channel"
173+
% channel_messages_confirmed_total "Total number of messages published into an exchange and confirmed on the channel"
174+
% channel_messages_unroutable_returned_total "Total number of messages published as mandatory into an exchange and returned to the publisher as unroutable"
175+
% channel_messages_unroutable_dropped_total "Total number of messages published as non-mandatory into an exchange and dropped as unroutable"
176+
% channel_get_empty_total "Total number of times basic.get operations fetched no message"
177+
% channel_get_ack_total "Total number of messages fetched with basic.get in manual acknowledgement mode"
178+
% channel_get_total "Total number of messages fetched with basic.get in automatic acknowledgement mode"
179+
% channel_messages_delivered_ack_total "Total number of messages delivered to consumers in manual acknowledgement mode"
180+
% channel_messages_delivered_total "Total number of messages delivered to consumers in automatic acknowledgement mode"
181+
% queue_messages_published_total "Total number of messages published to queues"
182+
183+
% IGNORED (IS THIS USEFUL?)
184+
% channel_process_reductions_total "Total number of channel process reductions"
185+
% queue_process_reductions_total "Total number of queue process reductions"
186+
187+
% NOT NECESSARY (DON'T GO TO ZERO)
188+
% erlang_gc_runs_total "Total number of Erlang garbage collector runs"
189+
% erlang_gc_reclaimed_bytes_total "Total number of bytes of memory reclaimed by Erlang garbage collector"
190+
% erlang_scheduler_context_switches_total "Total number of Erlang scheduler context switches"
191+
% connections_opened_total "Total number of connections opened"
192+
% connections_closed_total "Total number of connections closed or terminated"
193+
% channels_opened_total "Total number of channels opened"
194+
% channels_closed_total "Total number of channels closed"
195+
% queues_declared_total "Total number of queues declared"
196+
% queues_created_total "Total number of queues created"
197+
% queues_deleted_total "Total number of queues deleted"
198+
% auth_attempts_total "Total number of authorization attempts"
199+
% auth_attempts_succeeded_total "Total number of successful authentication attempts"
200+
% auth_attempts_failed_total "Total number of failed authentication attempts"
201+
% auth_attempts_detailed_total "Total number of authorization attempts with source info"
202+
% auth_attempts_detailed_succeeded_total "Total number of successful authorization attempts with source info"
203+
% auth_attempts_detailed_failed_total "Total number of failed authorization attempts with source info"

deps/rabbit/test/queue_type_SUITE.erl

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -122,22 +122,14 @@ smoke(Config) ->
122122
?config(queue_type, Config)}])),
123123
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
124124
amqp_channel:register_confirm_handler(Ch, self()),
125-
publish(Ch, QName, <<"msg1">>),
126-
ct:pal("waiting for confirms from ~s", [QName]),
127-
ok = receive
128-
#'basic.ack'{} -> ok;
129-
#'basic.nack'{} -> fail
130-
after 2500 ->
131-
flush(),
132-
exit(confirm_timeout)
133-
end,
125+
publish_and_confirm(Ch, QName, <<"msg1">>),
134126
DTag = basic_get(Ch, QName),
135127

136128
basic_ack(Ch, DTag),
137129
basic_get_empty(Ch, QName),
138130

139131
%% consume
140-
publish(Ch, QName, <<"msg2">>),
132+
publish_and_confirm(Ch, QName, <<"msg2">>),
141133
ConsumerTag1 = <<"ctag1">>,
142134
ok = subscribe(Ch, QName, ConsumerTag1),
143135
%% receive and ack
@@ -158,7 +150,7 @@ smoke(Config) ->
158150
%% consume and nack
159151
ConsumerTag2 = <<"ctag2">>,
160152
ok = subscribe(Ch, QName, ConsumerTag2),
161-
publish(Ch, QName, <<"msg3">>),
153+
publish_and_confirm(Ch, QName, <<"msg3">>),
162154
receive
163155
{#'basic.deliver'{delivery_tag = T,
164156
redelivered = false},
@@ -170,6 +162,14 @@ smoke(Config) ->
170162
end,
171163
%% get and ack
172164
basic_ack(Ch, basic_get(Ch, QName)),
165+
%% global counters
166+
publish_and_confirm(Ch, <<"inexistent_queue">>, <<"msg4">>),
167+
?assertMatch(#{ all := #{
168+
messages_received_total := 4,
169+
messages_routed_total := 3 }
170+
},
171+
get_global_counters(Config)
172+
),
173173
ok.
174174

175175
ack_after_queue_delete(Config) ->
@@ -181,17 +181,7 @@ ack_after_queue_delete(Config) ->
181181
?config(queue_type, Config)}])),
182182
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
183183
amqp_channel:register_confirm_handler(Ch, self()),
184-
publish(Ch, QName, <<"msg1">>),
185-
ct:pal("waiting for confirms from ~s", [QName]),
186-
ok = receive
187-
#'basic.ack'{} -> ok;
188-
#'basic.nack'{} ->
189-
ct:fail("confirm nack - expected ack")
190-
after 2500 ->
191-
flush(),
192-
exit(confirm_timeout)
193-
end,
194-
184+
publish_and_confirm(Ch, QName, <<"msg1">>),
195185
DTag = basic_get(Ch, QName),
196186

197187
ChRef = erlang:monitor(process, Ch),
@@ -229,6 +219,17 @@ publish(Ch, Queue, Msg) ->
229219
#amqp_msg{props = #'P_basic'{delivery_mode = 2},
230220
payload = Msg}).
231221

222+
publish_and_confirm(Ch, Queue, Msg) ->
223+
publish(Ch, Queue, Msg),
224+
ct:pal("waiting for ~s message confirmation from ~s", [Msg, Queue]),
225+
ok = receive
226+
#'basic.ack'{} -> ok;
227+
#'basic.nack'{} -> fail
228+
after 2500 ->
229+
flush(),
230+
exit(confirm_timeout)
231+
end.
232+
232233
basic_get(Ch, Queue) ->
233234
{GetOk, _} = Reply = amqp_channel:call(Ch, #'basic.get'{queue = Queue,
234235
no_ack = false}),
@@ -273,3 +274,6 @@ flush() ->
273274
after 0 ->
274275
ok
275276
end.
277+
278+
get_global_counters(Config) ->
279+
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_global_counters, overview, []).

0 commit comments

Comments
 (0)