Skip to content

Commit e3430aa

Browse files
ikavgomichaelklishin
authored andcommitted
RMQ-1263: Shovel: add forwarded counter
Delayed queuese can automatically create associated Shovels to transfer Ready messages to the desired destination. This adds forwarded messages counter which will be used in Management UI for better Shovel internals visibility. (cherry picked from commit a8800b6cd75d8dc42a91f88655058f2ffa3b6ea6)
1 parent 3a30917 commit e3430aa

10 files changed

+66
-28
lines changed

deps/rabbitmq_shovel/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteShovelCommand.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ run([Name], #{node := Node, vhost := VHost}) ->
7777
try_force_removing(Node, VHost, Name, ActingUser),
7878
{error, rabbit_data_coercion:to_binary(ErrMsg)};
7979
Match ->
80-
{{_Name, _VHost}, _Type, {_State, Opts}, _Timestamp} = Match,
80+
{{_Name, _VHost}, _Type, {_State, Opts}, _Metrics, _Timestamp} = Match,
8181
{_, HostingNode} = lists:keyfind(node, 1, Opts),
8282
case rabbit_misc:rpc_call(
8383
HostingNode, rabbit_shovel_util, delete_shovel, [VHost, Name, ActingUser]) of

deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -365,15 +365,17 @@ publish(IncomingTag, Method, Msg,
365365
ok = amqp_channel:call(OutboundChan, Method, Msg)
366366
end,
367367

368+
#{dest := Dst1} = State1 = rabbit_shovel_behaviour:incr_forwarded(State),
369+
368370
rabbit_shovel_behaviour:decr_remaining_unacked(
369371
case AckMode of
370372
no_ack ->
371-
rabbit_shovel_behaviour:decr_remaining(1, State);
373+
rabbit_shovel_behaviour:decr_remaining(1, State1);
372374
on_confirm ->
373-
State#{dest => Dst#{unacked => Unacked#{Seq => IncomingTag}}};
375+
State1#{dest => Dst1#{unacked => Unacked#{Seq => IncomingTag}}};
374376
on_publish ->
375-
State1 = rabbit_shovel_behaviour:ack(IncomingTag, false, State),
376-
rabbit_shovel_behaviour:decr_remaining(1, State1)
377+
State2 = rabbit_shovel_behaviour:ack(IncomingTag, false, State1),
378+
rabbit_shovel_behaviour:decr_remaining(1, State2)
377379
end).
378380

379381
control_throttle(State) ->

deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@
3030
status/1,
3131
% common functions
3232
decr_remaining_unacked/1,
33-
decr_remaining/2
33+
decr_remaining/2,
34+
incr_forwarded/1
3435
]).
3536

3637
-type tag() :: non_neg_integer().
@@ -155,7 +156,18 @@ nack(Tag, Multi, #{source := #{module := Mod}} = State) ->
155156
Mod:nack(Tag, Multi, State).
156157

157158
status(#{dest := #{module := Mod}} = State) ->
158-
Mod:status(State).
159+
{Mod:status(State), metrics(State)}.
160+
161+
incr_forwarded(State = #{dest := Dest}) ->
162+
State#{dest => maps:put(forwarded, maps:get(forwarded, Dest, 0) + 1, Dest)}.
163+
164+
metrics(_State = #{source := Source,
165+
dest := Dest}) ->
166+
#{remaining => maps:get(remaining, Source, unlimited),
167+
remaining_unacked => maps:get(remaining_unacked, Source, 0),
168+
pending => maps:get(pending, Dest, 0),
169+
forwarded => maps:get(forwarded, Dest, 0)}.
170+
159171

160172
%% Common functions
161173

deps/rabbitmq_shovel/src/rabbit_shovel_status.erl

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,12 @@
4949
info :: info(),
5050
blocked_status = running :: blocked_status(),
5151
blocked_at :: integer() | undefined,
52+
metrics :: #{remaining := rabbit_types:option(non_neg_integer()) | unlimited,
53+
ramaining_unacked := rabbit_types:option(non_neg_integer()),
54+
pending := rabbit_types:option(non_neg_integer()),
55+
forwarded := rabbit_types:option(non_neg_integer())
56+
},
57+
5258
timestamp :: calendar:datetime()}).
5359

