Skip to content

Commit 246bd62

Browse files
committed
Optimise shovel pending queue handling for flow-control
The pending queue can grow up to prefetch count (which is 1000 by default and can be more) while it is forwarded in chunks every time a bump_credit grants more credit to the shovel process (which is 200 by default). This change avoids trying to forward all pending entries just to put back most of them in the pending queue.
1 parent 4d24dda commit 246bd62

File tree

1 file changed

+33
-14
lines changed

1 file changed

+33
-14
lines changed

deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,24 @@ dest_endpoint(#{dest := Dest}) ->
154154
Keys = [dest_exchange, dest_exchange_key, dest_queue],
155155
maps:to_list(maps:filter(fun(K, _) -> proplists:is_defined(K, Keys) end, Dest)).
156156

157+
forward_pending(State) ->
158+
case pop_pending(State) of
159+
empty ->
160+
State;
161+
{{Tag, Props, Payload}, S} ->
162+
S2 = do_forward(Tag, Props, Payload, S),
163+
S3 = control_throttle(S2),
164+
case is_blocked(S3) of
165+
true ->
166+
%% We are blocked by client-side flow-control and/or
167+
%% `connection.blocked` message from the destination
168+
%% broker. Stop forwarding pending messages.
169+
S3;
170+
false ->
171+
forward_pending(S3)
172+
end
173+
end.
174+
157175
forward(IncomingTag, Props, Payload, State) ->
158176
State1 = control_throttle(State),
159177
case is_blocked(State1) of
@@ -276,19 +294,15 @@ handle_dest(#'connection.blocked'{}, State) ->
276294
update_blocked_by(connection_blocked, true, State);
277295

278296
handle_dest(#'connection.unblocked'{}, State) ->
279-
{Pending, State1} = reset_pending(update_blocked_by(connection_blocked, false, State)),
297+
State1 = update_blocked_by(connection_blocked, false, State),
280298
%% we are unblocked so can begin to forward
281-
lists:foldl(fun ({Tag, Props, Payload}, S) ->
282-
forward(Tag, Props, Payload, S)
283-
end, State1, lists:reverse(Pending));
299+
forward_pending(State1);
284300

285301
handle_dest({bump_credit, Msg}, State) ->
286302
credit_flow:handle_bump_msg(Msg),
287-
{Pending, State1} = reset_pending(control_throttle(State)),
303+
State1 = control_throttle(State),
288304
%% we have credit so can begin to forward
289-
lists:foldl(fun ({Tag, Props, Payload}, S) ->
290-
forward(Tag, Props, Payload, S)
291-
end, State1, lists:reverse(Pending));
305+
forward_pending(State1);
292306

293307
handle_dest(_Msg, _State) ->
294308
not_handled.
@@ -369,12 +383,17 @@ is_blocked(_) ->
369383
false.
370384

371385
add_pending(Elem, State = #{dest := Dest}) ->
372-
Pending = maps:get(pending, Dest, []),
373-
State#{dest => Dest#{pending => [Elem|Pending]}}.
374-
375-
reset_pending(State = #{dest := Dest}) ->
376-
Pending = maps:get(pending, Dest, []),
377-
{Pending, State#{dest => Dest#{pending => []}}}.
386+
Pending = maps:get(pending, Dest, queue:new()),
387+
State#{dest => Dest#{pending => queue:in(Elem, Pending)}}.
388+
389+
pop_pending(State = #{dest := Dest}) ->
390+
Pending = maps:get(pending, Dest, queue:new()),
391+
case queue:out(Pending) of
392+
{empty, _} ->
393+
empty;
394+
{{value, Elem}, Pending2} ->
395+
{Elem, State#{dest => Dest#{pending => Pending2}}}
396+
end.
378397

379398
make_conn_and_chan([], {VHost, Name} = _ShovelName) ->
380399
rabbit_log:error(

0 commit comments

Comments
 (0)