Skip to content

Commit b1509ad

Browse files
Merge pull request #5778 from rabbitmq/mergify/bp/v3.11.x/pr-5715
Use credit flow for dest side of on-publish shovels (backport #5715)
2 parents 665b409 + ddf354c commit b1509ad

File tree

2 files changed

+138
-6
lines changed

2 files changed

+138
-6
lines changed

deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -153,10 +153,23 @@ dest_endpoint(#{dest := Dest}) ->
153153
Keys = [dest_exchange, dest_exchange_key, dest_queue],
154154
maps:to_list(maps:filter(fun(K, _) -> proplists:is_defined(K, Keys) end, Dest)).
155155

156-
forward(IncomingTag, Props, Payload,
157-
State0 = #{dest := #{props_fun := PropsFun,
158-
current := {_, _, DstUri},
159-
fields_fun := FieldsFun}}) ->
156+
forward(IncomingTag, Props, Payload, State) ->
157+
State1 = control_throttle(State),
158+
case is_blocked(State1) of
159+
true ->
160+
%% We are blocked by client-side flow-control and/or
161+
%% `connection.blocked` message from the destination
162+
%% broker. Simply cache the forward.
163+
PendingEntry = {IncomingTag, Props, Payload},
164+
add_pending(PendingEntry, State1);
165+
false ->
166+
do_forward(IncomingTag, Props, Payload, State1)
167+
end.
168+
169+
do_forward(IncomingTag, Props, Payload,
170+
State0 = #{dest := #{props_fun := PropsFun,
171+
current := {_, _, DstUri},
172+
fields_fun := FieldsFun}}) ->
160173
SrcUri = rabbit_shovel_behaviour:source_uri(State0),
161174
% do publish
162175
Exchange = maps:get(exchange, Props, undefined),
@@ -258,6 +271,14 @@ handle_dest({'EXIT', Conn, Reason}, #{dest := #{current := {Conn, _, _}}}) ->
258271
handle_dest({'EXIT', _Pid, {shutdown, {server_initiated_close, ?PRECONDITION_FAILED, Reason}}}, _State) ->
259272
{stop, {outbound_link_or_channel_closure, Reason}};
260273

274+
handle_dest({bump_credit, Msg}, State) ->
275+
credit_flow:handle_bump_msg(Msg),
276+
{Pending, State1} = reset_pending(control_throttle(State)),
277+
%% we have credit so can begin to forward
278+
lists:foldl(fun ({Tag, Props, Payload}, S) ->
279+
forward(Tag, Props, Payload, S)
280+
end, State1, lists:reverse(Pending));
281+
261282
handle_dest(_Msg, _State) ->
262283
not_handled.
263284

@@ -301,7 +322,13 @@ publish(IncomingTag, Method, Msg,
301322
amqp_channel:next_publish_seqno(OutboundChan);
302323
_ -> undefined
303324
end,
304-
ok = amqp_channel:call(OutboundChan, Method, Msg),
325+
case AckMode of
326+
on_publish ->
327+
ok = amqp_channel:cast_flow(OutboundChan, Method, Msg);
328+
_ ->
329+
ok = amqp_channel:call(OutboundChan, Method, Msg)
330+
end,
331+
305332
rabbit_shovel_behaviour:decr_remaining_unacked(
306333
case AckMode of
307334
no_ack ->
@@ -313,6 +340,31 @@ publish(IncomingTag, Method, Msg,
313340
rabbit_shovel_behaviour:decr_remaining(1, State1)
314341
end).
315342

343+
control_throttle(State) ->
344+
update_blocked_by(flow, credit_flow:blocked(), State).
345+
346+
update_blocked_by(Tag, IsBlocked, State = #{dest := Dest}) ->
347+
BlockReasons = maps:get(blocked_by, Dest, []),
348+
NewBlockReasons =
349+
case IsBlocked of
350+
true -> ordsets:add_element(Tag, BlockReasons);
351+
false -> ordsets:del_element(Tag, BlockReasons)
352+
end,
353+
State#{dest => Dest#{blocked_by => NewBlockReasons}}.
354+
355+
is_blocked(#{dest := #{blocked_by := BlockReasons}}) when BlockReasons =/= [] ->
356+
true;
357+
is_blocked(_) ->
358+
false.
359+
360+
add_pending(Elem, State = #{dest := Dest}) ->
361+
Pending = maps:get(pending, Dest, []),
362+
State#{dest => Dest#{pending => [Elem|Pending]}}.
363+
364+
reset_pending(State = #{dest := Dest}) ->
365+
Pending = maps:get(pending, Dest, []),
366+
{Pending, State#{dest => Dest#{pending => []}}}.
367+
316368
make_conn_and_chan([], {VHost, Name} = _ShovelName) ->
317369
rabbit_log:error(
318370
"Shovel '~s' in vhost '~s' has no more URIs to try for connection",

deps/rabbitmq_shovel/test/dynamic_SUITE.erl

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ groups() ->
3434
autodelete,
3535
validation,
3636
security_validation,
37-
get_connection_name
37+
get_connection_name,
38+
credit_flow
3839
]},
3940

4041
{quorum_queue_tests, [], [
@@ -439,6 +440,67 @@ get_connection_name(_Config) ->
439440
<<"Shovel">> = rabbit_shovel_worker:get_connection_name({one, two, three}),
440441
<<"Shovel">> = rabbit_shovel_worker:get_connection_name(<<"anything else">>).
441442

443+
credit_flow(Config) ->
444+
OrigCredit = set_default_credit(Config, {20, 10}),
445+
446+
with_ch(Config,
447+
fun (Ch) ->
448+
amqp_channel:call(Ch, #'confirm.select'{}),
449+
amqp_channel:call(Ch, #'queue.declare'{queue = <<"src">>}),
450+
%% Send larger payloads to fill up the socket buffers quicker
451+
Payload = binary:copy(<<"hello">>, 1000),
452+
publish_count(Ch, <<>>, <<"src">>, Payload, 1000),
453+
amqp_channel:wait_for_confirms(Ch),
454+
455+
OrigLimit = set_vm_memory_high_watermark(Config, 0.00000001),
456+
%% Let connection block.
457+
timer:sleep(100),
458+
459+
try
460+
shovel_test_utils:set_param_nowait(
461+
Config,
462+
<<"test">>, [{<<"src-queue">>, <<"src">>},
463+
{<<"dest-queue">>, <<"dest">>},
464+
{<<"src-prefetch-count">>, 50},
465+
{<<"ack-mode">>, <<"on-publish">>},
466+
{<<"src-delete-after">>, <<"never">>}]),
467+
shovel_test_utils:await_shovel(Config, <<"test">>),
468+
469+
%% There should be only one process with a message buildup
470+
[{WriterPid, MQLen, _}, {_, 0, _}] =
471+
rabbit_ct_broker_helpers:rpc(
472+
Config, 0, recon, proc_count, [message_queue_len, 2]),
473+
474+
%% The writer process should have only a limited message queue,
475+
%% but it is hard to exactly know how long.
476+
%% (There are some `inet_reply' messages from the
477+
%% inet driver, and some messages from the channel,
478+
%% we estimate the later to be less than double the
479+
%% initial credit)
480+
{messages, Msgs} = rabbit_ct_broker_helpers:rpc(
481+
Config, 0, erlang, process_info, [WriterPid, messages]),
482+
CmdLen = length([Msg || Msg <- Msgs,
483+
element(1, Msg) =:= send_command_flow]),
484+
case {writer_msg_queue_len, CmdLen, MQLen} of
485+
_ when CmdLen < 2 * 20 -> ok
486+
end,
487+
488+
ExpDest = 0,
489+
#'queue.declare_ok'{message_count = ExpDest} =
490+
amqp_channel:call(Ch, #'queue.declare'{queue = <<"dest">>,
491+
durable = true}),
492+
#'queue.declare_ok'{message_count = SrcCnt} =
493+
amqp_channel:call(Ch, #'queue.declare'{queue = <<"src">>}),
494+
495+
%% Most messages should still be in the queue either ready or unacked
496+
case {src_queue_message_count, SrcCnt} of
497+
_ when 0 < SrcCnt andalso SrcCnt < 1000 - MQLen -> ok
498+
end
499+
after
500+
set_vm_memory_high_watermark(Config, OrigLimit),
501+
set_default_credit(Config, OrigCredit)
502+
end
503+
end).
442504

443505
%%----------------------------------------------------------------------------
444506

@@ -541,3 +603,21 @@ await_autodelete1(_Config, Name) ->
541603
shovels_from_parameters() ->
542604
L = rabbit_runtime_parameters:list(<<"/">>, <<"shovel">>),
543605
[rabbit_misc:pget(name, Shovel) || Shovel <- L].
606+
607+
set_default_credit(Config, Value) ->
608+
{ok, OrigValue} =
609+
rabbit_ct_broker_helpers:rpc(
610+
Config, 0, application, get_env, [rabbit, credit_flow_default_credit]),
611+
ok =
612+
rabbit_ct_broker_helpers:rpc(
613+
Config, 0, application, set_env, [rabbit, credit_flow_default_credit, Value]),
614+
OrigValue.
615+
616+
set_vm_memory_high_watermark(Config, Limit) ->
617+
OrigLimit =
618+
rabbit_ct_broker_helpers:rpc(
619+
Config, 0, vm_memory_monitor, get_vm_memory_high_watermark, []),
620+
ok =
621+
rabbit_ct_broker_helpers:rpc(
622+
Config, 0, vm_memory_monitor, set_vm_memory_high_watermark, [Limit]),
623+
OrigLimit.

0 commit comments

Comments
 (0)