Skip to content

Commit 611ab22

Browse files
committed
compaction refactoring
1 parent b726b16 commit 611ab22

File tree

3 files changed

+282
-102
lines changed

3 files changed

+282
-102
lines changed

src/ra_lib.erl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
lists_shuffle/1,
5050
is_dir/1,
5151
is_file/1,
52+
is_any_file/1,
5253
ensure_dir/1,
5354
consult/1,
5455
cons/2
@@ -466,6 +467,14 @@ is_file(File) ->
466467
false
467468
end.
468469

470+
%% Returns true when prim_file:read_file_info/1 succeeds for File, whatever
%% file type is reported in the resulting #file_info{}; returns false for
%% every other result.
is_any_file(File) ->
    has_file_info(prim_file:read_file_info(File)).

%% Maps the result of prim_file:read_file_info/1 to a boolean.
has_file_info({ok, #file_info{}}) ->
    true;
has_file_info(_Other) ->
    false.
477+
469478

470479
-spec consult(file:filename()) ->
471480
{ok, term()} | {error, term()}.

src/ra_log_segments.erl

Lines changed: 94 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -573,6 +573,14 @@ decr_counter(#cfg{counter = undefined}, _, _) ->
573573
%% Delegates to list_files/3 with the ".segment" extension, passing Dir and
%% Fun through unchanged.
segment_files(Dir, Fun) ->
574574
list_files(Dir, ".segment", Fun).
575575

576+
% delete_files(Dir, Ext) ->
577+
% case list_files(Dir, Ext, fun (_) -> true end) of
578+
% [] ->
579+
% ok;
580+
% Files ->
581+
% [prim_file:delete(filename:join(Dir, F)) || F <- Files]
582+
% end.
583+
576584
%% Delegates to list_files/3 with a fun that returns true for any argument
%% (presumably an accept-everything filter -- confirm against list_files/3).
list_files(Dir, Ext) ->
577585
list_files(Dir, Ext, fun (_) -> true end).
578586

@@ -622,10 +630,13 @@ major_compaction(#{dir := Dir} = CompConf, SegRefs, LiveIndexes) ->
622630
CompConf),
623631
Compacted0 =
624632
[begin
633+
%% create a compaction marker file recording the compaction group
634+
CompactionMarker = filename:join(Dir, with_ext(CompGroupLeaderFn,
635+
".compaction_group")),
636+
ok = ra_lib:write_file(CompactionMarker, term_to_binary(All)),
625637
%% create a new segment with .compacting extension
626-
AllShortFns = [F || {_, _, {F, _}} <- All],
627-
CompactingShortFn = make_compacting_file_name(AllShortFns),
628-
CompactingFn = filename:join(Dir, CompactingShortFn),
638+
CompactingFn = filename:join(Dir, with_ext(CompGroupLeaderFn,
639+
".compacting")),
629640
%% max_count is the sum of all live indexes for segments in the
630641
%% compaction group
631642
MaxCount = lists:sum([ra_seq:length(S) || {_, S, _} <- All]),
@@ -641,28 +652,24 @@ major_compaction(#{dir := Dir} = CompConf, SegRefs, LiveIndexes) ->
641652
end, CompSeg0, All),
642653
ok = ra_log_segment:close(CompSeg),
643654

644-
%% link .compacting segment to the original .segment file
645-
%% first we have to create a hard link to a new .compacted file
646-
%% from the .compacting file (as we need to keep this as a marker
647-
%% until the end
648-
%% then we can rename this on top of the first segment file in the
649-
%% group (the target)
650-
FirstSegmentFn = filename:join(Dir, FstFn0),
651-
CompactedFn = filename:join(Dir, with_ext(FstFn0, ".compacted")),
652-
ok = prim_file:make_link(CompactingFn, CompactedFn),
653-
ok = prim_file:rename(CompactedFn, FirstSegmentFn),
655+
FirstSegmentFn = filename:join(Dir, CompGroupLeaderFn),
654656

655657
%% perform sym linking of the additional segments in the compaction
656-
%% group
657-
ok = make_links(Dir, FirstSegmentFn,
658-
[F || {_, _, {F, _}} <- Additional]),
659-
%% finally deleted the .compacting file to signal compaction group
660-
%% is complete
661-
ok = prim_file:delete(CompactingFn),
658+
%% group, the target is not yet updated which can be detected at
659+
%% recovery by the presence of a sym link _and_ the .compacting
660+
%% file
661+
ok = make_symlinks(Dir, FirstSegmentFn,
662+
[F || {_, _, {F, _}} <- Additional]),
663+
664+
%% rename the .compacting segment on top of the group leader
665+
ok = prim_file:rename(CompactingFn, FirstSegmentFn),
666+
%% finally delete the .compaction_group marker file to signal
667+
%% compaction group is complete
668+
ok = prim_file:delete(CompactionMarker),
662669
%% return the new segref and additional segment keys
663670
{ra_log_segment:segref(FirstSegmentFn),
664671
[A || {_, _, {A, _}} <- Additional]}
665-
end || [{_Info, _, {FstFn0, _}} | Additional] = All
672+
end || [{_Info, _, {CompGroupLeaderFn, _}} | Additional] = All
666673
<- CompactionGroups],
667674

668675
{Compacted, AddDelete} = lists:unzip(Compacted0),
@@ -705,7 +712,7 @@ compactable_segrefs(SnapIdx, State) ->
705712
end, [], Compactable)
706713
end.
707714

708-
make_links(Dir, To, From)
715+
make_symlinks(Dir, To, From)
709716
when is_list(From) ->
710717
[begin
711718
SymFn = filename:join(Dir, with_ext(FromFn, ".link")),
@@ -769,55 +776,77 @@ take_group([{#{num_entries := NumEnts,
769776
{lists:reverse(Acc), Rem}
770777
end.
771778

772-
773-
parse_compacting_filename(Fn) when is_binary(Fn) ->
774-
binary:split(filename:rootname(Fn), <<"-">>, [global]).
775-
776-
make_compacting_file_name([N1 | Names]) ->
777-
Root = lists:foldl(fun (N, Acc) ->
778-
[filename:rootname(N), <<"-">> | Acc]
779-
end, [N1], Names),
780-
iolist_to_binary(lists:reverse([<<".compacting">> | Root])).
781-
782779
%% Recovers from a major compaction that was interrupted part-way through.
%%
%% A pending compaction is detected by a ".compaction_group" marker file in
%% Dir. The marker holds the term_to_binary/1 encoded compaction group: the
%% first entry is the group leader (the compaction target) and the remaining
%% entries are the segments that are replaced by symlinks to the target.
%%
%% Returns a #compaction_result{}. The `compacted' and `linked' fields are
%% only populated when an interrupted compaction was completed here; in every
%% other case the default record is returned.
recover_compaction(Dir) ->
    case list_files(Dir, ".compaction_group") of
        [] ->
            %% no marker, no pending compaction
            #compaction_result{};
        [MarkerShortFn] ->
            %% compaction recovery is needed as there is a marker file
            MarkerFn = filename:join(Dir, MarkerShortFn),
            %% there _may_ be a .compacting file named after the same leader
            CompactingFn = filename:join(Dir, with_ext(MarkerShortFn,
                                                       ".compacting")),
            {ok, Bin} = prim_file:read_file(MarkerFn),
            case decode_compaction_group(Bin) of
                corrupt ->
                    %% the marker cannot be interpreted, just remove it.
                    %% The .compacting file (if any) is deliberately left
                    %% alone as we cannot tell whether it is still needed.
                    _ = prim_file:delete(MarkerFn),
                    #compaction_result{};
                {ok, GroupFns} ->
                    recover_compaction_group(Dir, GroupFns,
                                             MarkerFn, CompactingFn)
            end
    end.

%% Decodes the contents of a compaction marker into a list of short segment
%% file names. The marker is written from the full compaction group whose
%% entries have the shape {Info, Seq, {Fn, _}}, so those are reduced to Fn;
%% bare file names are accepted as well. Returns `corrupt' if the binary
%% cannot be decoded or does not have one of the expected shapes.
decode_compaction_group(Bin) ->
    try
        Group = binary_to_term(Bin),
        {ok, [compaction_group_fn(Entry) || Entry <- Group]}
    catch
        error:_ ->
            %% badarg from binary_to_term/1, a non-list term or an
            %% unrecognised entry all mean the marker is unusable
            corrupt
    end.

%% Extracts the short file name from a single compaction group entry.
compaction_group_fn({_Info, _Seq, {Fn, _}}) ->
    Fn;
compaction_group_fn(Fn) when is_binary(Fn); is_list(Fn) ->
    Fn.

%% Completes or discards the pending compaction described by the decoded
%% group file names.
recover_compaction_group(_Dir, [], _MarkerFn, _CompactingFn) ->
    #compaction_result{};
recover_compaction_group(_Dir, [_], MarkerFn, CompactingFn) ->
    %% single segment compaction, we cannot know if the compaction into the
    %% .compacting segment completed or not so it is discarded
    discard_compaction(MarkerFn, CompactingFn);
recover_compaction_group(Dir, [TargetShortFn | [FstLinkSeg | _] = LinkTargets],
                         MarkerFn, CompactingFn) ->
    %% multiple segments in group, if any of the additional segments is a
    %% symlink the writes to the .compacting segment completed and we can
    %% complete the compaction work
    Target = filename:join(Dir, TargetShortFn),
    FstLinkSegFn = filename:join(Dir, FstLinkSeg),
    FstLinkSegLinkFn = filename:join(Dir, with_ext(FstLinkSeg, ".link")),
    AtLeastOneLink = ra_lib:is_any_file(FstLinkSegLinkFn),
    CompactingExists = ra_lib:is_any_file(CompactingFn),
    case file:read_link_info(FstLinkSegFn, [raw]) of
        {ok, #file_info{type = Type}}
          when Type == symlink orelse AtLeastOneLink ->
            %% linking had started: recreate all symlinks, then move the
            %% .compacting file (if it is still there) over the target
            ok = make_symlinks(Dir, Target, LinkTargets),
            case CompactingExists of
                true ->
                    ok = prim_file:rename(CompactingFn, Target);
                false ->
                    ok
            end,
            ok = prim_file:delete(MarkerFn),
            #compaction_result{compacted = [ra_log_segment:segref(Target)],
                               linked = LinkTargets};
        {error, enoent} ->
            %% NOTE(review): it is unclear what state a missing segment
            %% indicates; the pending compaction is discarded
            discard_compaction(MarkerFn, CompactingFn);
        {ok, #file_info{type = regular}} ->
            %% no symlink was created so the original segments are intact
            discard_compaction(MarkerFn, CompactingFn)
    end.

%% Removes the files left behind by a compaction that will not be completed
%% and returns an empty compaction result.
discard_compaction(MarkerFn, CompactingFn) ->
    %% ignore return value as the .compacting file may not exist
    _ = prim_file:delete(CompactingFn),
    ok = prim_file:delete(MarkerFn),
    #compaction_result{}.
823852

0 commit comments

Comments
 (0)