Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 57 additions & 5 deletions deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,23 @@ dest_endpoint(#{dest := Dest}) ->
Keys = [dest_exchange, dest_exchange_key, dest_queue],
maps:to_list(maps:filter(fun(K, _) -> proplists:is_defined(K, Keys) end, Dest)).

forward(IncomingTag, Props, Payload,
State0 = #{dest := #{props_fun := PropsFun,
current := {_, _, DstUri},
fields_fun := FieldsFun}}) ->
forward(IncomingTag, Props, Payload, State) ->
State1 = control_throttle(State),
case is_blocked(State1) of
true ->
%% We are blocked by client-side flow-control and/or
%% `connection.blocked` message from the destination
%% broker. Simply cache the forward.
PendingEntry = {IncomingTag, Props, Payload},
add_pending(PendingEntry, State1);
false ->
do_forward(IncomingTag, Props, Payload, State1)
end.

do_forward(IncomingTag, Props, Payload,
State0 = #{dest := #{props_fun := PropsFun,
current := {_, _, DstUri},
fields_fun := FieldsFun}}) ->
SrcUri = rabbit_shovel_behaviour:source_uri(State0),
% do publish
Exchange = maps:get(exchange, Props, undefined),
Expand Down Expand Up @@ -258,6 +271,14 @@ handle_dest({'EXIT', Conn, Reason}, #{dest := #{current := {Conn, _, _}}}) ->
handle_dest({'EXIT', _Pid, {shutdown, {server_initiated_close, ?PRECONDITION_FAILED, Reason}}}, _State) ->
{stop, {outbound_link_or_channel_closure, Reason}};

handle_dest({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
{Pending, State1} = reset_pending(control_throttle(State)),
%% we have credit so can begin to forward
lists:foldl(fun ({Tag, Props, Payload}, S) ->
forward(Tag, Props, Payload, S)
end, State1, lists:reverse(Pending));

handle_dest(_Msg, _State) ->
not_handled.

Expand Down Expand Up @@ -301,7 +322,13 @@ publish(IncomingTag, Method, Msg,
amqp_channel:next_publish_seqno(OutboundChan);
_ -> undefined
end,
ok = amqp_channel:call(OutboundChan, Method, Msg),
case AckMode of
on_publish ->
ok = amqp_channel:cast_flow(OutboundChan, Method, Msg);
_ ->
ok = amqp_channel:call(OutboundChan, Method, Msg)
end,

rabbit_shovel_behaviour:decr_remaining_unacked(
case AckMode of
no_ack ->
Expand All @@ -313,6 +340,31 @@ publish(IncomingTag, Method, Msg,
rabbit_shovel_behaviour:decr_remaining(1, State1)
end).

control_throttle(State) ->
update_blocked_by(flow, credit_flow:blocked(), State).

update_blocked_by(Tag, IsBlocked, State = #{dest := Dest}) ->
BlockReasons = maps:get(blocked_by, Dest, []),
NewBlockReasons =
case IsBlocked of
true -> ordsets:add_element(Tag, BlockReasons);
false -> ordsets:del_element(Tag, BlockReasons)
end,
State#{dest => Dest#{blocked_by => NewBlockReasons}}.

is_blocked(#{dest := #{blocked_by := BlockReasons}}) when BlockReasons =/= [] ->
true;
is_blocked(_) ->
false.

add_pending(Elem, State = #{dest := Dest}) ->
Pending = maps:get(pending, Dest, []),
State#{dest => Dest#{pending => [Elem|Pending]}}.

reset_pending(State = #{dest := Dest}) ->
Pending = maps:get(pending, Dest, []),
{Pending, State#{dest => Dest#{pending => []}}}.

make_conn_and_chan([], {VHost, Name} = _ShovelName) ->
rabbit_log:error(
"Shovel '~s' in vhost '~s' has no more URIs to try for connection",
Expand Down
82 changes: 81 additions & 1 deletion deps/rabbitmq_shovel/test/dynamic_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ groups() ->
autodelete,
validation,
security_validation,
get_connection_name
get_connection_name,
credit_flow
]},

{quorum_queue_tests, [], [
Expand Down Expand Up @@ -439,6 +440,67 @@ get_connection_name(_Config) ->
<<"Shovel">> = rabbit_shovel_worker:get_connection_name({one, two, three}),
<<"Shovel">> = rabbit_shovel_worker:get_connection_name(<<"anything else">>).

credit_flow(Config) ->
OrigCredit = set_default_credit(Config, {20, 10}),

with_ch(Config,
fun (Ch) ->
amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:call(Ch, #'queue.declare'{queue = <<"src">>}),
%% Send larger payloads to fill up the socket buffers quicker
Payload = binary:copy(<<"hello">>, 1000),
publish_count(Ch, <<>>, <<"src">>, Payload, 1000),
amqp_channel:wait_for_confirms(Ch),

OrigLimit = set_vm_memory_high_watermark(Config, 0.00000001),
%% Let connection block.
timer:sleep(100),

try
shovel_test_utils:set_param_nowait(
Config,
<<"test">>, [{<<"src-queue">>, <<"src">>},
{<<"dest-queue">>, <<"dest">>},
{<<"src-prefetch-count">>, 50},
{<<"ack-mode">>, <<"on-publish">>},
{<<"src-delete-after">>, <<"never">>}]),
shovel_test_utils:await_shovel(Config, <<"test">>),

%% There should be only one process with a message buildup
[{WriterPid, MQLen, _}, {_, 0, _}] =
rabbit_ct_broker_helpers:rpc(
Config, 0, recon, proc_count, [message_queue_len, 2]),

%% The writer process should have only a limited message queue,
%% but it is hard to exactly know how long.
%% (There are some `inet_reply' messages from the
%% inet driver, and some messages from the channel,
%% we estimate the later to be less than double the
%% initial credit)
{messages, Msgs} = rabbit_ct_broker_helpers:rpc(
Config, 0, erlang, process_info, [WriterPid, messages]),
CmdLen = length([Msg || Msg <- Msgs,
element(1, Msg) =:= send_command_flow]),
case {writer_msg_queue_len, CmdLen, MQLen} of
_ when CmdLen < 2 * 20 -> ok
end,

ExpDest = 0,
#'queue.declare_ok'{message_count = ExpDest} =
amqp_channel:call(Ch, #'queue.declare'{queue = <<"dest">>,
durable = true}),
#'queue.declare_ok'{message_count = SrcCnt} =
amqp_channel:call(Ch, #'queue.declare'{queue = <<"src">>}),

%% Most messages should still be in the queue either ready or unacked
case {src_queue_message_count, SrcCnt} of
_ when 0 < SrcCnt andalso SrcCnt < 1000 - MQLen -> ok
end
after
set_vm_memory_high_watermark(Config, OrigLimit),
set_default_credit(Config, OrigCredit)
end
end).

%%----------------------------------------------------------------------------

Expand Down Expand Up @@ -541,3 +603,21 @@ await_autodelete1(_Config, Name) ->
shovels_from_parameters() ->
L = rabbit_runtime_parameters:list(<<"/">>, <<"shovel">>),
[rabbit_misc:pget(name, Shovel) || Shovel <- L].

set_default_credit(Config, Value) ->
{ok, OrigValue} =
rabbit_ct_broker_helpers:rpc(
Config, 0, application, get_env, [rabbit, credit_flow_default_credit]),
ok =
rabbit_ct_broker_helpers:rpc(
Config, 0, application, set_env, [rabbit, credit_flow_default_credit, Value]),
OrigValue.

set_vm_memory_high_watermark(Config, Limit) ->
OrigLimit =
rabbit_ct_broker_helpers:rpc(
Config, 0, vm_memory_monitor, get_vm_memory_high_watermark, []),
ok =
rabbit_ct_broker_helpers:rpc(
Config, 0, vm_memory_monitor, set_vm_memory_high_watermark, [Limit]),
OrigLimit.