Skip to content

Commit 07be784

Browse files
author
Loïc Hoguin
committed
CQ: Make dirty recovery of shared store more efficient
This change applies only to v2: modifying the corresponding part of the v1 code is considered too risky, given that v1 will soon be removed.
1 parent 338dc71 commit 07be784

File tree

2 files changed

+32
-40
lines changed

2 files changed

+32
-40
lines changed

deps/rabbit/src/rabbit_classic_queue_index_v2.erl

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1127,8 +1127,8 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
11271127
empty ->
11281128
ok = gatherer:stop(Gatherer),
11291129
finished;
1130-
{value, {MsgId, Count}} ->
1131-
{MsgId, Count, {next, Gatherer}}
1130+
{value, MsgIds} ->
1131+
{MsgIds, {next, Gatherer}}
11321132
end.
11331133

11341134
queue_index_walker_reader(#resource{ virtual_host = VHost } = Name, Gatherer) ->
@@ -1155,27 +1155,30 @@ queue_index_walker_segment(F, Gatherer) ->
11551155
{ok, <<?MAGIC:32,?VERSION:8,
11561156
FromSeqId:64/unsigned,ToSeqId:64/unsigned,
11571157
_/bits>>} ->
1158-
queue_index_walker_segment(Fd, Gatherer, 0, ToSeqId - FromSeqId);
1158+
queue_index_walker_segment(Fd, Gatherer, 0, ToSeqId - FromSeqId, []);
11591159
_ ->
11601160
%% Invalid segment file. Skip.
11611161
ok
11621162
end,
11631163
ok = file:close(Fd).
11641164

1165-
queue_index_walker_segment(_, _, N, N) ->
1165+
queue_index_walker_segment(_, Gatherer, N, N, Acc) ->
11661166
%% We reached the end of the segment file.
1167+
gatherer:sync_in(Gatherer, Acc),
11671168
ok;
1168-
queue_index_walker_segment(Fd, Gatherer, N, Total) ->
1169+
queue_index_walker_segment(Fd, Gatherer, N, Total, Acc) ->
11691170
case file:read(Fd, ?ENTRY_SIZE) of
11701171
%% We found a non-ack persistent entry. Gather it.
11711172
{ok, <<1,_:7,1:1,_,1,Id:16/binary,_/bits>>} ->
1172-
gatherer:sync_in(Gatherer, {Id, 1}),
1173-
queue_index_walker_segment(Fd, Gatherer, N + 1, Total);
1173+
queue_index_walker_segment(Fd, Gatherer, N + 1, Total, [Id|Acc]);
11741174
%% We found an ack, a transient entry or a non-entry. Skip it.
11751175
{ok, _} ->
1176-
queue_index_walker_segment(Fd, Gatherer, N + 1, Total);
1176+
queue_index_walker_segment(Fd, Gatherer, N + 1, Total, Acc);
11771177
%% We reached the end of a partial segment file.
1178+
eof when Acc =:= [] ->
1179+
ok;
11781180
eof ->
1181+
gatherer:sync_in(Gatherer, Acc),
11791182
ok
11801183
end.
11811184

deps/rabbit/src/rabbit_msg_store.erl

Lines changed: 21 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -26,21 +26,15 @@
2626

2727
-include_lib("rabbit_common/include/rabbit_msg_store.hrl").
2828

29-
%% We flush to disk when the write buffer gets above the max size,
30-
%% or at an interval to make sure we don't keep the data in memory
31-
%% too long. Confirms are sent after the data is flushed to disk.
32-
-define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB.
33-
-define(SYNC_INTERVAL, 200). %% Milliseconds.
29+
%% We flush to disk at an interval to make sure we don't keep
30+
%% the data in memory too long. Confirms are sent after the
31+
%% data is flushed to disk.
32+
-define(SYNC_INTERVAL, 200). %% Milliseconds.
3433

3534
-define(CLEAN_FILENAME, "clean.dot").
3635
-define(FILE_SUMMARY_FILENAME, "file_summary.ets").
3736

38-
-define(BINARY_MODE, [raw, binary]).
39-
-define(READ_MODE, [read]).
40-
-define(WRITE_MODE, [write]).
41-
4237
-define(FILE_EXTENSION, ".rdq").
43-
-define(FILE_EXTENSION_TMP, ".rdt").
4438

4539
%% We keep track of flying messages for writes and removes. The idea is that
4640
%% when a remove comes in before we could process the write, we skip the
@@ -1575,24 +1569,19 @@ count_msg_refs(Gen, Seed, State) ->
15751569
case Gen(Seed) of
15761570
finished ->
15771571
ok;
1578-
{_MsgId, 0, Next} ->
1572+
%% This clause is kept for v1 compatibility purposes.
1573+
%% It can be removed once we no longer support converting from v1 data.
1574+
{{MsgId, 1}, Next} ->
1575+
%% @todo Remove index_module and avoid this element/2.
1576+
_ = ets:update_counter(element(2, State#msstate.index_state), MsgId, +1,
1577+
#msg_location{msg_id=MsgId, file=undefined, ref_count=1}),
15791578
count_msg_refs(Gen, Next, State);
1580-
{MsgId, Delta, Next} ->
1581-
ok = case index_lookup(MsgId, State) of
1582-
not_found ->
1583-
index_insert(#msg_location { msg_id = MsgId,
1584-
file = undefined,
1585-
ref_count = Delta },
1586-
State);
1587-
#msg_location { ref_count = RefCount } = StoreEntry ->
1588-
NewRefCount = RefCount + Delta,
1589-
case NewRefCount of
1590-
0 -> index_delete(MsgId, State);
1591-
_ -> index_update(StoreEntry #msg_location {
1592-
ref_count = NewRefCount },
1593-
State)
1594-
end
1595-
end,
1579+
{MsgIds, Next} ->
1580+
lists:foreach(fun(MsgId) ->
1581+
%% @todo Remove index_module and avoid this element/2.
1582+
ets:update_counter(element(2, State#msstate.index_state), MsgId, +1,
1583+
#msg_location{msg_id=MsgId, file=undefined, ref_count=1})
1584+
end, MsgIds),
15961585
count_msg_refs(Gen, Next, State)
15971586
end.
15981587

@@ -1621,15 +1610,15 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir },
16211610
FileName = filenum_to_name(File),
16221611
rabbit_log:debug("Rebuilding message location index from ~ts (~B file(s) remaining)",
16231612
[form_filename(Dir, FileName), length(Files)]),
1624-
{ok, Messages0, FileSize} =
1613+
%% The scan function already dealt with duplicate messages
1614+
%% within the file. We then get messages in reverse order.
1615+
{ok, Messages, FileSize} =
16251616
scan_file_for_valid_messages(Dir, FileName),
1626-
%% The scan gives us messages end-of-file first so we reverse the list
1627-
%% in case a compaction had occurred before shutdown to not have to repeat it.
1628-
Messages = lists:reverse(Messages0),
16291617
{ValidMessages, ValidTotalSize} =
16301618
lists:foldl(
16311619
fun (Obj = {MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
1632-
%% We only keep the first message in the file. Duplicates (due to compaction) get ignored.
1620+
%% Fan-out may result in the same message data in multiple
1621+
%% files so we have to guard against it.
16331622
case index_lookup(MsgId, State) of
16341623
#msg_location { file = undefined } = StoreEntry ->
16351624
ok = index_update(StoreEntry #msg_location {

0 commit comments

Comments
 (0)