Commit f32f49e

acogoluegnes authored and kjnilsson committed
Use actual filter size in read ahead
Start with the default filter size and use the actual size once a non-zero value has been read. Attempt to read ahead from parse_header instead of returning the header and no data. The decision is based on the availability of previous read-ahead data.
1 parent: c49addd
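In outline, the reader now keeps two hints, the size of the last chunk body and the last non-zero filter size it has seen, and sizes the next read from them. The snippet below is a minimal sketch of that decision under assumed constants; the module name and the values of ?DEFAULT_FILTER_SIZE and ?READ_AHEAD_LIMIT are illustrative placeholders, not the ones defined in osiris_log.erl.

%% Minimal sketch of the read-ahead sizing described in the commit message.
%% Illustrative module and constants only.
-module(read_ahead_sketch).
-export([next_filter_size/2, read_ahead_size/2]).

-define(DEFAULT_FILTER_SIZE, 16).   %% assumed default filter size
-define(READ_AHEAD_LIMIT, 4096).    %% assumed read-ahead limit

%% keep the previous hint when the chunk carried no filter (size 0),
%% otherwise switch to the filter size that was actually read
next_filter_size(Previous, 0) -> Previous;
next_filter_size(_Previous, Current) -> Current.

%% if the last chunk body was small, read far enough to likely cover the
%% whole next chunk in one pread; otherwise only read optimistically,
%% i.e. the header plus the default filter size
read_ahead_size(LastDataSize, FilterSize)
  when is_integer(LastDataSize), LastDataSize =< ?READ_AHEAD_LIMIT ->
    FilterSize + ?READ_AHEAD_LIMIT;
read_ahead_size(_LastDataSize, _FilterSize) ->
    ?DEFAULT_FILTER_SIZE.

In the diff below, read_ahead_fsize/2 and read_ahead_size/2 carry this logic, and send_file/3 refreshes the filter-size hint each time a chunk header is parsed.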

2 files changed: 124 additions, 40 deletions

src/osiris_log.erl

Lines changed: 43 additions & 20 deletions
@@ -66,7 +66,7 @@
          make_chunk/7,
          orphaned_segments/1,
          read_header0/1,
-         last_data_size/2,
+         read_ahead_hints/3,
          update_read/4
         ]).

@@ -430,6 +430,7 @@
         position = 0 :: non_neg_integer(),
         filter :: undefined | osiris_bloom:mstate(),
         last_data_size = undefined :: undefined | non_neg_integer(),
+        filter_size = ?DEFAULT_FILTER_SIZE :: osiris_bloom:filter_size(),
         read_ahead_data = undefined :: undefined | binary()}).
 -record(write,
         {type = writer :: writer | acceptor,
@@ -1665,7 +1666,8 @@ send_file(Sock, State) ->
 send_file(Sock,
           #?MODULE{mode = #read{type = RType,
                                 chunk_selector = Selector,
-                                transport = Transport}} = State0,
+                                transport = Transport,
+                                filter_size = RaFs}} = State0,
           Callback) ->
     case catch read_header0(State0) of
         {ok, #{type := ChType,
@@ -1687,7 +1689,9 @@
         true ->
             Read = Read0#read{next_offset = ChId + NumRecords,
                               position = NextPos,
-                              last_data_size = DataSize},
+                              last_data_size = DataSize,
+                              filter_size = read_ahead_fsize(RaFs,
+                                                             FilterSize)},
             case MaybeData of
                 undefined ->
                     %% read header
@@ -2903,10 +2907,11 @@ read_header_with_ra(#?MODULE{cfg = #cfg{directory = Dir,
                     mode = #read{next_offset = NextChId0,
                                  position = Pos,
                                  last_data_size = Lds,
+                                 filter_size = FilterSize,
                                  read_ahead_data = undefined} = Read0,
                     current_file = CurFile,
                     fd = Fd} = State) ->
-    ReadAheadSize = read_ahead_size(Lds),
+    ReadAheadSize = read_ahead_size(Lds, FilterSize),
 
     case file:pread(Fd, Pos, ?HEADER_SIZE_B + ReadAheadSize) of
         {ok, Bin} when byte_size(Bin) >= ?HEADER_SIZE_B ->
@@ -2981,7 +2986,6 @@ read_header_with_ra(#?MODULE{mode = #read{last_data_size = Lds,
             read_header_with_ra(State#?MODULE{mode = Read1})
     end.
 
-
 parse_header(<<?MAGIC:4/unsigned,
                ?VERSION:4/unsigned,
                ChType:8/unsigned,
@@ -2998,7 +3002,9 @@ parse_header(<<?MAGIC:4/unsigned,
                MaybeFilterAndRest/binary>> = HeaderData0,
              #?MODULE{mode = #read{type = RType,
                                    chunk_selector = Selector,
-                                   next_offset = NextChId0} = Read0} = State) ->
+                                   next_offset = NextChId0,
+                                   read_ahead_data = RAD,
+                                   filter_size = LFS} = Read0} = State) ->
     {ToSkip, ToSend} = select_amount_to_send(RType, Selector, ChType,
                                              FilterSize, DataSize,
                                              TrailerSize),
@@ -3010,8 +3016,8 @@ parse_header(<<?MAGIC:4/unsigned,
             <<_Skip:ToSkip/binary,
               Ctnt:ToSend/binary,
               Rest/binary>>
-              when byte_size(Rest) > ?HEADER_SIZE_B + ?DEFAULT_FILTER_SIZE ->
-                %% remained is larger than 64 bytes so worth keeping
+              when byte_size(Rest) > ?HEADER_SIZE_B + LFS ->
+                %% remaining is larger than 64 bytes so worth keeping
                 %% around
                 {Rest, Ctnt};
             <<_Skip:ToSkip/binary,
@@ -3026,25 +3032,38 @@ parse_header(<<?MAGIC:4/unsigned,
                                 Epoch, NextChId0, Crc, DataSize, TrailerSize,
                                 FilterSize);
         false ->
-            %% having to throw away the read ahead data here
-            Read1 = Read0#read{read_ahead_data = undefined},
-            maybe_return_header(State#?MODULE{mode = Read1},
-                                HeaderData0, MaybeFilterAndRest, undefined,
-                                ChType, NumEntries, NumRecords, Timestamp,
-                                Epoch, NextChId0, Crc, DataSize, TrailerSize,
-                                FilterSize)
+            case RAD of
+                undefined ->
+                    %% we just read the data, we could not read ahead the whole chunk
+                    %% let's move on to see whether the chunk should be filtered or not
+                    maybe_return_header(State,
+                                        HeaderData0, MaybeFilterAndRest, undefined,
+                                        ChType, NumEntries, NumRecords, Timestamp,
+                                        Epoch, NextChId0, Crc, DataSize, TrailerSize,
+                                        FilterSize);
+                _ ->
+                    %% the data were from a previous read
+                    %% we can ditch them and try to read the chunk ahead
+                    need_more_data
+            end
     end;
 parse_header(_, _) ->
     need_more_data.
 
-read_ahead_size(LastDataSize) ->
+%% keep the previous value if the current one is 0 (i.e. no filter in the chunk)
+read_ahead_fsize(Previous, 0) ->
+    Previous;
+read_ahead_fsize(_, Current) ->
+    Current.
+
+read_ahead_size(LastDataSize, FilterSize) ->
     case LastDataSize =/= undefined andalso
          LastDataSize =< ?READ_AHEAD_LIMIT of
         true ->
             %% the previous chunk was small, try to read
             %% the next chunk fully in one read
             %% this can save us a system call later
-            ?DEFAULT_FILTER_SIZE + ?READ_AHEAD_LIMIT;
+            FilterSize + ?READ_AHEAD_LIMIT;
         false ->
             %% optimistically read the default filter size.
             %% this amounts to 64 bytes with the header (small binary)
@@ -3107,10 +3126,14 @@ maybe_return_header(#?MODULE{cfg = #cfg{counter = CntRef},
             read_header0(State#?MODULE{mode = Read})
     end.
 
--spec last_data_size(state(), non_neg_integer()) -> state().
-last_data_size(#?MODULE{mode = R = #read{}} = S, Lds) ->
-    S#?MODULE{mode = R#read{last_data_size = Lds}}.
+%% for testing
+-spec read_ahead_hints(state(), non_neg_integer(), osiris_bloom:filter_size()) ->
+    state().
+read_ahead_hints(#?MODULE{mode = R = #read{}} = S, Lds, FilterSize) ->
+    S#?MODULE{mode = R#read{last_data_size = Lds,
+                            filter_size = FilterSize}}.
 
+%% for testing
 -spec update_read(state(), offset(), offset(), non_neg_integer()) -> state().
 update_read(#?MODULE{mode = R0 = #read{}} = S, ChId, NumRecords, Pos) ->
     R = R0#read{next_offset = ChId + NumRecords,
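For reference, the two test hooks exported above are driven from the suite below. The snippet here is a sketch only, not part of the commit, showing typical usage against an already initialised reader state; the hint values (1-byte last chunk body, 16-byte filter) are illustrative.

%% Sketch of driving the test hooks; assumes osiris_log is on the code path
%% and Rd0 is an initialised reader state.
-module(read_ahead_hints_usage).
-export([read_with_hints/1]).

read_with_hints(Rd0) ->
    %% seed the hints: size of the last chunk body and last seen filter size
    Rd1 = osiris_log:read_ahead_hints(Rd0, 1, 16),
    {ok, Header, MaybeContent, Rd2} = osiris_log:read_header0(Rd1),
    %% MaybeContent is the chunk body when the read-ahead covered it,
    %% or undefined when only the header could be read
    {Header, MaybeContent, Rd2}.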

test/osiris_log_SUITE.erl

Lines changed: 81 additions & 20 deletions
@@ -1890,24 +1890,23 @@ read_header_ahead_offset_reader(Config) ->
             %% the messages are large enough to be larger than the default
             %% filter size which is always read ahead (16 bytes)
             {_, W1} = write_committed([<<"hiiiiiiiii">>, <<"hooooooo">>], W0),
-            ct:pal("R0 ~p", [R0]),
             {ok, H, Content, R1} = osiris_log:read_header0(R0),
             ?assertEqual(undefined, Content),
             {H, W1, R1}
     end,
-    fun(#{w := W0, r := R0}) ->
+    fun(#{w := W0, r := R0, fsize := FSize}) ->
             %% previous chunk too large to read ahead
-            R1 = osiris_log:last_data_size(R0, RAL * 2),
+            R1 = osiris_log:read_ahead_hints(R0, RAL * 2, FSize),
             {_, W1} = write_committed([<<"hiiiiiiiii">>, <<"hooooooo">>], W0),
             {ok, H, Content, R2} = osiris_log:read_header0(R1),
             ?assertEqual(undefined, Content),
             {H, W1, R2}
     end,
-    fun(#{w := W0, r := R0}) ->
+    fun(#{w := W0, r := R0, fsize := FSize}) ->
            %% trigger reading ahead by setting a small value for the
            %% last chunk read.
            %% this setting stays the same for the rest of the test
-           R1 = osiris_log:last_data_size(R0, 1),
+           R1 = osiris_log:read_ahead_hints(R0, 1, FSize),
            Entries = [<<"foo">>, <<"bar">>],
            {_, W1} = write_committed(Entries, W0),
            {ok, H, Content, R2} = osiris_log:read_header0(R1),
@@ -1941,10 +1940,13 @@ read_header_ahead_offset_reader(Config) ->
            {H, W1, R1}
     end,
     fun(#{w := W0, r := R0}) ->
-           Entries1 = [binary:copy(<<"a">>, 16)],
+           Entries1 = [binary:copy(<<"a">>, 2000)],
            {_, W1} = write_committed(Entries1, W0),
-           Entries2 = [binary:copy(<<"b">>, 32)],
+           Entries2 = [binary:copy(<<"b">>, 2000)],
            {_, W2} = write_committed(Entries2, W1),
+           %% this one is too big to be read ahead fully
+           Entries3 = [binary:copy(<<"c">>, 5000)],
+           {_, W3} = write_committed(Entries3, W2),
 
            {ok, H1, Content1, R1} = osiris_log:read_header0(R0),
            [_, _, D1, _] = fake_chunk(Entries1, ?LINE, 1, 100),
@@ -1953,7 +1955,11 @@ read_header_ahead_offset_reader(Config) ->
            {ok, H2, Content2, R2} = osiris_log:read_header0(update_read(H1, R1)),
            [_, _, D2, _] = fake_chunk(Entries2, ?LINE, 1, 100),
            ?assertEqual(iolist_to_binary(D2), Content2),
-           {H2, W2, R2}
+
+           {ok, H3, Content3, R3} = osiris_log:read_header0(update_read(H2, R2)),
+           ?assertEqual(undefined, Content3),
+
+           {H3, W3, R3}
     end
    ],
 
@@ -1972,7 +1978,7 @@ read_header_ahead_offset_reader(Config) ->
                               FSize, Wr0, Rd0),
                  osiris_log:close(Rd1),
                  osiris_log:close(Wr1)
-         end, [{FSize, RType} || FSize <- FilterSizes, RType <- [offset, data]]),
+         end, [{FSize, RType} || FSize <- FilterSizes, RType <- [data, offset]]),
    ok.
 
 read_header_ahead_offset_reader_filter(Config) ->
@@ -1992,30 +1998,85 @@ read_header_ahead_offset_reader_filter(Config) ->
    Shared = osiris_log:get_shared(Wr0),
    Conf = Conf1#{shared => Shared},
    {ok, Rd0} = osiris_log:init_offset_reader(first, Conf),
-   Rd1 = osiris_log:last_data_size(Rd0, 1),
-   %% we always read ahead the default filter size.
-   %% with a larger-than-default filter, we must consider
-   %% the extra bytes that belong to the filter,
-   %% that is (actual filter size) - (default filter size)
-   %% this reduces the max entry size we can read ahead
-   MES = MaxEntrySize - (FSize - DFS),
+   %% we start by using the default filter size in the read ahead hints
+   Rd1 = osiris_log:read_ahead_hints(Rd0, 1, DFS),
+   %% compute the max entry size
+   %% (meaning we don't read ahead enough above this entry size)
+   %% first we don't know the actual filter size in the stream,
+   %% so we assume the default filter size
+   %% this "reduces" the max size of data we can read in the case
+   %% of a larger-than-default filter size, because of the extra
+   %% bytes that belong to the filter
+   MES1 = MaxEntrySize - (FSize - DFS),
+   %% then the max entry becomes accurate, whatever the actual filter size
+   MES2 = MaxEntrySize,
 
    Tests =
    [
     fun(#{w := W0, r := R0}) ->
-           %% chunk with a non-empty filter
            %% data do not fit in the read ahead
-           EData = binary:copy(<<"a">>, MES + 1),
+           EData = binary:copy(<<"a">>, MES1 + 1),
+           Entries = [{<<"banana">>, EData}],
+           {_, W1} = write_committed(Entries, W0),
+           {ok, H, Content, R1} = osiris_log:read_header0(R0),
+           ?assertEqual(undefined, Content),
+           {H, W1, R1}
+    end,
+    fun(#{w := W0, r := R0}) ->
+           %% data exactly fits in the read ahead
+           EData = binary:copy(<<"a">>, MES1),
+           Entries = [{<<"banana">>, EData}],
+           {_, W1} = write_committed(Entries, W0),
+           {ok, H, Content, R1} = osiris_log:read_header0(R0),
+           [_, _, D, _] = fake_chunk(Entries, ?LINE, 1, 100),
+           ?assertEqual(iolist_to_binary(D), Content),
+           {H, W1, R1}
+    end,
+    fun(#{w := W0, r := R}) ->
+           %% assume we are now using the correct filter size
+           %% (this setting stays the same for the next tests)
+           R0 = osiris_log:read_ahead_hints(R, 1, FSize),
+           %% data just bigger than the first limit
+           EData = binary:copy(<<"a">>, MES1 + 1),
+           Entries = [{<<"banana">>, EData}],
+           {_, W1} = write_committed(Entries, W0),
+           {ok, H, Content, R1} = osiris_log:read_header0(R0),
+           case FSize =:= DFS of
+               true ->
+                   %% default filter size: still does not fit
+                   ?assertEqual(undefined, Content);
+               false ->
+                   %% with the correct filter size, we now read
+                   %% a bit further than with the first limit
+                   [_, _, D, _] = fake_chunk(Entries, ?LINE, 1, 100),
+                   ?assertEqual(iolist_to_binary(D), Content)
+           end,
+           {H, W1, R1}
+    end,
+    fun(#{w := W0, r := R0}) ->
+           %% data exactly fits in the read ahead
+           EData = binary:copy(<<"a">>, MES1),
+           Entries = [{<<"banana">>, EData}],
+           {_, W1} = write_committed(Entries, W0),
+           {ok, H, Content, R1} = osiris_log:read_header0(R0),
+           [_, _, D, _] = fake_chunk(Entries, ?LINE, 1, 100),
+           ?assertEqual(iolist_to_binary(D), Content),
+           {H, W1, R1}
+    end,
+    fun(#{w := W0, r := R0}) ->
+           %% we use the "new" max entry size
+           %% data do not fit in the read ahead
+           EData = binary:copy(<<"a">>, MES2 + 1),
            Entries = [{<<"banana">>, EData}],
            {_, W1} = write_committed(Entries, W0),
            {ok, H, Content, R1} = osiris_log:read_header0(R0),
            ?assertEqual(undefined, Content),
            {H, W1, R1}
    end,
    fun(#{w := W0, r := R0}) ->
-           %% chunk with a non-empty filter
+           %% we use the "new" max entry size
            %% data exactly fits in the read ahead
-           EData = binary:copy(<<"a">>, MES),
+           EData = binary:copy(<<"a">>, MES2),
            Entries = [{<<"banana">>, EData}],
            {_, W1} = write_committed(Entries, W0),
            {ok, H, Content, R1} = osiris_log:read_header0(R0),
