Skip to content

Commit a3fa3ad

Browse files
committed
Local shovels: Add global counters
1 parent 18e4bf6 commit a3fa3ad

File tree

1 file changed

+58
-3
lines changed

1 file changed

+58
-3
lines changed

deps/rabbitmq_shovel/src/rabbit_local_shovel.erl

Lines changed: 58 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,15 @@
1515
-include_lib("rabbit/include/mc.hrl").
1616
-include("rabbit_shovel.hrl").
1717

18+
-rabbit_boot_step({rabbit_global_local_shovel_counters,
19+
[{description, "global local shovel counters"},
20+
{mfa, {?MODULE, boot_step,
21+
[]}},
22+
{requires, rabbit_global_counters},
23+
{enables, external_infrastructure}]}).
24+
1825
-export([
26+
boot_step/0,
1927
parse/2,
2028
connect_source/1,
2129
connect_dest/1,
@@ -53,12 +61,20 @@
5361
%% See rabbit_amqp_session.erl
5462
-define(INITIAL_DELIVERY_COUNT, 16#ff_ff_ff_ff - 4).
5563
-define(DEFAULT_MAX_LINK_CREDIT, 1000).
64+
-define(PROTOCOL, 'local-shovel').
5665

5766
-record(pending_ack, {
5867
delivery_tag,
5968
msg_id
6069
}).
6170

71+
boot_step() ->
72+
Labels = #{protocol => ?PROTOCOL},
73+
rabbit_global_counters:init(Labels),
74+
rabbit_global_counters:init(Labels#{queue_type => rabbit_classic_queue}),
75+
rabbit_global_counters:init(Labels#{queue_type => rabbit_quorum_queue}),
76+
rabbit_global_counters:init(Labels#{queue_type => rabbit_stream_queue}).
77+
6278
parse(_Name, {source, Source}) ->
6379
Queue = parse_parameter(queue, fun parse_binary/1,
6480
proplists:get_value(queue, Source)),
@@ -363,6 +379,7 @@ forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current} =
363379
Msg = set_annotations(Msg0, Dest),
364380
RoutedQNames = route(Msg, Dest),
365381
Queues = rabbit_amqqueue:lookup_many(RoutedQNames),
382+
messages_received(AckMode),
366383
case rabbit_queue_type:deliver(Queues, Msg, Options, QState) of
367384
{ok, QState1, Actions} ->
368385
State1 = State#{dest => Dest1#{current => Current1#{queue_states => QState1}}},
@@ -451,13 +468,15 @@ handle_queue_actions(Actions, State) ->
451468
end, State, Actions).
452469

453470
handle_deliver(AckRequired, Msgs, State) when is_list(Msgs) ->
471+
NumMsgs = length(Msgs),
454472
maybe_grant_credit(
455473
lists:foldl(
456-
fun({_QName, _QPid, MsgId, _Redelivered, Mc}, S0) ->
474+
fun({QName, _QPid, MsgId, _Redelivered, Mc}, #{source := #{current := #{queue_states := QStates }}} = S0) ->
475+
messages_delivered(QName, QStates),
457476
DeliveryTag = next_tag(S0),
458477
S = record_pending(AckRequired, DeliveryTag, MsgId, increase_next_tag(S0)),
459478
rabbit_shovel_behaviour:forward(DeliveryTag, Mc, S)
460-
end, sent_delivery(State, length(Msgs)), Msgs)).
479+
end, sent_delivery(State, NumMsgs), Msgs)).
461480

462481
next_tag(#{source := #{current := #{next_tag := DeliveryTag}}}) ->
463482
DeliveryTag.
@@ -616,6 +635,7 @@ settle(Op, DeliveryTag, Multiple,
616635
{MsgIds, UAMQ} = collect_acks(UAMQ0, DeliveryTag, Multiple),
617636
case rabbit_queue_type:settle(QRef, Op, CTag, lists:reverse(MsgIds), QState0) of
618637
{ok, QState1, Actions} ->
638+
messages_acknowledged(Op, QRef, QState1, MsgIds),
619639
State = State0#{source => Src#{current => Current#{queue_states => QState1,
620640
unacked_message_q => UAMQ}}},
621641
handle_queue_actions(Actions, State);
@@ -739,12 +759,18 @@ handle_credit_reply({credit_reply, CTag, DeliveryCount, Credit, _Available, _Dra
739759
at_least_one_credit_req_in_flight => false}}
740760
end.
741761

742-
process_routing_confirm(undefined, _, _, State) ->
762+
process_routing_confirm(undefined, _, [], State) ->
763+
rabbit_global_counters:messages_unroutable_returned(?PROTOCOL, 1),
764+
State;
765+
process_routing_confirm(undefined, _, QRefs, State) ->
766+
rabbit_global_counters:messages_routed(?PROTOCOL, length(QRefs)),
743767
State;
744768
process_routing_confirm(MsgSeqNo, Tag, [], State)
745769
when is_integer(MsgSeqNo) ->
770+
rabbit_global_counters:messages_unroutable_dropped(?PROTOCOL, 1),
746771
record_confirms([{MsgSeqNo, Tag}], State);
747772
process_routing_confirm(MsgSeqNo, Tag, QRefs, #{dest := Dst = #{unconfirmed := Unconfirmed}} = State) when is_integer(MsgSeqNo) ->
773+
rabbit_global_counters:messages_routed(?PROTOCOL, length(QRefs)),
748774
State#{dest => Dst#{unconfirmed =>
749775
rabbit_shovel_confirms:insert(MsgSeqNo, QRefs, Tag, Unconfirmed)}}.
750776

@@ -781,8 +807,10 @@ send_nacks(Rs, Cs, State) ->
781807
send_confirms([], _, State) ->
782808
State;
783809
send_confirms([MsgSeqNo], _, State) ->
810+
rabbit_global_counters:messages_confirmed(?PROTOCOL, 1),
784811
rabbit_shovel_behaviour:ack(MsgSeqNo, false, State);
785812
send_confirms(Cs, Rs, State) ->
813+
rabbit_global_counters:messages_confirmed(?PROTOCOL, length(Cs)),
786814
coalesce_and_send(Cs, Rs,
787815
fun(MsgSeqNo, Multiple, StateX) ->
788816
rabbit_shovel_behaviour:ack(MsgSeqNo, Multiple, StateX)
@@ -833,3 +861,30 @@ decr_remaining(Num, State) ->
833861
_ = send_confirms_and_nacks(State),
834862
exit(R)
835863
end.
864+
865+
messages_acknowledged(complete, QName, QS, MsgIds) ->
866+
case rabbit_queue_type:module(QName, QS) of
867+
{ok, QType} ->
868+
rabbit_global_counters:messages_acknowledged(?PROTOCOL, QType, length(MsgIds));
869+
_ ->
870+
ok
871+
end;
872+
messages_acknowledged(_, _, _, _) ->
873+
ok.
874+
875+
messages_received(AckMode) ->
876+
rabbit_global_counters:messages_received(?PROTOCOL, 1),
877+
case AckMode of
878+
on_confirm ->
879+
rabbit_global_counters:messages_received_confirm(?PROTOCOL, 1);
880+
_ ->
881+
ok
882+
end.
883+
884+
messages_delivered(QName, S0) ->
885+
case rabbit_queue_type:module(QName, S0) of
886+
{ok, QType} ->
887+
rabbit_global_counters:messages_delivered(?PROTOCOL, QType, 1);
888+
_ ->
889+
ok
890+
end.

0 commit comments

Comments
 (0)