5460
start_link() ->
@@ -112,6 +118,7 @@ handle_call(status, _From, State) ->
112118
{reply, [{Entry#entry.name,
113119
Entry#entry.type,
114120
blocked_status_to_info(Entry),
121+
Entry#entry.metrics,
115122
Entry#entry.timestamp}
116123
|| Entry <- Entries], State};
117124

@@ -120,6 +127,7 @@ handle_call({lookup, Name}, _From, State) ->
120127
[Entry] -> [{name, Name},
121128
{type, Entry#entry.type},
122129
{info, blocked_status_to_info(Entry)},
130+
{metrics, Entry#entry.metrics},
123131
{timestamp, Entry#entry.timestamp}];
124132
[] -> not_found
125133
end,
@@ -141,6 +149,18 @@ handle_cast({report, Name, Type, Info, Timestamp}, State) ->
141149
split_name(Name) ++ split_status(Info)),
142150
{noreply, State};
143151

152+
handle_cast({report_blocked_status, Name, {Status, Metrics}, Timestamp}, State) ->
153+
case Status of
154+
flow ->
155+
true = ets:update_element(?ETS_NAME, Name, [{#entry.blocked_status, flow},
156+
{#entry.metrics, Metrics},
157+
{#entry.blocked_at, Timestamp}]);
158+
_ ->
159+
true = ets:update_element(?ETS_NAME, Name, [{#entry.blocked_status, Status},
160+
{#entry.metrics, Metrics}])
161+
end,
162+
{noreply, State};
163+
%% used in tests
144164
handle_cast({report_blocked_status, Name, Status, Timestamp}, State) ->
145165
case Status of
146166
flow ->
@@ -178,22 +198,22 @@ code_change(_OldVsn, State, _Extra) ->
178198
inject_node_info(Node, Shovels) ->
179199
lists:map(
180200
%% starting
181-
fun({Name, Type, State, Timestamp}) when is_atom(State) ->
201+
fun({Name, Type, State, Metrics, Timestamp}) when is_atom(State) ->
182202
Opts = [{node, Node}],
183-
{Name, Type, {State, Opts}, Timestamp};
203+
{Name, Type, {State, Opts}, Metrics, Timestamp};
184204
%% terminated
185-
({Name, Type, {terminated, Reason}, Timestamp}) ->
186-
{Name, Type, {terminated, Reason}, Timestamp};
205+
({Name, Type, {terminated, Reason}, Metrics, Timestamp}) ->
206+
{Name, Type, {terminated, Reason}, Metrics, Timestamp};
187207
%% running
188-
({Name, Type, {State, Opts}, Timestamp}) ->
208+
({Name, Type, {State, Opts}, Metrics, Timestamp}) ->
189209
Opts1 = Opts ++ [{node, Node}],
190-
{Name, Type, {State, Opts1}, Timestamp}
210+
{Name, Type, {State, Opts1}, Metrics, Timestamp}
191211
end, Shovels).
192212

193213
-spec find_matching_shovel(rabbit_types:vhost(), binary(), [status_tuple()]) -> status_tuple() | undefined.
194214
find_matching_shovel(VHost, Name, Shovels) ->
195215
case lists:filter(
196-
fun ({{V, S}, _Kind, _Status, _}) ->
216+
fun ({{V, S}, _Kind, _Status, _Metrics, _}) ->
197217
VHost =:= V andalso Name =:= S
198218
end, Shovels) of
199219
[] -> undefined;

deps/rabbitmq_shovel/test/amqp10_SUITE.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ amqp10_destination(Config, AckMode) ->
139139
throw(timeout_waiting_for_deliver1)
140140
end,
141141

142-
[{test_shovel, static, {running, _Info}, _Time}] =
142+
[{test_shovel, static, {running, _Info}, _Metrics, _Time}] =
143143
rabbit_ct_broker_helpers:rpc(Config, 0,
144144
rabbit_shovel_status, status, []),
145145
amqp10_client:detach_link(Receiver),
@@ -183,7 +183,7 @@ amqp10_source(Config, AckMode) ->
183183
after ?TIMEOUT -> throw(timeout_waiting_for_deliver1)
184184
end,
185185

186-
[{test_shovel, static, {running, _Info}, _Time}] =
186+
[{test_shovel, static, {running, _Info}, _Metrics, _Time}] =
187187
rabbit_ct_broker_helpers:rpc(Config, 0,
188188
rabbit_shovel_status, status, []),
189189
rabbit_ct_client_helpers:close_channel(Chan).
@@ -267,7 +267,7 @@ setup_shovel(ShovelConfig) ->
267267
await_running_shovel(test_shovel).
268268

269269
await_running_shovel(Name) ->
270-
case [N || {N, _, {running, _}, _}
270+
case [N || {N, _, {running, _}, _, _}
271271
<- rabbit_shovel_status:status(),
272272
N =:= Name] of
273273
[_] -> ok;

deps/rabbitmq_shovel/test/configuration_SUITE.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ run_valid_test(Config) ->
277277
after ?TIMEOUT -> throw(timeout_waiting_for_deliver1)
278278
end,
279279

280-
[{test_shovel, static, {running, _Info}, _Time}] =
280+
[{test_shovel, static, {running, _Info}, _Metrics, _Time}] =
281281
rabbit_ct_broker_helpers:rpc(Config, 0,
282282
rabbit_shovel_status, status, []),
283283

@@ -407,15 +407,15 @@ setup_shovels2(Config) ->
407407
ok = application:start(rabbitmq_shovel).
408408

409409
await_running_shovel(Name) ->
410-
case [N || {N, _, {running, _}, _}
410+
case [N || {N, _, {running, _}, _Metrics, _}
411411
<- rabbit_shovel_status:status(),
412412
N =:= Name] of
413413
[_] -> ok;
414414
_ -> timer:sleep(100),
415415
await_running_shovel(Name)
416416
end.
417417
await_terminated_shovel(Name) ->
418-
case [N || {N, _, {terminated, _}, _}
418+
case [N || {N, _, {terminated, _}, _Metrics, _}
419419
<- rabbit_shovel_status:status(),
420420
N =:= Name] of
421421
[_] -> ok;

deps/rabbitmq_shovel/test/dynamic_SUITE.erl

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,13 +118,17 @@ end_per_testcase(Testcase, Config) ->
118118
%% -------------------------------------------------------------------
119119

120120
simple(Config) ->
121+
Name = <<"test">>,
121122
with_ch(Config,
122123
fun (Ch) ->
123124
shovel_test_utils:set_param(
124125
Config,
125-
<<"test">>, [{<<"src-queue">>, <<"src">>},
126+
Name, [{<<"src-queue">>, <<"src">>},
126127
{<<"dest-queue">>, <<"dest">>}]),
127-
publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hello">>)
128+
publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hello">>),
129+
Status = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status, lookup, [{<<"/">>, Name}]),
130+
?assertMatch([_|_], Status),
131+
?assertMatch(#{metrics := #{forwarded := 1}}, maps:from_list(Status))
128132
end).
129133

130134
quorum_queues(Config) ->

deps/rabbitmq_shovel/test/shovel_status_command_SUITE.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,11 @@ run_starting(Config) ->
8282
[A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
8383
Opts = #{node => A},
8484
case ?CMD:run([], Opts) of
85-
{stream, [{{<<"/">>, <<"test">>}, dynamic, starting, _}]} ->
85+
{stream, [{{<<"/">>, <<"test">>}, dynamic, starting, _, _}]} ->
8686
ok;
8787
{stream, []} ->
8888
throw(shovel_not_found);
89-
{stream, [{{<<"/">>, <<"test">>}, dynamic, {running, _}, _}]} ->
89+
{stream, [{{<<"/">>, <<"test">>}, dynamic, {running, _}, _, _}]} ->
9090
ct:pal("Shovel is already running, starting could not be tested!")
9191
end,
9292
shovel_test_utils:clear_param(Config, <<"test">>).
@@ -107,7 +107,7 @@ run_running(Config) ->
107107
{<<"dest-queue">>, <<"dest">>}]),
108108
[A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
109109
Opts = #{node => A},
110-
{stream, [{{<<"/">>, <<"test">>}, dynamic, {running, _}, _}]}
110+
{stream, [{{<<"/">>, <<"test">>}, dynamic, {running, _}, _, _}]}
111111
= ?CMD:run([], Opts),
112112
shovel_test_utils:clear_param(Config, <<"test">>).
113113

deps/rabbitmq_shovel/test/shovel_test_utils.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ shovels_from_status() ->
6565

6666
shovels_from_status(ExpectedState) ->
6767
S = rabbit_shovel_status:status(),
68-
[N || {{<<"/">>, N}, dynamic, {State, _}, _} <- S, State == ExpectedState].
68+
[N || {{<<"/">>, N}, dynamic, {State, _}, _, _} <- S, State == ExpectedState].
6969

7070
get_shovel_status(Config, Name) ->
7171
get_shovel_status(Config, 0, Name).
@@ -111,4 +111,4 @@ restart_shovel(Config, Name) ->
111111

112112
restart_shovel(Config, Node, Name) ->
113113
rabbit_ct_broker_helpers:rpc(Config,
114-
Node, rabbit_shovel_util, restart_shovel, [<<"/">>, Name]).
114+
Node, rabbit_shovel_util, restart_shovel, [<<"/">>, Name]).

deps/rabbitmq_shovel_management/src/rabbit_shovel_mgmt_util.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ status(Node) ->
4242
[format(Node, I) || I <- Status]
4343
end.
4444

45-
format(Node, {Name, Type, Info, TS}) ->
45+
format(Node, {Name, Type, Info, Metrics, TS}) ->
4646
[{node, Node}, {timestamp, format_ts(TS)}] ++
4747
format_name(Type, Name) ++
4848
format_info(Info).

0 commit comments

Comments
 (0)