Skip to content

Commit

Permalink
Merge pull request #4 from martinsumner/develop-3.1
Browse files Browse the repository at this point in the history
Mas d31 i410looptoclose (martinsumner#421)
  • Loading branch information
martinsumner authored Nov 13, 2023
2 parents f617dd8 + 6223b80 commit 39f34b4
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 27 deletions.
31 changes: 31 additions & 0 deletions src/leveled_bookie.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2614,6 +2614,37 @@ generate_multiple_objects(Count, KeyNumber, ObjL) ->
ObjL ++ [{Key, Value, IndexSpec}]).


shutdown_test_() ->
{timeout, 10, fun shutdown_tester/0}.

shutdown_tester() ->
RootPath = reset_filestructure(),
{ok, Bookie1} = book_start([{root_path, RootPath}]),
lists:foreach(
fun({K, V, S}) ->
ok = book_put(Bookie1, <<"Bucket">>, K, V, S, ?STD_TAG)
end,
generate_multiple_objects(5000, 1)),
{ok, SnpPCL1, SnpJrnl1} =
leveled_bookie:book_snapshot(Bookie1, store, undefined, true),

TestPid = self(),
spawn(
fun() ->
ok = leveled_bookie:book_close(Bookie1),
TestPid ! ok
end),

timer:sleep(2000),
ok = leveled_penciller:pcl_close(SnpPCL1),
ok = leveled_inker:ink_close(SnpJrnl1),
SW = os:timestamp(),
receive ok -> ok end,
WaitForShutDown = timer:now_diff(SW, os:timestamp()) div 1000,
?assert(WaitForShutDown =< (1000 + 1)),
_ = reset_filestructure().


ttl_test() ->
RootPath = reset_filestructure(),
{ok, Bookie1} = book_start([{root_path, RootPath}]),
Expand Down
25 changes: 18 additions & 7 deletions src/leveled_inker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@
-define(JOURNAL_FILEX, "cdb").
-define(PENDING_FILEX, "pnd").
-define(TEST_KC, {[], infinity}).
-define(SHUTDOWN_LOOPS, 10).
-define(SHUTDOWN_PAUSE, 10000).
% How long to wait for snapshots to be released on shutdown
% before forcing closure of snapshots
Expand All @@ -154,7 +155,8 @@
compression_method = native :: lz4|native|none,
compress_on_receipt = false :: boolean(),
snap_timeout :: pos_integer() | undefined, % in seconds
source_inker :: pid() | undefined}).
source_inker :: pid() | undefined,
shutdown_loops = ?SHUTDOWN_LOOPS :: non_neg_integer()}).


