Skip to content

Commit ab1281f

Browse files
committed
major compactions
1 parent e1b6bd0 commit ab1281f

File tree

6 files changed

+898
-139
lines changed

6 files changed

+898
-139
lines changed

src/ra_log.erl

Lines changed: 13 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -227,12 +227,11 @@ init(#{uid := UId,
227227
MtRange = ra_mt:range(Mt0),
228228
SegRefs = my_segrefs(UId, SegWriter),
229229
Reader = ra_log_segments:init(UId, Dir, MaxOpen, AccessPattern, SegRefs,
230-
Names, Counter),
230+
Counter, LogId),
231231
SegmentRange = ra_log_segments:range(Reader),
232232
%% TODO: check ra_range:add/2 actually performas the correct logic we expect
233233
Range = ra_range:add(MtRange, SegmentRange),
234234

235-
%% TODO: review thi
236235
[begin
237236
?DEBUG("~ts: deleting overwritten segment ~w",
238237
[LogId, SR]),
@@ -1288,21 +1287,20 @@ delete_everything(#?MODULE{cfg = #cfg{uid = UId,
12881287

12891288
-spec release_resources(non_neg_integer(),
12901289
sequential | random, state()) -> state().
1291-
release_resources(MaxOpenSegments,
1292-
AccessPattern,
1290+
release_resources(MaxOpenSegments, AccessPattern,
12931291
#?MODULE{cfg = #cfg{uid = UId,
1292+
log_id = LogId,
12941293
directory = Dir,
1295-
counter = Counter,
1296-
names = Names},
1294+
counter = Counter},
12971295
reader = Reader} = State) ->
12981296
ActiveSegs = ra_log_segments:segment_refs(Reader),
12991297
% close all open segments
13001298
% deliberately ignoring return value
13011299
_ = ra_log_segments:close(Reader),
13021300
%% open a new segment with the new max open segment value
13031301
State#?MODULE{reader = ra_log_segments:init(UId, Dir, MaxOpenSegments,
1304-
AccessPattern,
1305-
ActiveSegs, Names, Counter)}.
1302+
AccessPattern, ActiveSegs,
1303+
Counter, LogId)}.
13061304

13071305

13081306
%%% Local functions
@@ -1482,45 +1480,16 @@ my_segrefs(UId, SegWriter) ->
14821480
%% if a server recovered when a segment had been opened
14831481
%% but never had any entries written the segref would be
14841482
%% undefined
1485-
case ra_log_segment:segref(File) of
1486-
undefined ->
1487-
Acc;
1488-
SegRef ->
1489-
[SegRef | Acc]
1483+
case ra_log_segment:info(File) of
1484+
#{ref := SegRef,
1485+
file_type := regular}
1486+
when is_tuple(SegRef) ->
1487+
[SegRef | Acc];
1488+
_ ->
1489+
Acc
14901490
end
14911491
end, [], SegFiles).
14921492

1493-
% recover_ranges(UId, MtRange, SegWriter) ->
1494-
% % 1. check mem_tables (this assumes wal has finished recovering
1495-
% % which means it is essential that ra_servers are part of the same
1496-
% % supervision tree
1497-
% % 2. check segments
1498-
% SegFiles = ra_log_segment_writer:my_segments(SegWriter, UId),
1499-
% SegRefs = lists:foldl(
1500-
% fun (File, Acc) ->
1501-
% %% if a server recovered when a segment had been opened
1502-
% %% but never had any entries written the segref would be
1503-
% %% undefined
1504-
% case ra_log_segment:segref(File) of
1505-
% undefined ->
1506-
% Acc;
1507-
% SegRef ->
1508-
% [SegRef | Acc]
1509-
% end
1510-
% end, [], SegFiles),
1511-
% SegRanges = [Range || {Range, _} <- SegRefs],
1512-
% Ranges = [MtRange | SegRanges],
1513-
% {pick_range(Ranges, undefined), SegRefs}.
1514-
1515-
% picks the current range from a sorted (newest to oldest) list of ranges
1516-
% pick_range([], Res) ->
1517-
% Res;
1518-
% pick_range([H | Tail], undefined) ->
1519-
% pick_range(Tail, H);
1520-
% pick_range([{Fst, _Lst} | Tail], {CurFst, CurLst}) ->
1521-
% pick_range(Tail, {min(Fst, CurFst), CurLst}).
1522-
1523-
15241493
%% TODO: implement synchronous writes using gen_batch_server:call/3
15251494
await_written_idx(Idx, Term, Log0) ->
15261495
receive
@@ -1536,17 +1505,6 @@ await_written_idx(Idx, Term, Log0) ->
15361505
throw(ra_log_append_timeout)
15371506
end.
15381507

1539-
% log_update_wait_n(0) ->
1540-
% ok;
1541-
% log_update_wait_n(N) ->
1542-
% receive
1543-
% ra_log_update_processed ->
1544-
% log_update_wait_n(N - 1)
1545-
% after 1500 ->
1546-
% %% just go ahead anyway
1547-
% ok
1548-
% end.
1549-
15501508
incr_counter(#cfg{counter = Cnt}, Ix, N) when Cnt =/= undefined ->
15511509
counters:add(Cnt, Ix, N);
15521510
incr_counter(#cfg{counter = undefined}, _Ix, _N) ->

src/ra_log_segment.erl

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
max_count/1,
2323
filename/1,
2424
segref/1,
25+
info/1,
2526
is_same_as/2,
2627
copy/3]).
2728

@@ -56,7 +57,7 @@
5657
fd :: option(file:io_device()),
5758
index_size :: pos_integer(),
5859
access_pattern :: sequential | random,
59-
file_advise = normal :: posix_file_advise(),
60+
file_advise = normal :: posix_file_advise(),
6061
mode = append :: read | append,
6162
compute_checksums = true :: boolean()}).
6263

@@ -453,6 +454,39 @@ segref(Filename) ->
453454
close(Seg),
454455
SegRef.
455456

457+
-spec info(file:filename_all()) ->
458+
#{size => non_neg_integer(),
459+
max_count => non_neg_integer(),
460+
file_type => regular | symlink,
461+
ctime => integer(),
462+
links => non_neg_integer(),
463+
num_entries => non_neg_integer(),
464+
ref => option(ra_log:segment_ref()),
465+
indexes => ra_seq:state()
466+
}.
467+
info(Filename)
468+
when not is_tuple(Filename) ->
469+
%% TODO: this can be much optimised by a specialised index parsing
470+
%% function
471+
{ok, Seg} = open(Filename, #{mode => read}),
472+
Index = Seg#state.index,
473+
{ok, #file_info{type = T,
474+
links = Links,
475+
ctime = CTime}} = file:read_link_info(Filename,
476+
[raw, {time, posix}]),
477+
478+
Info = #{size => Seg#state.data_write_offset,
479+
file_type => T,
480+
links => Links,
481+
ctime => CTime,
482+
max_count => max_count(Seg),
483+
num_entries => maps:size(Index),
484+
ref => segref(Seg),
485+
indexes => ra_seq:from_list(maps:keys(Index))
486+
},
487+
close(Seg),
488+
Info.
489+
456490
-spec is_same_as(state(), file:filename_all()) -> boolean().
457491
is_same_as(#state{cfg = #cfg{filename = Fn0}}, Fn) ->
458492
is_same_filename_all(Fn0, Fn).

0 commit comments

Comments
 (0)