Skip to content

Commit e25b738

Browse files
committed
Include DLX consumer in smallest_raft_index/1
The oldest message might be checked out by the DLX consumer.
1 parent 1761af2 commit e25b738

File tree

1 file changed

+24
-19
lines changed

1 file changed

+24
-19
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2879,25 +2879,30 @@ convert(Meta, 7, To, State) ->
28792879
convert(Meta, 8, To, convert_v7_to_v8(Meta, State)).
28802880

28812881
smallest_raft_index(#?STATE{messages = Messages,
2882-
dlx = #?DLX{discards = Discards}} = State) ->
2883-
SmallestDlxRaIdx = lqueue:fold(fun (?TUPLE(_, Msg), Acc) ->
2884-
min(get_msg_idx(Msg), Acc)
2885-
end, undefined, Discards),
2886-
SmallestMsgsRaIdx = rabbit_fifo_pq:get_lowest_index(Messages),
2887-
%% scan consumers and returns queue here instead
2888-
smallest_checked_out(State, min(SmallestDlxRaIdx, SmallestMsgsRaIdx)).
2889-
2890-
smallest_checked_out(#?STATE{returns = Returns,
2891-
consumers = Consumers}, Min) ->
2892-
SmallestSoFar = lqueue:fold(fun (Msg, Acc) ->
2893-
min(get_msg_idx(Msg), Acc)
2894-
end, Min, Returns),
2895-
maps:fold(fun (_Cid, #consumer{checked_out = Ch}, Acc0) ->
2896-
maps:fold(
2897-
fun (_MsgId, Msg, Acc) ->
2898-
min(get_msg_idx(Msg), Acc)
2899-
end, Acc0, Ch)
2900-
end, SmallestSoFar, Consumers).
2882+
returns = Returns,
2883+
consumers = Consumers,
2884+
dlx = #?DLX{consumer = DlxConsumer,
2885+
discards = Discards}}) ->
2886+
Min0 = rabbit_fifo_pq:get_lowest_index(Messages),
2887+
Min1 = lqueue:fold(fun (Msg, Acc) ->
2888+
min(get_msg_idx(Msg), Acc)
2889+
end, Min0, Returns),
2890+
Min2 = maps:fold(fun (_Cid, #consumer{checked_out = Ch}, Acc0) ->
2891+
maps:fold(fun (_MsgId, Msg, Acc) ->
2892+
min(get_msg_idx(Msg), Acc)
2893+
end, Acc0, Ch)
2894+
end, Min1, Consumers),
2895+
Min = lqueue:fold(fun (?TUPLE(_Reason, Msg), Acc) ->
2896+
min(get_msg_idx(Msg), Acc)
2897+
end, Min2, Discards),
2898+
case DlxConsumer of
2899+
undefined ->
2900+
Min;
2901+
#dlx_consumer{checked_out = Checked} ->
2902+
maps:fold(fun(_MsgId, ?TUPLE(_Reason, Msg), Acc) ->
2903+
min(get_msg_idx(Msg), Acc)
2904+
end, Min, Checked)
2905+
end.
29012906

29022907
make_requeue(ConsumerKey, Notify, [{MsgId, Idx, Header, Msg}], Acc) ->
29032908
lists:reverse([{append,

0 commit comments

Comments
 (0)