Skip to content

Commit c0d46e4

Browse files
committed
Report flow/blocked shovel status
Currently there is a known limitation that if the shovel is idle after it was unblocked by credit flow, then the status won't get updated until the next message.
1 parent 4d24dda commit c0d46e4

File tree

9 files changed

+99
-31
lines changed

9 files changed

+99
-31
lines changed

deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
close_dest/1,
3131
ack/3,
3232
nack/3,
33+
status/1,
3334
forward/4
3435
]).
3536

@@ -155,16 +156,16 @@ dest_endpoint(#{dest := Dest}) ->
155156
maps:to_list(maps:filter(fun(K, _) -> proplists:is_defined(K, Keys) end, Dest)).
156157

157158
forward(IncomingTag, Props, Payload, State) ->
158-
State1 = control_throttle(State),
159-
case is_blocked(State1) of
159+
case is_blocked(State) of
160160
true ->
161161
%% We are blocked by client-side flow-control and/or
162162
%% `connection.blocked` message from the destination
163163
%% broker. Simply cache the forward.
164164
PendingEntry = {IncomingTag, Props, Payload},
165-
add_pending(PendingEntry, State1);
165+
add_pending(PendingEntry, State);
166166
false ->
167-
do_forward(IncomingTag, Props, Payload, State1)
167+
State1 = do_forward(IncomingTag, Props, Payload, State),
168+
control_throttle(State1)
168169
end.
169170

170171
do_forward(IncomingTag, Props, Payload,
@@ -368,6 +369,13 @@ is_blocked(#{dest := #{blocked_by := BlockReasons}}) when BlockReasons =/= [] ->
368369
is_blocked(_) ->
369370
false.
370371

372+
status(#{dest := #{blocked_by := [flow]}}) ->
373+
flow;
374+
status(#{dest := #{blocked_by := BlockReasons}}) when BlockReasons =/= [] ->
375+
blocked;
376+
status(_) ->
377+
credit_flow:state().
378+
371379
add_pending(Elem, State = #{dest := Dest}) ->
372380
Pending = maps:get(pending, Dest, []),
373381
State#{dest => Dest#{pending => [Elem|Pending]}}.

deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
close_dest/1,
3131
ack/3,
3232
nack/3,
33+
status/1,
3334
forward/4
3435
]).
3536

@@ -302,6 +303,11 @@ nack(Tag, true, State = #{source := #{current := #{session := Session},
302303
Tag, true, accepted),
303304
State#{source => Src#{last_nacked_tag => Tag}}.
304305

306+
status(#{dest := #{current := #{link_state := attached}}}) ->
307+
flow;
308+
status(#{dest := #{current := #{link_state := credited}}}) ->
309+
running.
310+
305311
-spec forward(Tag :: tag(), Props :: #{atom() => any()},
306312
Payload :: binary(), state()) -> state().
307313
forward(_Tag, _Props, _Payload,

deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
forward/4,
2828
ack/3,
2929
nack/3,
30+
status/1,
3031
% common functions
3132
decr_remaining_unacked/1,
3233
decr_remaining/2
@@ -48,7 +49,9 @@
4849
ack_mode => ack_mode(),
4950
atom() => term()}.
5051

51-
-export_type([state/0, source_config/0, dest_config/0, uri/0, tag/0]).
52+
-type status() :: running | flow | blocked.
53+
54+
-export_type([state/0, source_config/0, dest_config/0, uri/0, tag/0, status/0]).
5255

5356
-callback parse(binary(), {source | destination, Conf :: proplists:proplist()}) ->
5457
source_config() | dest_config().
@@ -80,7 +83,7 @@
8083
-callback nack(Tag :: tag(), Multi :: boolean(), state()) -> state().
8184
-callback forward(Tag :: tag(), Props :: #{atom() => any()},
8285
Payload :: binary(), state()) -> state().
83-
86+
-callback status(state()) -> status().
8487

8588
-spec parse(atom(), binary(), {source | destination, proplists:proplist()}) ->
8689
source_config() | dest_config().
@@ -151,6 +154,9 @@ ack(Tag, Multi, #{source := #{module := Mod}} = State) ->
151154
nack(Tag, Multi, #{source := #{module := Mod}} = State) ->
152155
Mod:nack(Tag, Multi, State).
153156

157+
status(#{dest := #{module := Mod}} = State) ->
158+
Mod:status(State).
159+
154160
%% Common functions
155161

156162
%% Count down until we stop publishing in on-confirm mode

deps/rabbitmq_shovel/src/rabbit_shovel_status.erl

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -121,15 +121,16 @@ find_matching_shovel(VHost, Name, Shovels) ->
121121
%% Implementation
122122
%%
123123

124-
split_status({running, MoreInfo}) -> [{status, running} | MoreInfo];
124+
125125
split_status({terminated, Reason}) -> [{status, terminated},
126126
{reason, Reason}];
127-
split_status(Status) when is_atom(Status) -> [{status, Status}].
127+
split_status(Status) when is_atom(Status) -> [{status, Status}];
128+
split_status({Status, [{_, _} | _] = MoreInfo}) -> [{status, Status} | MoreInfo].
128129

129130
split_name({VHost, Name}) -> [{name, Name},
130131
{vhost, VHost}];
131132
split_name(Name) when is_atom(Name) -> [{name, Name}].
132133

133134
ensure_timer(State0) ->
134135
State1 = rabbit_misc:stop_timer(State0, #state.timer),
135-
rabbit_misc:ensure_timer(State1, #state.timer, ?CHECK_FREQUENCY, check).
136+
rabbit_misc:ensure_timer(State1, #state.timer, ?CHECK_FREQUENCY, check).

deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@
2020

2121
-record(state, {name :: binary() | {rabbit_types:vhost(), binary()},
2222
type :: static | dynamic,
23-
config :: rabbit_shovel_behaviour:state()}).
23+
config :: rabbit_shovel_behaviour:state(),
24+
last_reported_status :: rabbit_shovel_behaviour:status()}).
2425

2526
start_link(Type, Name, Config) ->
2627
ShovelParameter = rabbit_shovel_util:get_shovel_parameter(Name),
@@ -92,8 +93,8 @@ handle_cast(init_shovel, State = #state{config = Config}) ->
9293
Config2 = rabbit_shovel_behaviour:init_source(Config1),
9394
rabbit_log_shovel:debug("Shovel ~ts has finished setting up its topology", [human_readable_name(maps:get(name, Config2))]),
9495
State1 = State#state{config = Config2},
95-
ok = report_running(State1),
96-
{noreply, State1}.
96+
State2 = report_running(State1),
97+
{noreply, State2}.
9798

9899

99100
handle_info(Msg, State = #state{config = Config, name = Name}) ->
@@ -116,7 +117,9 @@ handle_info(Msg, State = #state{config = Config, name = Name}) ->
116117
rabbit_log_shovel:debug("Shovel ~ts decided to stop due a message from destination: ~tp", [human_readable_name(Name), Reason]),
117118
{stop, Reason, State};
118119
Config1 ->
119-
{noreply, State#state{config = Config1}}
120+
State1 = State#state{config = Config1},
121+
State2 = maybe_report_running(State1),
122+
{noreply, State2}
120123
end;
121124
{stop, {inbound_conn_died, heartbeat_timeout}} ->
122125
rabbit_log_shovel:error("Shovel ~ts detected missed heartbeats on source connection", [human_readable_name(Name)]),
@@ -131,7 +134,9 @@ handle_info(Msg, State = #state{config = Config, name = Name}) ->
131134
rabbit_log_shovel:error("Shovel ~ts decided to stop due a message from source: ~tp", [human_readable_name(Name), Reason]),
132135
{stop, Reason, State};
133136
Config1 ->
134-
{noreply, State#state{config = Config1}}
137+
State1 = State#state{config = Config1},
138+
State2 = maybe_report_running(State1),
139+
{noreply, State2}
135140
end.
136141

137142
terminate({shutdown, autodelete}, State = #state{name = Name,
@@ -209,20 +214,33 @@ human_readable_name(Name) ->
209214
ShovelName -> rabbit_misc:format("'~ts'", [ShovelName])
210215
end.
211216

212-
report_running(#state{config = Config} = State) ->
217+
maybe_report_running(#state{config = Config,
218+
last_reported_status = LastStatus} = State) ->
219+
case rabbit_shovel_behaviour:status(Config) of
220+
LastStatus ->
221+
State;
222+
NewStatus ->
223+
report_status(NewStatus, State)
224+
end.
225+
226+
report_running(State) ->
227+
report_status(running, State).
228+
229+
report_status(Status, #state{config = Config} = State) ->
213230
InUri = rabbit_shovel_behaviour:source_uri(Config),
214231
OutUri = rabbit_shovel_behaviour:dest_uri(Config),
215232
InProto = rabbit_shovel_behaviour:source_protocol(Config),
216233
OutProto = rabbit_shovel_behaviour:dest_protocol(Config),
217234
InEndpoint = rabbit_shovel_behaviour:source_endpoint(Config),
218235
OutEndpoint = rabbit_shovel_behaviour:dest_endpoint(Config),
219236
rabbit_shovel_status:report(State#state.name, State#state.type,
220-
{running, [{src_uri, rabbit_data_coercion:to_binary(InUri)},
237+
{Status, [{src_uri, rabbit_data_coercion:to_binary(InUri)},
221238
{src_protocol, rabbit_data_coercion:to_binary(InProto)},
222239
{dest_protocol, rabbit_data_coercion:to_binary(OutProto)},
223240
{dest_uri, rabbit_data_coercion:to_binary(OutUri)}]
224241
++ props_to_binary(InEndpoint) ++ props_to_binary(OutEndpoint)
225-
}).
242+
}),
243+
State#state{last_reported_status = Status}.
226244

227245
props_to_binary(Props) ->
228246
[{K, rabbit_data_coercion:to_binary(V)} || {K, V} <- Props].

deps/rabbitmq_shovel/test/dynamic_SUITE.erl

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -459,6 +459,7 @@ credit_flow(Config) ->
459459
{<<"ack-mode">>, <<"on-publish">>},
460460
{<<"src-delete-after">>, <<"never">>}]),
461461
shovel_test_utils:await_shovel(Config, <<"test">>),
462+
running = shovel_test_utils:get_shovel_status(Config, <<"test">>),
462463

463464
ShovelPid = find_shovel_pid(Config),
464465
#{dest :=
@@ -482,9 +483,9 @@ credit_flow(Config) ->
482483
%% Wait until the shovel is blocked
483484
shovel_test_utils:await(
484485
fun() ->
485-
case get_shovel_state(ShovelPid) of
486-
#{dest := #{blocked_by := [flow]}} -> true;
487-
Conf -> Conf
486+
case shovel_test_utils:get_shovel_status(Config, <<"test">>) of
487+
flow -> true;
488+
Status -> Status
488489
end
489490
end,
490491
5000),
@@ -522,8 +523,22 @@ credit_flow(Config) ->
522523
#{messages := 1000} = message_count(Config, <<"dest">>),
523524
[{_, 0, _}] =
524525
rabbit_ct_broker_helpers:rpc(
525-
Config, 0, recon, proc_count, [message_queue_len, 1])
526+
Config, 0, recon, proc_count, [message_queue_len, 1]),
527+
528+
%% To avoid frequent flipping of state, there is a 1
529+
%% second "state change interval" in credit flow
530+
%% before an unblocked process changes from flow to
531+
%% running status.
532+
timer:sleep(1000),
526533

534+
%% Need to publish a message to trigger status
535+
%% reporting
536+
%% FIXME maybe add a timer in the shovel process to
537+
%% update status automatically
538+
publish(Ch, <<>>, <<"src">>, <<"hello">>),
539+
true = amqp_channel:wait_for_confirms(Ch),
540+
541+
running = shovel_test_utils:get_shovel_status(Config, <<"test">>)
527542
after
528543
resume_process(Config),
529544
set_default_credit(Config, OrigCredit)
@@ -567,9 +582,7 @@ dest_resource_alarm(AckMode, Config) ->
567582
{<<"src-delete-after">>, <<"never">>}]),
568583

569584
%% The shovel is blocked
570-
ShovelPid = find_shovel_pid(Config),
571-
Conf = get_shovel_state(ShovelPid),
572-
#{dest := #{blocked_by := [connection_blocked]}} = Conf,
585+
blocked = shovel_test_utils:get_shovel_status(Config, <<"test">>),
573586

574587
%% The shoveled message triggered a
575588
%% connection.blocked notification, but hasn't
@@ -646,7 +659,8 @@ dest_resource_alarm(AckMode, Config) ->
646659
Cnt =:= 1001
647660
end,
648661
5000),
649-
#{messages := 0} = message_count(Config, <<"src">>)
662+
#{messages := 0} = message_count(Config, <<"src">>),
663+
running = shovel_test_utils:get_shovel_status(Config, <<"test">>)
650664
after
651665
set_vm_memory_high_watermark(Config, OrigLimit)
652666
end

deps/rabbitmq_shovel/test/shovel_test_utils.erl

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99

1010
-include_lib("common_test/include/ct.hrl").
1111
-export([set_param/3, set_param_nowait/3, await_shovel/2, await_shovel1/2,
12-
shovels_from_status/0, await/1, await/2, clear_param/2]).
12+
shovels_from_status/0, get_shovel_status/2,
13+
await/1, await/2, clear_param/2]).
1314

1415
make_uri(Config) ->
1516
Hostname = ?config(rmq_hostname, Config),
@@ -37,7 +38,21 @@ await_shovel1(_Config, Name) ->
3738

3839
shovels_from_status() ->
3940
S = rabbit_shovel_status:status(),
40-
[N || {{<<"/">>, N}, dynamic, {running, _}, _} <- S].
41+
[N || {{<<"/">>, N}, dynamic, {Status, _}, _} <- S,
42+
Status == running orelse
43+
Status == flow orelse
44+
Status == blocked].
45+
46+
get_shovel_status(Config, Name) ->
47+
S = rabbit_ct_broker_helpers:rpc(
48+
Config, 0, rabbit_shovel_status, lookup, [{<<"/">>, Name}]),
49+
case S of
50+
not_found ->
51+
not_found;
52+
_ ->
53+
{Status, _} = proplists:get_value(info, S),
54+
Status
55+
end.
4156

4257
await(Pred) ->
4358
case Pred() of

deps/rabbitmq_shovel_management/priv/www/js/tmpl/shovels.ejs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
</td>
4747
<td><%= shovel.timestamp %></td>
4848
<% } else { %>
49-
<td><%= fmt_state('green', shovel.state) %></td>
49+
<td><%= fmt_object_state(shovel) %></td>
5050
<td><%= fmt_string(shovel.src_protocol) %></td>
5151
<td><%= shovel.src_uri == undefined ? fmt_string(shovel.src_uri) : fmt_string(fmt_uri_with_credentials(shovel.src_uri)) %></td>
5252
<td><%= fmt_shovel_endpoint('src_', shovel) %></td>

deps/rabbitmq_shovel_management/src/rabbit_shovel_mgmt_util.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ format_name(dynamic, {VHost, Name}) -> [{name, Name},
5252
format_info(starting) ->
5353
[{state, starting}];
5454

55-
format_info({running, Props}) ->
56-
[{state, running}] ++ Props;
55+
format_info({Status, [{_, _} | _] = Props}) ->
56+
[{state, Status}] ++ Props;
5757

5858
format_info({terminated, Reason}) ->
5959
[{state, terminated},
@@ -63,4 +63,4 @@ format_ts({{Y, M, D}, {H, Min, S}}) ->
6363
print("~w-~2.2.0w-~2.2.0w ~w:~2.2.0w:~2.2.0w", [Y, M, D, H, Min, S]).
6464

6565
print(Fmt, Val) ->
66-
list_to_binary(io_lib:format(Fmt, Val)).
66+
list_to_binary(io_lib:format(Fmt, Val)).

0 commit comments

Comments
 (0)