-type inker_options() :: #inker_options{}.
Expand Down Expand Up @@ -786,16 +788,25 @@ handle_cast({remove_logs, ForcedLogs}, State) ->
handle_cast({maybe_defer_shutdown, ShutdownType, From}, State) ->
case length(State#state.registered_snapshots) of
0 ->
ok;
gen_server:cast(self(), {complete_shutdown, ShutdownType, From}),
{noreply, State};
N ->
% Whilst this process sleeps, then any remaining snapshots may
% release and have their release messages queued before the
% complete_shutdown cast is sent
leveled_log:log(i0026, [N]),
timer:sleep(?SHUTDOWN_PAUSE)
end,
gen_server:cast(self(), {complete_shutdown, ShutdownType, From}),
{noreply, State};
case State#state.shutdown_loops of
LoopCount when LoopCount > 0 ->
leveled_log:log(i0026, [N]),
timer:sleep(?SHUTDOWN_PAUSE div ?SHUTDOWN_LOOPS),
gen_server:cast(
self(), {maybe_defer_shutdown, ShutdownType, From}),
{noreply, State#state{shutdown_loops = LoopCount - 1}};
0 ->
gen_server:cast(
self(), {complete_shutdown, ShutdownType, From}),
{noreply, State}
end
end;
handle_cast({complete_shutdown, ShutdownType, From}, State) ->
lists:foreach(
fun(SnapPid) -> ok = ink_snapclose(SnapPid) end,
Expand Down
27 changes: 20 additions & 7 deletions src/leveled_penciller.erl
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@
-define(ITERATOR_SCANWIDTH, 4).
-define(TIMING_SAMPLECOUNTDOWN, 10000).
-define(TIMING_SAMPLESIZE, 100).
-define(SHUTDOWN_LOOPS, 10).
-define(SHUTDOWN_PAUSE, 10000).
% How long to wait for snapshots to be released on shutdown
% before forcing closure of snapshots
Expand Down Expand Up @@ -270,7 +271,10 @@

monitor = {no_monitor, 0} :: leveled_monitor:monitor(),

sst_options = #sst_options{} :: sst_options()}).
sst_options = #sst_options{} :: sst_options(),

shutdown_loops = ?SHUTDOWN_LOOPS :: non_neg_integer()
}).


-type penciller_options() :: #penciller_options{}.
Expand Down Expand Up @@ -1153,16 +1157,25 @@ handle_cast({remove_logs, ForcedLogs}, State) ->
handle_cast({maybe_defer_shutdown, ShutdownType, From}, State) ->
case length(leveled_pmanifest:snapshot_pids(State#state.manifest)) of
0 ->
ok;
gen_server:cast(self(), {complete_shutdown, ShutdownType, From}),
{noreply, State};
N ->
% Whilst this process sleeps, then any remaining snapshots may
% release and have their release messages queued before the
% complete_shutdown cast is sent
leveled_log:log(p0042, [N]),
timer:sleep(?SHUTDOWN_PAUSE)
end,
gen_server:cast(self(), {complete_shutdown, ShutdownType, From}),
{noreply, State};
case State#state.shutdown_loops of
LoopCount when LoopCount > 0 ->
leveled_log:log(p0042, [N]),
timer:sleep(?SHUTDOWN_PAUSE div ?SHUTDOWN_LOOPS),
gen_server:cast(
self(), {maybe_defer_shutdown, ShutdownType, From}),
{noreply, State#state{shutdown_loops = LoopCount - 1}};
0 ->
gen_server:cast(
self(), {complete_shutdown, ShutdownType, From}),
{noreply, State}
end
end;
handle_cast({complete_shutdown, ShutdownType, From}, State) ->
lists:foreach(
fun(Snap) -> ok = pcl_snapclose(Snap) end,
Expand Down
2 changes: 1 addition & 1 deletion test/end_to_end/basic_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -929,7 +929,7 @@ space_clear_ondelete(_Config) ->
true = PointB_Journals < length(FNsA_J),
true = length(strip_nonsst(FNsD_L)) < length(strip_nonsst(FNsA_L)),
true = length(strip_nonsst(FNsD_L)) < length(strip_nonsst(FNsB_L)),
true = length(strip_nonsst(FNsD_L)) < length(strip_nonsst(FNsC_L)),
true = length(strip_nonsst(FNsD_L)) =< length(strip_nonsst(FNsC_L)),
true = length(strip_nonsst(FNsD_L)) == 0.


Expand Down
30 changes: 18 additions & 12 deletions test/end_to_end/iterator_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,11 @@ expiring_indexes(_Config) ->

SW1 = os:timestamp(),
IBKL1 = testutil:stdload_expiring(Bookie1, KeyCount, Future),
timer:sleep(1000),
% Wait a second after last key so that none loaded in the last second
LoadTime = timer:now_diff(os:timestamp(), SW1)/1000000,
io:format("Load of ~w std objects in ~w seconds~n", [KeyCount, LoadTime]),
SW2 = os:timestamp(),

FilterFun = fun({I, _B, _K}) -> lists:member(I, [5, 6, 7, 8]) end,
LoadedEntriesInRange = lists:sort(lists:filter(FilterFun, IBKL1)),
Expand Down Expand Up @@ -101,6 +104,7 @@ expiring_indexes(_Config) ->
% this time index value of 6
testutil:stdload_object(
Bookie1, B0, K0, 5, <<"value">>, leveled_util:integer_now() + 10),
timer:sleep(1000),
{async, Folder2} = IndexFold(),
leveled_bookie:book_indexfold(Bookie1,
B0,
Expand All @@ -121,23 +125,25 @@ expiring_indexes(_Config) ->

FoldTime = timer:now_diff(os:timestamp(), SW1)/1000000 - LoadTime,
io:format("Query returned ~w entries in ~w seconds - 3 queries + 10s wait~n",
[length(QR1), FoldTime]),
true = (LoadTime + FoldTime) < Future,
SleepTime = round((Future - (LoadTime + FoldTime)) * 1000),
io:format("Sleeping ~w s for all to expire~n", [SleepTime/1000]),
timer:sleep(SleepTime + 1000), % add a second
[length(QR3), FoldTime]),

SleepTime =
(Future - (timer:now_diff(os:timestamp(), SW2) div (1000 * 1000))) + 1,

io:format("Sleeping ~w s for all to expire~n", [SleepTime]),
timer:sleep(SleepTime * 1000),

% Index entries should now have expired
{async, Folder4} = IndexFold(),
QR4 = Folder4(),
io:format("Unexpired indexes of length ~w~n", [length(QR4)]),
lists:foreach(
fun(I) ->
io:format("Unexpired index ~p~n", [I])
end,
QR4
),
io:format("QR4 Unexpired indexes of length ~w~n", [length(QR4)]),
timer:sleep(1000),
{async, Folder5} = IndexFold(),
QR5 = Folder5(),
io:format("QR5 Unexpired indexes of length ~w~n", [length(QR5)]),

true = QR4 == [],
true = QR5 == [],

ok = leveled_bookie:book_close(Bookie1),
testutil:reset_filestructure().
Expand Down

0 comments on commit 39f34b4

Please sign in to comment.