Skip to content

Commit 6d7c296

Browse files
committed
Optimise shovel pending queue handling for flow-control
The pending queue can grow up to prefetch count (which is 1000 by default and can be more) while it is forwarded in chunks every time a bump_credit grants more credit to the shovel process (which is 200 by default). This change avoids trying to forward all pending entries just to put back most of them in the pending queue.
1 parent c0d46e4 commit 6d7c296

File tree

1 file changed

+33
-10
lines changed

1 file changed

+33
-10
lines changed

deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,24 @@ dest_endpoint(#{dest := Dest}) ->
155155
Keys = [dest_exchange, dest_exchange_key, dest_queue],
156156
maps:to_list(maps:filter(fun(K, _) -> proplists:is_defined(K, Keys) end, Dest)).
157157

158+
forward_pending(State) ->
159+
case pop_pending(State) of
160+
empty ->
161+
State;
162+
{{Tag, Props, Payload}, S} ->
163+
S2 = do_forward(Tag, Props, Payload, S),
164+
S3 = control_throttle(S2),
165+
case is_blocked(S3) of
166+
true ->
167+
%% We are blocked by client-side flow-control and/or
168+
%% `connection.blocked` message from the destination
169+
%% broker. Stop forwarding pending messages.
170+
S3;
171+
false ->
172+
forward_pending(S3)
173+
end
174+
end.
175+
158176
forward(IncomingTag, Props, Payload, State) ->
159177
case is_blocked(State) of
160178
true ->
@@ -277,19 +295,15 @@ handle_dest(#'connection.blocked'{}, State) ->
277295
update_blocked_by(connection_blocked, true, State);
278296

279297
handle_dest(#'connection.unblocked'{}, State) ->
280-
{Pending, State1} = reset_pending(update_blocked_by(connection_blocked, false, State)),
298+
State1 = update_blocked_by(connection_blocked, false, State),
281299
%% we are unblocked so can begin to forward
282-
lists:foldl(fun ({Tag, Props, Payload}, S) ->
283-
forward(Tag, Props, Payload, S)
284-
end, State1, lists:reverse(Pending));
300+
forward_pending(State1);
285301

286302
handle_dest({bump_credit, Msg}, State) ->
287303
credit_flow:handle_bump_msg(Msg),
288-
{Pending, State1} = reset_pending(control_throttle(State)),
304+
State1 = control_throttle(State),
289305
%% we have credit so can begin to forward
290-
lists:foldl(fun ({Tag, Props, Payload}, S) ->
291-
forward(Tag, Props, Payload, S)
292-
end, State1, lists:reverse(Pending));
306+
forward_pending(State1);
293307

294308
handle_dest(_Msg, _State) ->
295309
not_handled.
@@ -377,8 +391,17 @@ status(_) ->
377391
credit_flow:state().
378392

379393
add_pending(Elem, State = #{dest := Dest}) ->
380-
Pending = maps:get(pending, Dest, []),
381-
State#{dest => Dest#{pending => [Elem|Pending]}}.
394+
Pending = maps:get(pending, Dest, queue:new()),
395+
State#{dest => Dest#{pending => queue:in(Elem, Pending)}}.
396+
397+
pop_pending(State = #{dest := Dest}) ->
398+
Pending = maps:get(pending, Dest, queue:new()),
399+
case queue:out(Pending) of
400+
{empty, _} ->
401+
empty;
402+
{{value, Elem}, Pending2} ->
403+
{Elem, State#{dest => Dest#{pending => Pending2}}}
404+
end.
382405

383406
reset_pending(State = #{dest := Dest}) ->
384407
Pending = maps:get(pending, Dest, []),

0 commit comments

Comments
 (0)