Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close in stages - waiting for releases #411

Merged
merged 4 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 54 additions & 32 deletions src/leveled_inker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@
-define(JOURNAL_FILEX, "cdb").
-define(PENDING_FILEX, "pnd").
-define(TEST_KC, {[], infinity}).
-define(SHUTDOWN_PAUSE, 10000).
% How long to wait for snapshots to be released on shutdown
% before forcing closure of snapshots
% 10s may not be long enough for all snapshots, but avoids crashes of
% short-lived queries racing with the shutdown

-record(state, {manifest = [] :: list(),
manifest_sqn = 0 :: integer(),
Expand Down Expand Up @@ -666,33 +671,23 @@ handle_call({check_sqn, LedgerSQN}, _From, State) ->
end;
handle_call(get_journalsqn, _From, State) ->
{reply, {ok, State#state.journal_sqn}, State};
handle_call(close, _From, State) ->
case State#state.is_snapshot of
true ->
ok = ink_releasesnapshot(State#state.source_inker, self());
false ->
leveled_log:log(i0005, [close]),
leveled_log:log(
i0006, [State#state.journal_sqn, State#state.manifest_sqn]),
ok = leveled_iclerk:clerk_stop(State#state.clerk),
shutdown_snapshots(State#state.registered_snapshots),
shutdown_manifest(State#state.manifest)
end,
handle_call(close, _From, #state{is_snapshot = true} = State) ->
    % This clause only applies when the inker is itself a snapshot: the
    % source inker is asked to release this process (the call must return
    % ok), and the snapshot then stops normally, replying ok to the caller.
    SourceInker = State#state.source_inker,
    ok = ink_releasesnapshot(SourceInker, self()),
    {stop, normal, ok, State};
handle_call(doom, _From, State) ->
FPs = [filepath(State#state.root_path, journal_dir),
filepath(State#state.root_path, manifest_dir),
filepath(State#state.root_path, journal_compact_dir),
filepath(State#state.root_path, journal_waste_dir)],
leveled_log:log(i0018, []),

leveled_log:log(i0005, [doom]),
handle_call(ShutdownType, From, State)
when ShutdownType == close; ShutdownType == doom ->
case ShutdownType of
doom ->
leveled_log:log(i0018, []);
_ ->
ok
end,
leveled_log:log(i0005, [ShutdownType]),
leveled_log:log(
i0006, [State#state.journal_sqn, State#state.manifest_sqn]),
ok = leveled_iclerk:clerk_stop(State#state.clerk),
shutdown_snapshots(State#state.registered_snapshots),
shutdown_manifest(State#state.manifest),
{stop, normal, {ok, FPs}, State}.
gen_server:cast(self(), {maybe_defer_shutdown, ShutdownType, From}),
{noreply, State}.


handle_cast({clerk_complete, ManifestSnippet, FilesToDelete}, State) ->
Expand Down Expand Up @@ -778,8 +773,41 @@ handle_cast({remove_logs, ForcedLogs}, State) ->
ok = leveled_log:remove_forcedlogs(ForcedLogs),
CDBopts = State#state.cdb_options,
CDBopts0 = CDBopts#cdb_options{log_options = leveled_log:get_opts()},
{noreply, State#state{cdb_options = CDBopts0}}.

{noreply, State#state{cdb_options = CDBopts0}};
handle_cast({maybe_defer_shutdown, ShutdownType, From}, State) ->
    % First stage of a staged shutdown.  When no snapshots are registered
    % the shutdown proceeds immediately; otherwise the count is logged and
    % this process pauses for ?SHUTDOWN_PAUSE milliseconds.
    Snapshots = State#state.registered_snapshots,
    case Snapshots of
        [] ->
            ok;
        [_ | _] ->
            % While this process is asleep, outstanding snapshots have the
            % opportunity to release; their release messages are then
            % queued ahead of the complete_shutdown cast sent below.
            leveled_log:log(i0026, [length(Snapshots)]),
            timer:sleep(?SHUTDOWN_PAUSE)
    end,
    % Second stage is triggered by a cast to self, carrying the original
    % caller reference (From) so that the reply can be sent later.
    gen_server:cast(self(), {complete_shutdown, ShutdownType, From}),
    {noreply, State};
handle_cast({complete_shutdown, ShutdownType, From}, State) ->
lists:foreach(
fun(SnapPid) -> ok = ink_close(SnapPid) end,
lists:filter(
martinsumner marked this conversation as resolved.
Show resolved Hide resolved
fun is_process_alive/1,
lists:map(
fun(Snapshot) -> element(1, Snapshot) end,
State#state.registered_snapshots))),
shutdown_manifest(State#state.manifest),
case ShutdownType of
doom ->
FPs =
[filepath(State#state.root_path, journal_dir),
filepath(State#state.root_path, manifest_dir),
filepath(State#state.root_path, journal_compact_dir),
filepath(State#state.root_path, journal_waste_dir)],
gen_server:reply(From, {ok, FPs});
close ->
gen_server:reply(From, ok)
end,
{stop, normal, State}.

%% handle the bookie stopping and stop this snapshot
handle_info({'DOWN', BookieMonRef, process, _BookiePid, _Info},
Expand All @@ -801,6 +829,7 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%%============================================================================


-spec start_from_file(inker_options()) -> {ok, ink_state()}.
%% @doc
%% Start an Inker from the state on disk (i.e. not a snapshot).
Expand Down Expand Up @@ -866,13 +895,6 @@ start_from_file(InkOpts) ->
clerk = Clerk}}.


-spec shutdown_snapshots(list(registered_snapshot())) -> ok.
%% @doc
%% Shutdown any snapshots before closing the store
shutdown_snapshots(Snapshots) ->
lists:foreach(fun({Snap, _TS, _SQN}) -> ok = ink_close(Snap) end,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This original code failed when a snapshot was already closed or responded with something other than ok.
However, during a cleanup, what do you gain by crashing the gen_server loop because you cannot clean up?

Snapshots).

-spec shutdown_manifest(leveled_imanifest:manifest()) -> ok.
%% @doc
%% Shutdown all files in the manifest
Expand Down
4 changes: 4 additions & 0 deletions src/leveled_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@
{info, <<"Archiving filename ~s as unused at startup">>},
p0041 =>
{info, <<"Penciller manifest switched from SQN ~w to ~w">>},
p0042 =>
{info, <<"Deferring shutdown due to snapshot_count=~w">>},
pc001 =>
{info, <<"Penciller's clerk ~w started with owner ~w">>},
pc005 =>
Expand Down Expand Up @@ -244,6 +246,8 @@
{info, <<"Prompted roll at NewSQN=~w">>},
i0025 =>
{warn, <<"Journal SQN of ~w is below Ledger SQN of ~w anti-entropy will be required">>},
i0026 =>
{info, <<"Deferring shutdown due to snapshot_count=~w">>},
ic001 =>
{info, <<"Closed for reason ~w so maybe leaving garbage">>},
ic002 =>
Expand Down
64 changes: 47 additions & 17 deletions src/leveled_penciller.erl
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,11 @@
-define(TIMING_SAMPLECOUNTDOWN, 10000).
-define(TIMING_SAMPLESIZE, 100).
-define(OPEN_LASTMOD_RANGE, {0, infinity}).
-define(SHUTDOWN_PAUSE, 10000).
% How long to wait for snapshots to be released on shutdown
% before forcing closure of snapshots
% 10s may not be long enough for all snapshots, but avoids crashes of
% short-lived queries racing with the shutdown

-record(state, {manifest ::
leveled_pmanifest:manifest() | undefined | redacted,
Expand Down Expand Up @@ -566,15 +571,15 @@ pcl_persistedsqn(Pid) ->
%% @doc
%% Close the penciller neatly, trying to persist to disk anything in the memory
pcl_close(Pid) ->
gen_server:call(Pid, close, 60000).
gen_server:call(Pid, close, infinity).

-spec pcl_doom(pid()) -> {ok, list()}.
%% @doc
%% Close the penciller neatly, trying to persist to disk anything in the memory
%% Return a list of filepaths from where files exist for this penciller (should
%% the calling process wish to erase the store).
pcl_doom(Pid) ->
gen_server:call(Pid, doom, 60000).
gen_server:call(Pid, doom, infinity).

-spec pcl_checkbloomtest(pid(), tuple()) -> boolean().
%% @doc
Expand Down Expand Up @@ -906,7 +911,7 @@ handle_call({register_snapshot, Snapshot, Query, BookiesMem, LongRunning},
handle_call(close, _From, #state{is_snapshot = true} = State) ->
    % This clause only applies when the penciller is itself a snapshot: the
    % source penciller is asked to release this process (the call must
    % return ok), and the snapshot then stops normally, replying ok.
    SourcePenciller = State#state.source_penciller,
    ok = pcl_releasesnapshot(SourcePenciller, self()),
    {stop, normal, ok, State};
handle_call(close, _From, State) ->
handle_call(close, From, State) ->
% Level 0 files lie outside of the manifest, and so if there is no L0
% file present it is safe to write the current contents of memory. If
% there is a L0 file present - then the memory can be dropped (it is
Expand Down Expand Up @@ -935,17 +940,13 @@ handle_call(close, _From, State) ->
false ->
leveled_log:log(p0010, [State#state.levelzero_size])
end,
shutdown_manifest(State#state.manifest, State#state.levelzero_constructor),
{stop, normal, ok, State};
handle_call(doom, _From, State) ->
gen_server:cast(self(), {maybe_defer_shutdown, close, From}),
{noreply, State};
handle_call(doom, From, State) ->
leveled_log:log(p0030, []),
ok = leveled_pclerk:clerk_close(State#state.clerk),

shutdown_manifest(State#state.manifest, State#state.levelzero_constructor),

ManifestFP = State#state.root_path ++ "/" ++ ?MANIFEST_FP ++ "/",
FilesFP = State#state.root_path ++ "/" ++ ?FILES_FP ++ "/",
{stop, normal, {ok, [ManifestFP, FilesFP]}, State};
gen_server:cast(self(), {maybe_defer_shutdown, doom, From}),
{noreply, State};
handle_call({checkbloom_fortest, Key, Hash}, _From, State) ->
Manifest = State#state.manifest,
FoldFun =
Expand Down Expand Up @@ -995,8 +996,8 @@ handle_cast({manifest_change, Manifest}, State) ->
work_ongoing=false}}
end;
handle_cast({release_snapshot, Snapshot}, State) ->
Manifest0 = leveled_pmanifest:release_snapshot(State#state.manifest,
Snapshot),
Manifest0 =
leveled_pmanifest:release_snapshot(State#state.manifest, Snapshot),
leveled_log:log(p0003, [Snapshot]),
{noreply, State#state{manifest=Manifest0}};
handle_cast({confirm_delete, PDFN, FilePid}, State=#state{is_snapshot=Snap})
Expand Down Expand Up @@ -1156,7 +1157,36 @@ handle_cast({remove_logs, ForcedLogs}, State) ->
ok = leveled_log:remove_forcedlogs(ForcedLogs),
SSTopts = State#state.sst_options,
SSTopts0 = SSTopts#sst_options{log_options = leveled_log:get_opts()},
{noreply, State#state{sst_options = SSTopts0}}.
{noreply, State#state{sst_options = SSTopts0}};
handle_cast({maybe_defer_shutdown, ShutdownType, From}, State) ->
    % First stage of a staged shutdown.  When the manifest reports no
    % snapshot pids the shutdown proceeds immediately; otherwise the count
    % is logged and this process pauses for ?SHUTDOWN_PAUSE milliseconds.
    SnapshotPids = leveled_pmanifest:snapshot_pids(State#state.manifest),
    case SnapshotPids of
        [] ->
            ok;
        [_ | _] ->
            % While this process is asleep, outstanding snapshots have the
            % opportunity to release; their release messages are then
            % queued ahead of the complete_shutdown cast sent below.
            leveled_log:log(p0042, [length(SnapshotPids)]),
            timer:sleep(?SHUTDOWN_PAUSE)
    end,
    % Second stage is triggered by a cast to self, carrying the original
    % caller reference (From) so that the reply can be sent later.
    gen_server:cast(self(), {complete_shutdown, ShutdownType, From}),
    {noreply, State};
handle_cast({complete_shutdown, ShutdownType, From}, State) ->
lists:foreach(
fun(Snap) -> ok = pcl_close(Snap) end,
martinsumner marked this conversation as resolved.
Show resolved Hide resolved
lists:filter(
fun is_process_alive/1,
leveled_pmanifest:snapshot_pids(State#state.manifest))),
shutdown_manifest(State#state.manifest, State#state.levelzero_constructor),
case ShutdownType of
doom ->
ManifestFP = State#state.root_path ++ "/" ++ ?MANIFEST_FP ++ "/",
FilesFP = State#state.root_path ++ "/" ++ ?FILES_FP ++ "/",
gen_server:reply(From, {ok, [ManifestFP, FilesFP]});
close ->
gen_server:reply(From, ok)
end,
{stop, normal, State}.


%% handle the bookie stopping and stop this snapshot
Expand Down Expand Up @@ -1195,8 +1225,8 @@ sst_rootpath(RootPath) ->
FP.

sst_filename(ManSQN, Level, Count) ->
lists:flatten(io_lib:format("./~w_~w_~w" ++ ?SST_FILEX,
[ManSQN, Level, Count])).
lists:flatten(
io_lib:format("./~w_~w_~w" ++ ?SST_FILEX, [ManSQN, Level, Count])).


%%%============================================================================
Expand Down
45 changes: 23 additions & 22 deletions src/leveled_pmanifest.erl
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@
is_basement/2,
levelzero_present/1,
check_bloom/3,
report_manifest_level/2
report_manifest_level/2,
snapshot_pids/1
]).

-export([
Expand Down Expand Up @@ -88,22 +89,23 @@

-record(manifest, {levels,
% an array of lists or trees representing the manifest
manifest_sqn = 0 :: integer(),
manifest_sqn = 0 :: non_neg_integer(),
% The current manifest SQN
snapshots :: list() | undefined,
snapshots = []
:: list(snapshot()),
% A list of snapshots (i.e. clones)
min_snapshot_sqn = 0 :: integer(),
% The smallest snapshot manifest SQN in the snapshot
% list
pending_deletes, % OTP16 does not like defining type
% a dictionary mapping keys (filenames) to SQN when the
% deletion was made, and the original Manifest Entry
basement :: integer(),
pending_deletes = dict:new() :: dict:dict(),
basement :: non_neg_integer(),
% Currently the lowest level (the largest number)
blooms :: any() % actually a dict but OTP 16 compatability
% A dictionary mapping PIDs to bloom filters
blooms :: dict:dict()
}).

-type fake_pid() :: pid_a1|pid_a2|pid_a3|pid_a4.
martinsumner marked this conversation as resolved.
Show resolved Hide resolved
-type snapshot() ::
{pid()|fake_pid(), non_neg_integer(), pos_integer(), pos_integer()}.
-type manifest() :: #manifest{}.
-type manifest_entry() :: #manifest_entry{}.
-type manifest_owner() :: pid()|list().
Expand Down Expand Up @@ -171,8 +173,8 @@ open_manifest(RootPath) ->
%% by a snapshot
copy_manifest(Manifest) ->
% Copy the manifest ensuring anything only the master process should care
% about is switched to undefined
Manifest#manifest{snapshots = undefined, pending_deletes = undefined}.
% about is switched to be empty
Manifest#manifest{snapshots = [], pending_deletes = dict:new()}.

-spec load_manifest(
manifest(),
Expand Down Expand Up @@ -538,10 +540,9 @@ mergefile_selector(Manifest, LevelIdx, {grooming, ScoringFun}) ->
%% be received in parallel to the manifest being updated, so the updated
%% manifest must not trample over any accrued state in the manifest.
merge_snapshot(PencillerManifest, ClerkManifest) ->
ClerkManifest#manifest{snapshots =
PencillerManifest#manifest.snapshots,
min_snapshot_sqn =
PencillerManifest#manifest.min_snapshot_sqn}.
ClerkManifest#manifest{
snapshots = PencillerManifest#manifest.snapshots,
martinsumner marked this conversation as resolved.
Show resolved Hide resolved
min_snapshot_sqn = PencillerManifest#manifest.min_snapshot_sqn}.

-spec add_snapshot(manifest(), pid()|atom(), integer()) -> manifest().
%% @doc
Expand Down Expand Up @@ -695,6 +696,11 @@ check_bloom(Manifest, FP, Hash) ->
true
end.

-spec snapshot_pids(manifest()) -> list(pid()).
%% @doc
%% Extract the first element (the snapshot's process identifier) from each
%% snapshot entry held in the manifest, keeping the entries in their existing
%% order.  An empty snapshot list yields an empty result.
snapshot_pids(Manifest) ->
    [element(1, Snapshot) || Snapshot <- Manifest#manifest.snapshots].

%%%============================================================================
%%% Internal Functions
Expand Down Expand Up @@ -1442,7 +1448,7 @@ snapshot_timeout_test() ->
?assertMatch(1, length(Man7#manifest.snapshots)),
Man8 = release_snapshot(Man7, pid_a1),
?assertMatch(0, length(Man8#manifest.snapshots)),
Man9 = add_snapshot(Man8, pid_a1, 0),
Man9 = add_snapshot(Man8, pid_a1, 1),
timer:sleep(2001),
?assertMatch(1, length(Man9#manifest.snapshots)),
Man10 = release_snapshot(Man9, ?PHANTOM_PID),
Expand Down Expand Up @@ -1474,12 +1480,7 @@ potential_issue_test() ->
{idxt,0,{{},{0,nil}}},
{idxt,0,{{},{0,nil}}},
[]}},
19,[],0,
{dict,0,16,16,8,80,48,
{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
{{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}},
2,
dict:new()},
19, [], 0, dict:new(), 2, dict:new()},
Range1 = range_lookup(Manifest,
1,
{o_rkv, "Bucket", null, null},
Expand Down
Loading