Skip to content

Commit 0d66d69

Browse files
committed
first cut phase 1 compaction impl
1 parent d704338 commit 0d66d69

File tree

3 files changed

+102
-37
lines changed

3 files changed

+102
-37
lines changed

src/ra_log_wal.erl

Lines changed: 46 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -381,26 +381,29 @@ recover_wal(Dir, #conf{system = System,
381381
end || File <- Files0,
382382
filename:extension(File) == ".wal"],
383383
WalFiles = lists:sort(Files),
384-
AllWriters =
385-
[begin
386-
?DEBUG("WAL in ~ts: recovering ~ts, Mode ~s",
387-
[System, F, Mode]),
388-
Fd = open_at_first_record(filename:join(Dir, F)),
389-
{Time, #recovery{ranges = Ranges,
390-
writers = Writers}} =
391-
timer:tc(fun () -> recover_wal_chunks(Conf, Fd, Mode) end),
392-
393-
ok = ra_log_segment_writer:accept_mem_tables(SegWriter, Ranges, F),
394-
395-
close_existing(Fd),
396-
?DEBUG("WAL in ~ts: recovered ~ts time taken ~bms - recovered ~b writers",
397-
[System, F, Time div 1000, map_size(Writers)]),
398-
Writers
399-
end || F <- WalFiles],
400-
401-
FinalWriters = lists:foldl(fun (New, Acc) ->
402-
maps:merge(Acc, New)
403-
end, #{}, AllWriters),
384+
FinalWriters =
385+
lists:foldl(fun (F, Writers0) ->
386+
?DEBUG("WAL in ~ts: recovering ~ts, Mode ~s",
387+
[System, F, Mode]),
388+
Fd = open_at_first_record(filename:join(Dir, F)),
389+
{Time, #recovery{ranges = Ranges,
390+
writers = Writers}} =
391+
timer:tc(fun () ->
392+
recover_wal_chunks(Conf, Fd,
393+
Writers0, Mode)
394+
end),
395+
396+
ok = ra_log_segment_writer:accept_mem_tables(SegWriter,
397+
Ranges, F),
398+
close_existing(Fd),
399+
?DEBUG("WAL in ~ts: recovered ~ts time taken ~bms - recovered ~b writers",
400+
[System, F, Time div 1000, map_size(Writers)]),
401+
Writers
402+
end, #{}, WalFiles),
403+
404+
% FinalWriters = lists:foldl(fun (New, Acc) ->
405+
% maps:merge(Acc, New)
406+
% end, #{}, AllWriters),
404407

405408
?DEBUG("WAL in ~ts: final writers recovered ~b",
406409
[System, map_size(FinalWriters)]),
@@ -781,9 +784,10 @@ dump_records(<<_:1/unsigned, 1:1/unsigned, _:22/unsigned,
781784
dump_records(<<>>, Entries) ->
782785
Entries.
783786

784-
recover_wal_chunks(#conf{} = Conf, Fd, Mode) ->
787+
recover_wal_chunks(#conf{} = Conf, Fd, Writers, Mode) ->
785788
Chunk = read_wal_chunk(Fd, Conf#conf.recovery_chunk_size),
786-
recover_records(Conf, Fd, Chunk, #{}, #recovery{mode = Mode}).
789+
recover_records(Conf, Fd, Chunk, #{}, #recovery{mode = Mode,
790+
writers = Writers}).
787791
% All zeros indicates end of a pre-allocated wal file
788792
recover_records(_, _Fd, <<0:1/unsigned, 0:1/unsigned, 0:22/unsigned,
789793
IdDataLen:16/unsigned, _:IdDataLen/binary,
@@ -824,10 +828,11 @@ recover_records(#conf{names = Names} = Conf, Fd,
824828
% W ->
825829
% W#{UId => {in_seq, SmallestIdx}}
826830
% end,
827-
W = State0#recovery.writers,
828-
Writers = W#{UId => {in_seq, SmallestIdx - 1}},
831+
Writers = State0#recovery.writers,
832+
% Writers = W#{UId => {in_seq, SmallestIdx - 1}},
829833
recover_records(Conf, Fd, Rest, Cache,
830-
State0#recovery{writers = Writers});
834+
State0#recovery{writers =
835+
maps:remove(UId, Writers)});
831836
error ->
832837
System = Conf#conf.system,
833838
?DEBUG("WAL in ~ts: record failed CRC check. If this is the last record"
@@ -1004,7 +1009,17 @@ recover_entry(Names, UId, {Idx, _, _} = Entry, SmallestIdx,
10041009
{ok, M} = ra_log_ets:mem_table_please(Names, UId),
10051010
M
10061011
end,
1007-
case ra_mt:insert(Entry, Mt0) of
1012+
%% always use write_sparse as there is nothing to indicate in the wal
1013+
%% data if an entry was written as such. this way we recover all writes
1014+
%% so should be ok for all types of writes
1015+
PrevIdx = case Writers of
1016+
#{UId := {in_seq, I}} ->
1017+
I;
1018+
_ ->
1019+
undefined
1020+
end,
1021+
% ct:pal("ra_mt:insert_sparse ~b ~w", [Idx, PrevIdx]),
1022+
case ra_mt:insert_sparse(Entry, PrevIdx, Mt0) of
10081023
{ok, Mt1} ->
10091024
Ranges = update_ranges(Ranges0, UId, ra_mt:tid(Mt1),
10101025
SmallestIdx, [Idx]),
@@ -1014,7 +1029,8 @@ recover_entry(Names, UId, {Idx, _, _} = Entry, SmallestIdx,
10141029
{error, overwriting} ->
10151030
%% create successor memtable
10161031
{ok, Mt1} = ra_log_ets:new_mem_table_please(Names, UId, Mt0),
1017-
{retry, State#recovery{tables = Tables#{UId => Mt1}}}
1032+
{retry, State#recovery{tables = Tables#{UId => Mt1},
1033+
writers = maps:remove(UId, Writers)}}
10181034
end;
10191035
recover_entry(Names, UId, {Idx, Term, _}, SmallestIdx,
10201036
#recovery{mode = post_boot,
@@ -1049,6 +1065,7 @@ handle_trunc(false, _UId, _Idx, State) ->
10491065
State;
10501066
handle_trunc(true, UId, Idx, #recovery{mode = Mode,
10511067
ranges = Ranges0,
1068+
writers = Writers,
10521069
tables = Tbls} = State) ->
10531070
case Tbls of
10541071
#{UId := Mt0} when Mode == initial ->
@@ -1065,9 +1082,10 @@ handle_trunc(true, UId, Idx, #recovery{mode = Mode,
10651082
end,
10661083

10671084
State#recovery{tables = Tbls#{UId => Mt},
1085+
writers = maps:remove(UId, Writers),
10681086
ranges = Ranges};
10691087
_ ->
1070-
State
1088+
State#recovery{writers = maps:remove(UId, Writers)}
10711089
end.
10721090

10731091
named_cast(To, Msg) when is_pid(To) ->

src/ra_mt.erl

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,11 @@ insert({Idx, _, _} = _Entry,
122122
-spec insert_sparse(log_entry(), undefined | ra:index(), state()) ->
123123
{ok, state()} | {error, gap_detected | limit_reached}.
124124
insert_sparse({Idx, _, _} = Entry, LastIdx,
125-
#?MODULE{tid = Tid,
126-
indexes = Seq} = State) ->
127-
case ra_seq:last(Seq) == LastIdx of
125+
#?MODULE{tid = Tid,
126+
indexes = Seq} = State) ->
127+
LastSeq = ra_seq:last(Seq),
128+
IsOverwriting = Idx =< LastSeq andalso is_integer(LastSeq),
129+
case LastSeq == LastIdx andalso not IsOverwriting of
128130
true ->
129131
case ra_seq:length(Seq) > ?MAX_MEMTBL_ENTRIES of
130132
true ->
@@ -134,7 +136,12 @@ insert_sparse({Idx, _, _} = Entry, LastIdx,
134136
{ok, State#?MODULE{indexes = ra_seq:append(Idx, Seq)}}
135137
end;
136138
false ->
137-
{error, gap_detected}
139+
case IsOverwriting of
140+
true ->
141+
{error, overwriting};
142+
false ->
143+
{error, gap_detected, Idx, LastSeq}
144+
end
138145
end.
139146

140147
-spec stage(log_entry(), state()) ->

test/ra_log_wal_SUITE.erl

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ all_tests() ->
2828
sparse_write_same_batch,
2929
sparse_write_overwrite,
3030
sparse_write_recover,
31+
sparse_write_recover_with_mt,
3132
wal_filename_upgrade,
3233
same_uid_different_process,
3334
consecutive_terms_in_batch_should_result_in_two_written_events,
@@ -207,19 +208,57 @@ sparse_write_same_batch(Config) ->
207208
ok.
208209

209210
sparse_write_recover(Config) ->
211+
%% no mt case
210212
meck:new(ra_log_segment_writer, [passthrough]),
211213
meck:expect(ra_log_segment_writer, await, fun(_) -> ok end),
212214
Conf = ?config(wal_conf, Config),
213215
{UId, _} = WriterId = ?config(writer_id, Config),
214-
Tid = ets:new(?FUNCTION_NAME, []),
216+
Names = ?config(names, Config),
217+
%% create a tid that isn't registered as mt
218+
Tid = ets:new(?MODULE, [set]),
215219
{ok, Pid} = ra_log_wal:start_link(Conf),
216220
{ok, _} = ra_log_wal:write(Pid, WriterId, Tid, 11, 12, 1, "value"),
217221
ok = await_written(WriterId, 1, [12]),
218222
{ok, _} = ra_log_wal:write(Pid, WriterId, Tid, 12, 15, 1, "value2"),
219223
ok = await_written(WriterId, 1, [15]),
224+
?assert(is_process_alive(Pid)),
225+
ok = proc_lib:stop(Pid),
226+
{ok, Pid2} = ra_log_wal:start_link(Conf),
227+
?assert(is_process_alive(Pid2)),
228+
receive
229+
{'$gen_cast',
230+
{mem_tables, #{UId := [{MtTid, [15, 12]}]}, _}} ->
231+
{ok, Mt0} = ra_log_ets:mem_table_please(Names, UId),
232+
?assertEqual(MtTid, ra_mt:tid(Mt0)),
233+
ok
234+
after 5000 ->
235+
flush(),
236+
ct:fail("receiving mem table ranges timed out")
237+
end,
238+
flush(),
239+
proc_lib:stop(Pid2),
240+
meck:unload(),
241+
ok.
220242

221-
ok = proc_lib:stop(ra_log_wal),
222-
{ok, _Pid2} = ra_log_wal:start_link(Conf),
243+
sparse_write_recover_with_mt(Config) ->
244+
meck:new(ra_log_segment_writer, [passthrough]),
245+
meck:expect(ra_log_segment_writer, await, fun(_) -> ok end),
246+
Conf = ?config(wal_conf, Config),
247+
{UId, _} = WriterId = ?config(writer_id, Config),
248+
Names = ?config(names, Config),
249+
{ok, Mt0} = ra_log_ets:mem_table_please(Names, UId),
250+
Tid = ra_mt:tid(Mt0),
251+
{ok, Pid} = ra_log_wal:start_link(Conf),
252+
{ok, Mt1} = ra_mt:insert_sparse({12, 1, "value"}, undefined, Mt0),
253+
{ok, _} = ra_log_wal:write(Pid, WriterId, Tid, 11, 12, 1, "value"),
254+
ok = await_written(WriterId, 1, [12]),
255+
{ok, _Mt} = ra_mt:insert_sparse({15, 1, "value"}, 12, Mt1),
256+
{ok, _} = ra_log_wal:write(Pid, WriterId, Tid, 12, 15, 1, "value2"),
257+
ok = await_written(WriterId, 1, [15]),
258+
?assert(is_process_alive(Pid)),
259+
ok = proc_lib:stop(Pid),
260+
{ok, Pid2} = ra_log_wal:start_link(Conf),
261+
?assert(is_process_alive(Pid2)),
223262
receive
224263
{'$gen_cast',
225264
{mem_tables, #{UId := [{Tid, [15, 12]}]}, _}} ->
@@ -228,7 +267,8 @@ sparse_write_recover(Config) ->
228267
flush(),
229268
ct:fail("receiving mem table ranges timed out")
230269
end,
231-
proc_lib:stop(Pid),
270+
flush(),
271+
proc_lib:stop(Pid2),
232272
meck:unload(),
233273
ok.
234274

@@ -976,7 +1016,7 @@ recover_overwrite(Config) ->
9761016
_ = await_written(WriterId, 2, [{5, 20}]),
9771017

9781018
flush(),
979-
ok = proc_lib:stop(ra_log_wal),
1019+
ok = proc_lib:stop(ra_log_wal, normal, 5000),
9801020
{ok, Pid2} = ra_log_wal:start_link(Conf),
9811021
{ok, Mt} = ra_log_ets:mem_table_please(?config(names, Config), UId),
9821022

0 commit comments

Comments
 (0)