Skip to content

Global counters per protocol + protocol AND queue_type #3127

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ ALL_PLUGINS = [
"//deps/rabbitmq_stomp:bazel_erlang_lib",
"//deps/rabbitmq_stream:bazel_erlang_lib",
"//deps/rabbitmq_stream_management:bazel_erlang_lib",
"//deps/rabbitmq_stream_prometheus:bazel_erlang_lib",
"//deps/rabbitmq_top:bazel_erlang_lib",
"//deps/rabbitmq_tracing:bazel_erlang_lib",
"//deps/rabbitmq_trust_store:bazel_erlang_lib",
Expand Down
1 change: 1 addition & 0 deletions deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ RUNTIME_DEPS = [
"@observer_cli//:bazel_erlang_lib",
"@osiris//:bazel_erlang_lib",
"@recon//:bazel_erlang_lib",
"@seshat//:bazel_erlang_lib",
"@sysmon_handler//:bazel_erlang_lib",
"@systemd//:bazel_erlang_lib",
]
Expand Down
3 changes: 2 additions & 1 deletion deps/rabbit/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,15 @@ APPS_DIR := $(CURDIR)/apps
LOCAL_DEPS = sasl rabbitmq_prelaunch os_mon inets compiler public_key crypto ssl syntax_tools xmerl

BUILD_DEPS = rabbitmq_cli
DEPS = ranch rabbit_common ra sysmon_handler stdout_formatter recon observer_cli osiris amqp10_common syslog systemd
DEPS = ranch rabbit_common ra sysmon_handler stdout_formatter recon observer_cli osiris amqp10_common syslog systemd seshat
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client meck proper

PLT_APPS += mnesia

dep_syslog = git https://github.com/schlagert/syslog 4.0.0
dep_osiris = git https://github.com/rabbitmq/osiris master
dep_systemd = hex 0.6.1
dep_seshat = git https://github.com/rabbitmq/seshat main

define usage_xml_to_erl
$(subst __,_,$(patsubst $(DOCS_DIR)/rabbitmq%.1.xml, src/rabbit_%_usage.erl, $(subst -,_,$(1))))
Expand Down
7 changes: 7 additions & 0 deletions deps/rabbit/src/rabbit.erl
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,13 @@
{requires, pre_boot},
{enables, external_infrastructure}]}).

-rabbit_boot_step({rabbit_global_counters,
[{description, "global counters"},
{mfa, {rabbit_global_counters, boot_step,
[]}},
{requires, pre_boot},
{enables, external_infrastructure}]}).

