Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ run([Name], #{node := Node, vhost := VHost}) ->
try_force_removing(Node, VHost, Name, ActingUser),
{error, rabbit_data_coercion:to_binary(ErrMsg)};
Match ->
{{_Name, _VHost}, _Type, {_State, Opts}, _Timestamp} = Match,
{{_Name, _VHost}, _Type, {_State, Opts}, _Metrics, _Timestamp} = Match,
{_, HostingNode} = lists:keyfind(node, 1, Opts),
case rabbit_misc:rpc_call(
HostingNode, rabbit_shovel_util, delete_shovel, [VHost, Name, ActingUser]) of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ run([Name], #{node := Node, vhost := VHost}) ->
undefined ->
{error, rabbit_data_coercion:to_binary(ErrMsg)};
Match ->
{{_Name, _VHost}, _Type, {_State, Opts}, _Timestamp} = Match,
{{_Name, _VHost}, _Type, {_State, Opts}, _Metrics, _Timestamp} = Match,
{_, HostingNode} = lists:keyfind(node, 1, Opts),
case rabbit_misc:rpc_call(
HostingNode, rabbit_shovel_util, restart_shovel, [VHost, Name]) of
Expand Down
10 changes: 6 additions & 4 deletions deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -365,15 +365,17 @@ publish(IncomingTag, Method, Msg,
ok = amqp_channel:call(OutboundChan, Method, Msg)
end,

#{dest := Dst1} = State1 = rabbit_shovel_behaviour:incr_forwarded(State),

rabbit_shovel_behaviour:decr_remaining_unacked(
case AckMode of
no_ack ->
rabbit_shovel_behaviour:decr_remaining(1, State);
rabbit_shovel_behaviour:decr_remaining(1, State1);
on_confirm ->
State#{dest => Dst#{unacked => Unacked#{Seq => IncomingTag}}};
State1#{dest => Dst1#{unacked => Unacked#{Seq => IncomingTag}}};
on_publish ->
State1 = rabbit_shovel_behaviour:ack(IncomingTag, false, State),
rabbit_shovel_behaviour:decr_remaining(1, State1)
State2 = rabbit_shovel_behaviour:ack(IncomingTag, false, State1),
rabbit_shovel_behaviour:decr_remaining(1, State2)
end).

control_throttle(State) ->
Expand Down
20 changes: 17 additions & 3 deletions deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
status/1,
% common functions
decr_remaining_unacked/1,
decr_remaining/2
decr_remaining/2,
incr_forwarded/1
]).

-type tag() :: non_neg_integer().
Expand Down Expand Up @@ -82,7 +83,7 @@
-callback forward(Tag :: tag(), Props :: #{atom() => any()},
Payload :: binary(), state()) ->
state() | {stop, any()}.
-callback status(state()) -> rabbit_shovel_status:blocked_status() | ignore.
-callback status(state()) -> rabbit_shovel_status:shovel_status().

-spec parse(atom(), binary(), {source | destination, proplists:proplist()}) ->
source_config() | dest_config().
Expand Down Expand Up @@ -154,8 +155,21 @@ ack(Tag, Multi, #{source := #{module := Mod}} = State) ->
nack(Tag, Multi, #{source := #{module := Mod}} = State) ->
Mod:nack(Tag, Multi, State).

-spec status(state()) -> {rabbit_shovel_status:shovel_status(), rabbit_shovel_status:metrics()}.
status(#{dest := #{module := Mod}} = State) ->
Mod:status(State).
{Mod:status(State), metrics(State)}.

incr_forwarded(State = #{dest := Dest}) ->
State#{dest => maps:put(forwarded, maps:get(forwarded, Dest, 0) + 1, Dest)}.

-spec metrics(state()) -> rabbit_shovel_status:metrics().
metrics(_State = #{source := Source,
dest := Dest}) ->
#{remaining => maps:get(remaining, Source, unlimited),
remaining_unacked => maps:get(remaining_unacked, Source, 0),
pending => maps:get(pending, Dest, 0),
forwarded => maps:get(forwarded, Dest, 0)}.


%% Common functions

Expand Down
42 changes: 32 additions & 10 deletions deps/rabbitmq_shovel/src/rabbit_shovel_status.erl
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,27 @@
| {running, proplists:proplist()}
| {terminated, term()}.
-type blocked_status() :: running | flow | blocked.
-type shovel_status() :: blocked_status() | ignore.

-type name() :: binary() | {rabbit_types:vhost(), binary()}.
-type type() :: static | dynamic.
-type status_tuple() :: {name(), type(), info(), calendar:datetime()}.
-type metrics() :: #{remaining := rabbit_types:option(non_neg_integer()) | unlimited,
remaining_unacked := rabbit_types:option(non_neg_integer()),
pending := rabbit_types:option(non_neg_integer()),
forwarded := rabbit_types:option(non_neg_integer())
} | #{}.
-type status_tuple() :: {name(), type(), info(), metrics(), calendar:datetime()}.

-export_type([info/0, blocked_status/0]).
-export_type([info/0, blocked_status/0, shovel_status/0, metrics/0]).

-record(state, {timer}).
-record(entry, {name :: name(),
type :: type(),
info :: info(),
blocked_status = running :: blocked_status(),
blocked_at :: integer() | undefined,
metrics = #{} :: metrics(),

timestamp :: calendar:datetime()}).

start_link() ->
Expand All @@ -58,7 +66,7 @@ start_link() ->
report(Name, Type, Info) ->
gen_server:cast(?SERVER, {report, Name, Type, Info, calendar:local_time()}).

-spec report_blocked_status(name(), blocked_status()) -> ok.
-spec report_blocked_status(name(), {blocked_status(), metrics()} | blocked_status()) -> ok.
report_blocked_status(Name, Status) ->
gen_server:cast(?SERVER, {report_blocked_status, Name, Status, erlang:monotonic_time()}).

Expand Down Expand Up @@ -112,6 +120,7 @@ handle_call(status, _From, State) ->
{reply, [{Entry#entry.name,
Entry#entry.type,
blocked_status_to_info(Entry),
Entry#entry.metrics,
Entry#entry.timestamp}
|| Entry <- Entries], State};

Expand All @@ -120,6 +129,7 @@ handle_call({lookup, Name}, _From, State) ->
[Entry] -> [{name, Name},
{type, Entry#entry.type},
{info, blocked_status_to_info(Entry)},
{metrics, Entry#entry.metrics},
{timestamp, Entry#entry.timestamp}];
[] -> not_found
end,
Expand All @@ -141,6 +151,18 @@ handle_cast({report, Name, Type, Info, Timestamp}, State) ->
split_name(Name) ++ split_status(Info)),
{noreply, State};

handle_cast({report_blocked_status, Name, {Status, Metrics}, Timestamp}, State) ->
case Status of
flow ->
true = ets:update_element(?ETS_NAME, Name, [{#entry.blocked_status, flow},
{#entry.metrics, Metrics},
{#entry.blocked_at, Timestamp}]);
_ ->
true = ets:update_element(?ETS_NAME, Name, [{#entry.blocked_status, Status},
{#entry.metrics, Metrics}])
end,
{noreply, State};
%% used in tests
handle_cast({report_blocked_status, Name, Status, Timestamp}, State) ->
case Status of
flow ->
Expand Down Expand Up @@ -178,22 +200,22 @@ code_change(_OldVsn, State, _Extra) ->
inject_node_info(Node, Shovels) ->
lists:map(
%% starting
fun({Name, Type, State, Timestamp}) when is_atom(State) ->
fun({Name, Type, State, Metrics, Timestamp}) when is_atom(State) ->
Opts = [{node, Node}],
{Name, Type, {State, Opts}, Timestamp};
{Name, Type, {State, Opts}, Metrics, Timestamp};
%% terminated
({Name, Type, {terminated, Reason}, Timestamp}) ->
{Name, Type, {terminated, Reason}, Timestamp};
({Name, Type, {terminated, Reason}, Metrics, Timestamp}) ->
{Name, Type, {terminated, Reason}, Metrics, Timestamp};
%% running
({Name, Type, {State, Opts}, Timestamp}) ->
({Name, Type, {State, Opts}, Metrics, Timestamp}) ->
Opts1 = Opts ++ [{node, Node}],
{Name, Type, {State, Opts1}, Timestamp}
{Name, Type, {State, Opts1}, Metrics, Timestamp}
end, Shovels).

-spec find_matching_shovel(rabbit_types:vhost(), binary(), [status_tuple()]) -> status_tuple() | undefined.
find_matching_shovel(VHost, Name, Shovels) ->
case lists:filter(
fun ({{V, S}, _Kind, _Status, _}) ->
fun ({{V, S}, _Kind, _Status, _Metrics, _}) ->
VHost =:= V andalso Name =:= S
end, Shovels) of
[] -> undefined;
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
-record(state, {name :: binary() | {rabbit_types:vhost(), binary()},
type :: static | dynamic,
config :: rabbit_shovel_behaviour:state(),
last_reported_status = running :: rabbit_shovel_status:blocked_status()}).
last_reported_status = {running, #{}} :: {rabbit_shovel_status:blocked_status(), rabbit_shovel_status:metrics()}}).

start_link(Type, Name, Config) ->
ShovelParameter = rabbit_shovel_util:get_shovel_parameter(Name),
Expand Down Expand Up @@ -224,7 +224,7 @@ human_readable_name(Name) ->
maybe_report_blocked_status(#state{config = Config,
last_reported_status = LastStatus} = State) ->
case rabbit_shovel_behaviour:status(Config) of
ignore ->
{ignore, _} ->
State;
LastStatus ->
State;
Expand Down
6 changes: 3 additions & 3 deletions deps/rabbitmq_shovel/test/amqp10_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ amqp10_destination(Config, AckMode) ->
throw(timeout_waiting_for_deliver1)
end,

[{test_shovel, static, {running, _Info}, _Time}] =
[{test_shovel, static, {running, _Info}, _Metrics, _Time}] =
rabbit_ct_broker_helpers:rpc(Config, 0,
rabbit_shovel_status, status, []),
amqp10_client:detach_link(Receiver),
Expand Down Expand Up @@ -183,7 +183,7 @@ amqp10_source(Config, AckMode) ->
after ?TIMEOUT -> throw(timeout_waiting_for_deliver1)
end,

[{test_shovel, static, {running, _Info}, _Time}] =
[{test_shovel, static, {running, _Info}, _Metrics, _Time}] =
rabbit_ct_broker_helpers:rpc(Config, 0,
rabbit_shovel_status, status, []),
rabbit_ct_client_helpers:close_channel(Chan).
Expand Down Expand Up @@ -267,7 +267,7 @@ setup_shovel(ShovelConfig) ->
await_running_shovel(test_shovel).

await_running_shovel(Name) ->
case [N || {N, _, {running, _}, _}
case [N || {N, _, {running, _}, _, _}
<- rabbit_shovel_status:status(),
N =:= Name] of
[_] -> ok;
Expand Down
6 changes: 3 additions & 3 deletions deps/rabbitmq_shovel/test/configuration_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ run_valid_test(Config) ->
after ?TIMEOUT -> throw(timeout_waiting_for_deliver1)
end,

[{test_shovel, static, {running, _Info}, _Time}] =
[{test_shovel, static, {running, _Info}, _Metrics, _Time}] =
rabbit_ct_broker_helpers:rpc(Config, 0,
rabbit_shovel_status, status, []),

Expand Down Expand Up @@ -407,15 +407,15 @@ setup_shovels2(Config) ->
ok = application:start(rabbitmq_shovel).

await_running_shovel(Name) ->
case [N || {N, _, {running, _}, _}
case [N || {N, _, {running, _}, _Metrics, _}
<- rabbit_shovel_status:status(),
N =:= Name] of
[_] -> ok;
_ -> timer:sleep(100),
await_running_shovel(Name)
end.
await_terminated_shovel(Name) ->
case [N || {N, _, {terminated, _}, _}
case [N || {N, _, {terminated, _}, _Metrics, _}
<- rabbit_shovel_status:status(),
N =:= Name] of
[_] -> ok;
Expand Down
8 changes: 6 additions & 2 deletions deps/rabbitmq_shovel/test/dynamic_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,17 @@ end_per_testcase(Testcase, Config) ->
%% -------------------------------------------------------------------

simple(Config) ->
Name = <<"test">>,
with_ch(Config,
fun (Ch) ->
shovel_test_utils:set_param(
Config,
<<"test">>, [{<<"src-queue">>, <<"src">>},
Name, [{<<"src-queue">>, <<"src">>},
{<<"dest-queue">>, <<"dest">>}]),
publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hello">>)
publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hello">>),
Status = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status, lookup, [{<<"/">>, Name}]),
?assertMatch([_|_], Status),
?assertMatch(#{metrics := #{forwarded := 1}}, maps:from_list(Status))
end).

quorum_queues(Config) ->
Expand Down
6 changes: 3 additions & 3 deletions deps/rabbitmq_shovel/test/shovel_status_command_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,11 @@ run_starting(Config) ->
[A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Opts = #{node => A},
case ?CMD:run([], Opts) of
{stream, [{{<<"/">>, <<"test">>}, dynamic, starting, _}]} ->
{stream, [{{<<"/">>, <<"test">>}, dynamic, starting, _, _}]} ->
ok;
{stream, []} ->
throw(shovel_not_found);
{stream, [{{<<"/">>, <<"test">>}, dynamic, {running, _}, _}]} ->
{stream, [{{<<"/">>, <<"test">>}, dynamic, {running, _}, _, _}]} ->
ct:pal("Shovel is already running, starting could not be tested!")
end,
shovel_test_utils:clear_param(Config, <<"test">>).
Expand All @@ -107,7 +107,7 @@ run_running(Config) ->
{<<"dest-queue">>, <<"dest">>}]),
[A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Opts = #{node => A},
{stream, [{{<<"/">>, <<"test">>}, dynamic, {running, _}, _}]}
{stream, [{{<<"/">>, <<"test">>}, dynamic, {running, _}, _, _}]}
= ?CMD:run([], Opts),
shovel_test_utils:clear_param(Config, <<"test">>).

Expand Down
5 changes: 3 additions & 2 deletions deps/rabbitmq_shovel/test/shovel_test_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ shovels_from_status() ->

shovels_from_status(ExpectedState) ->
S = rabbit_shovel_status:status(),
[N || {{<<"/">>, N}, dynamic, {State, _}, _} <- S, State == ExpectedState].
[N || {{<<"/">>, N}, dynamic, {State, _}, _, _} <- S, State == ExpectedState] ++
[N || {{<<"/">>, N}, dynamic, {State, _}, _} <- S, State == ExpectedState].

get_shovel_status(Config, Name) ->
get_shovel_status(Config, 0, Name).
Expand Down Expand Up @@ -111,4 +112,4 @@ restart_shovel(Config, Name) ->

restart_shovel(Config, Node, Name) ->
rabbit_ct_broker_helpers:rpc(Config,
Node, rabbit_shovel_util, restart_shovel, [<<"/">>, Name]).
Node, rabbit_shovel_util, restart_shovel, [<<"/">>, Name]).
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ status(Node) ->
[format(Node, I) || I <- Status]
end.

format(Node, {Name, Type, Info, TS}) ->
format(Node, {Name, Type, Info, _Metrics, TS}) ->
[{node, Node}, {timestamp, format_ts(TS)}] ++
format_name(Type, Name) ++
format_info(Info).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ deregister_cleanup(_) -> ok.

collect_mf(_Registry, Callback) ->
Status = rabbit_shovel_status:status(500),
{StaticStatusGroups, DynamicStatusGroups} = lists:foldl(fun({_,static,{S, _}, _}, {SMap, DMap}) ->
{StaticStatusGroups, DynamicStatusGroups} = lists:foldl(fun({_,static,{S, _}, _, _}, {SMap, DMap}) ->
{maps:update_with(S, fun(C) -> C + 1 end, 1, SMap), DMap};
({_,dynamic,{S, _}, _}, {SMap, DMap}) ->
({_,dynamic,{S, _}, _, _}, {SMap, DMap}) ->
{SMap, maps:update_with(S, fun(C) -> C + 1 end, 1, DMap)}
end, {#{}, #{}}, Status),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,10 @@ await_shovel(Name, Type) ->

shovels_from_status(ExpectedState, dynamic) ->
S = rabbit_shovel_status:status(),
[N || {{<<"/">>, N}, dynamic, {State, _}, _} <- S, State == ExpectedState];
[N || {{<<"/">>, N}, dynamic, {State, _}, _, _} <- S, State == ExpectedState];
shovels_from_status(ExpectedState, static) ->
S = rabbit_shovel_status:status(),
[N || {N, static, {State, _}, _} <- S, State == ExpectedState].
[N || {N, static, {State, _}, _, _} <- S, State == ExpectedState].

get_shovel_status(Config, Name) ->
get_shovel_status(Config, 0, Name).
Expand Down