-rabbit_boot_step({rabbit_event,
[{description, "statistics event manager"},
{mfa, {rabbit_sup, start_restartable_child,
Expand Down
93 changes: 62 additions & 31 deletions deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1285,6 +1285,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
confirm_enabled = ConfirmEnabled,
delivery_flow = Flow
}) ->
rabbit_global_counters:messages_received(amqp091, 1),
check_msg_size(Content, MaxMessageSize, GCThreshold),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, User, AuthzContext),
Expand All @@ -1302,7 +1303,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
{MsgSeqNo, State1} =
case DoConfirm orelse Mandatory of
false -> {undefined, State};
true -> SeqNo = State#ch.publish_seqno,
true -> rabbit_global_counters:messages_received_confirm(amqp091, 1),
SeqNo = State#ch.publish_seqno,
{SeqNo, State#ch{publish_seqno = SeqNo + 1}}
end,
case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
Expand All @@ -1314,9 +1316,11 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
Username, TraceState),
DQ = {Delivery#delivery{flow = Flow}, QNames},
{noreply, case Tx of
none -> deliver_to_queues(DQ, State1);
{Msgs, Acks} -> Msgs1 = ?QUEUE:in(DQ, Msgs),
State1#ch{tx = {Msgs1, Acks}}
none ->
deliver_to_queues(DQ, State1);
{Msgs, Acks} ->
Msgs1 = ?QUEUE:in(DQ, Msgs),
State1#ch{tx = {Msgs1, Acks}}
end};
{error, Reason} ->
precondition_failed("invalid message: ~p", [Reason])
Expand Down Expand Up @@ -1360,14 +1364,14 @@ handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck},
DeliveryTag, QueueStates0)
end) of
{ok, MessageCount, Msg, QueueStates} ->
{ok, QueueType} = rabbit_queue_type:module(QueueName, QueueStates),
handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount, Msg,
State#ch{queue_states = QueueStates});
QueueType, State#ch{queue_states = QueueStates});
{empty, QueueStates} ->
{ok, QueueType} = rabbit_queue_type:module(QueueName, QueueStates),
rabbit_global_counters:messages_get_empty(amqp091, QueueType, 1),
?INCR_STATS(queue_stats, QueueName, 1, get_empty, State),
{reply, #'basic.get_empty'{}, State#ch{queue_states = QueueStates}};
empty ->
?INCR_STATS(queue_stats, QueueName, 1, get_empty, State),
{reply, #'basic.get_empty'{}, State};
{error, {unsupported, single_active_consumer}} ->
rabbit_misc:protocol_error(
resource_locked,
Expand Down Expand Up @@ -1692,9 +1696,9 @@ handle_method(#'tx.select'{}, _, State) ->
handle_method(#'tx.commit'{}, _, #ch{tx = none}) ->
precondition_failed("channel is not transactional");

handle_method(#'tx.commit'{}, _, State = #ch{tx = {Msgs, Acks},
handle_method(#'tx.commit'{}, _, State = #ch{tx = {Deliveries, Acks},
limiter = Limiter}) ->
State1 = queue_fold(fun deliver_to_queues/2, State, Msgs),
State1 = queue_fold(fun deliver_to_queues/2, State, Deliveries),
Rev = fun (X) -> lists:reverse(lists:sort(X)) end,
{State2, Actions2} =
lists:foldl(fun ({ack, A}, {Acc, Actions}) ->
Expand Down Expand Up @@ -1954,7 +1958,7 @@ internal_reject(Requeue, Acked, Limiter,
ok = notify_limiter(Limiter, Acked),
{State#ch{queue_states = QueueStates}, Actions}.

record_sent(Type, Tag, AckRequired,
record_sent(Type, QueueType, Tag, AckRequired,
Msg = {QName, _QPid, MsgId, Redelivered, _Message},
State = #ch{cfg = #conf{channel = ChannelNum,
trace_state = TraceState,
Expand All @@ -1964,15 +1968,28 @@ record_sent(Type, Tag, AckRequired,
unacked_message_q = UAMQ,
next_tag = DeliveryTag
}) ->
?INCR_STATS(queue_stats, QName, 1, case {Type, AckRequired} of
{get, true} -> get;
{get, false} -> get_no_ack;
{deliver, true} -> deliver;
{deliver, false} -> deliver_no_ack
end, State),
rabbit_global_counters:messages_delivered(amqp091, QueueType, 1),
?INCR_STATS(queue_stats, QName, 1,
case {Type, AckRequired} of
{get, true} ->
rabbit_global_counters:messages_delivered_get_manual_ack(amqp091, QueueType, 1),
get;
{get, false} ->
rabbit_global_counters:messages_delivered_get_auto_ack(amqp091, QueueType, 1),
get_no_ack;
{deliver, true} ->
rabbit_global_counters:messages_delivered_consume_manual_ack(amqp091, QueueType, 1),
deliver;
{deliver, false} ->
rabbit_global_counters:messages_delivered_consume_auto_ack(amqp091, QueueType, 1),
deliver_no_ack
end, State),
case Redelivered of
true -> ?INCR_STATS(queue_stats, QName, 1, redeliver, State);
false -> ok
true ->
rabbit_global_counters:messages_redelivered(amqp091, QueueType, 1),
?INCR_STATS(queue_stats, QName, 1, redeliver, State);
false ->
ok
end,
DeliveredAt = os:system_time(millisecond),
rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, TraceState),
Expand Down Expand Up @@ -2034,8 +2051,14 @@ ack(Acked, State = #ch{queue_states = QueueStates0}) ->
ok = notify_limiter(State#ch.limiter, Acked),
{State#ch{queue_states = QueueStates}, Actions}.

incr_queue_stats(QName, MsgIds, State) ->
incr_queue_stats(QName, MsgIds, State = #ch{queue_states = QueueStates}) ->
Count = length(MsgIds),
case rabbit_queue_type:module(QName, QueueStates) of
{ok, QueueType} ->
rabbit_global_counters:messages_acknowledged(amqp091, QueueType, Count);
_ ->
noop
end,
?INCR_STATS(queue_stats, QName, Count, ack, State).

%% {Msgs, Acks}
Expand Down Expand Up @@ -2108,15 +2131,16 @@ notify_limiter(Limiter, Acked) ->
deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName},
confirm = false,
mandatory = false},
_RoutedToQueueNames = []}, State) -> %% optimisation
_RoutedToQueueNames = []}, State) -> %% optimisation when there are no queues
?INCR_STATS(exchange_stats, XName, 1, publish, State),
rabbit_global_counters:messages_unroutable_dropped(amqp091, 1),
?INCR_STATS(exchange_stats, XName, 1, drop_unroutable, State),
State;
deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{exchange_name = XName},
mandatory = Mandatory,
confirm = Confirm,
msg_seq_no = MsgSeqNo},
_RoutedToQueueNames = [QName]}, State0 = #ch{queue_states = QueueStates0}) ->
_RoutedToQueueNames = [QName]}, State0 = #ch{queue_states = QueueStates0}) -> %% optimisation when there is one queue
AllNames = case rabbit_amqqueue:lookup(QName) of
{ok, Q0} ->
case amqqueue:get_options(Q0) of
Expand All @@ -2128,6 +2152,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
Qs = rabbit_amqqueue:lookup(AllNames),
case rabbit_queue_type:deliver(Qs, Delivery, QueueStates0) of
{ok, QueueStates, Actions} ->
rabbit_global_counters:messages_routed(amqp091, length(Qs)),
%% NB: the order here is important since basic.returns must be
%% sent before confirms.
ok = process_routing_mandatory(Mandatory, Qs, Message, State0),
Expand Down Expand Up @@ -2164,6 +2189,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
end,
case rabbit_queue_type:deliver(Qs, Delivery, QueueStates0) of
{ok, QueueStates, Actions} ->
rabbit_global_counters:messages_routed(amqp091, length(Qs)),
%% NB: the order here is important since basic.returns must be
%% sent before confirms.
ok = process_routing_mandatory(Mandatory, Qs, Message, State0),
Expand Down Expand Up @@ -2213,11 +2239,13 @@ infer_extra_bcc(Qs) ->
process_routing_mandatory(_Mandatory = true,
_RoutedToQs = [],
Msg, State) ->
rabbit_global_counters:messages_unroutable_returned(amqp091, 1),
ok = basic_return(Msg, State, no_route),
ok;
process_routing_mandatory(_Mandatory = false,
_RoutedToQs = [],
#basic_message{exchange_name = ExchangeName}, State) ->
rabbit_global_counters:messages_unroutable_dropped(amqp091, 1),
?INCR_STATS(exchange_stats, ExchangeName, 1, drop_unroutable, State),
ok;
process_routing_mandatory(_, _, _, _) ->
Expand Down Expand Up @@ -2245,6 +2273,7 @@ send_confirms_and_nacks(State = #ch{tx = none, confirmed = C, rejected = R}) ->
case rabbit_node_monitor:pause_partition_guard() of
ok ->
Confirms = lists:append(C),
rabbit_global_counters:messages_confirmed(amqp091, length(Confirms)),
Rejects = lists:append(R),
ConfirmMsgSeqNos =
lists:foldl(
Expand Down Expand Up @@ -2721,8 +2750,9 @@ handle_deliver0(ConsumerTag, AckRequired,
redelivered = Redelivered,
exchange = ExchangeName#resource.name,
routing_key = RoutingKey},
case rabbit_queue_type:module(QName, Qs) of
{ok, rabbit_classic_queue} ->
{ok, QueueType} = rabbit_queue_type:module(QName, Qs),
case QueueType of
rabbit_classic_queue ->
ok = rabbit_writer:send_command_and_notify(
WriterPid, QPid, self(), Deliver, Content);
_ ->
Expand All @@ -2732,13 +2762,14 @@ handle_deliver0(ConsumerTag, AckRequired,
undefined -> ok;
_ -> rabbit_basic:maybe_gc_large_msg(Content, GCThreshold)
end,
record_sent(deliver, ConsumerTag, AckRequired, Msg, State).
record_sent(deliver, QueueType, ConsumerTag, AckRequired, Msg, State).

handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount,
Msg = {_QName, _QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
routing_keys = [RoutingKey | _CcRoutes],
content = Content}}, State) ->
content = Content}},
QueueType, State) ->
ok = rabbit_writer:send_command(
WriterPid,
#'basic.get_ok'{delivery_tag = DeliveryTag,
Expand All @@ -2747,7 +2778,7 @@ handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount,
routing_key = RoutingKey,
message_count = MessageCount},
Content),
{noreply, record_sent(get, DeliveryTag, not(NoAck), Msg, State)}.
{noreply, record_sent(get, QueueType, DeliveryTag, not(NoAck), Msg, State)}.

init_tick_timer(State = #ch{tick_timer = undefined}) ->
{ok, Interval} = application:get_env(rabbit, channel_tick_interval),
Expand Down Expand Up @@ -2783,10 +2814,10 @@ get_operation_timeout_and_deadline() ->
Deadline = now_millis() + Timeout,
{Timeout, Deadline}.

queue_fold(Fun, Init, Q) ->
case ?QUEUE:out(Q) of
{empty, _Q} -> Init;
{{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1)
queue_fold(Fun, Acc, Queue) ->
case ?QUEUE:out(Queue) of
{empty, _Queue} -> Acc;
{{value, Item}, Queue1} -> queue_fold(Fun, Fun(Item, Acc), Queue1)
end.

evaluate_consumer_timeout(State0 = #ch{cfg = #conf{channel = Channel,
Expand Down
Loading