Skip to content

Commit 5084a4f

Browse files
committed
Small fixes in read ahead
1 parent 4cbf62f commit 5084a4f

File tree

2 files changed

+250
-35
lines changed

2 files changed

+250
-35
lines changed

src/osiris_log.erl

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2991,8 +2991,8 @@ parse_header(<<?MAGIC:4/unsigned,
29912991
case osiris_bloom:is_match(ChunkFilter, Filter) of
29922992
true ->
29932993
<<HeaderData:?HEADER_SIZE_B/binary, _/binary>> = HeaderData0,
2994-
State = case Ra0 of
2995-
Ra1 ->
2994+
State = case Ra1 of
2995+
Ra0 ->
29962996
State0;
29972997
Ra ->
29982998
State0#?MODULE{mode = Read0#read{read_ahead = Ra}}
@@ -3309,21 +3309,22 @@ ra_read(ReadPos, Len, #ra{buf = {Pos, Data}})
33093309
ra_read(_Pos, _Len, _Ra) ->
33103310
undefined.
33113311

3312-
ra_update_size(undefined, _FilterSize, LastDataSize, #ra{size = Sz})
3312+
ra_update_size(undefined, _FilterSize, LastDataSize, #ra{size = Sz} = Ra)
33133313
when Sz < ?READ_AHEAD_LIMIT andalso
33143314
LastDataSize < ?READ_AHEAD_LIMIT ->
33153315
%% no filter and last data size was small so enable data read ahead
3316-
#ra{size = ?READ_AHEAD_LIMIT};
3317-
ra_update_size(Filter, FilterSize, _LastDataSize, #ra{size = Sz} = Ra)
3318-
when Filter =/= undefined ->
3316+
Ra#ra{size = ?READ_AHEAD_LIMIT};
3317+
ra_update_size(undefined, _FilterSize, LastDataSize,
3318+
#ra{size = ?READ_AHEAD_LIMIT} = Ra)
3319+
when LastDataSize < ?READ_AHEAD_LIMIT ->
3320+
Ra;
3321+
ra_update_size(_Filter, FilterSize, _LastDataSize, #ra{size = Sz} = Ra) ->
33193322
case FilterSize + ?HEADER_SIZE_B of
33203323
Sz ->
33213324
Ra;
33223325
NewSz ->
3323-
#ra{size = NewSz}
3324-
end;
3325-
ra_update_size(_Filter, _FilterSize, _LastDataSize, Ra) ->
3326-
Ra.
3326+
Ra#ra{size = NewSz}
3327+
end.
33273328

33283329
ra_can_fill(ReqSize, #ra{size = Sz}) ->
33293330
ReqSize =< Sz.
@@ -3339,6 +3340,26 @@ ra_fill(Fd, Pos, #ra{size = Sz} = Ra) ->
33393340
-ifdef(TEST).
33403341
-include_lib("eunit/include/eunit.hrl").
33413342

3343+
ra_update_size_test() ->
3344+
DefSize = ?HEADER_SIZE_B + ?DEFAULT_FILTER_SIZE,
3345+
?assertMatch(#ra{size = DefSize}, #ra{}),
3346+
Ra0 = #ra{},
3347+
?assertMatch(#ra{size = ?READ_AHEAD_LIMIT},
3348+
ra_update_size(undefined, ?DEFAULT_FILTER_SIZE, 100, Ra0)),
3349+
3350+
?assertMatch(#ra{size = ?READ_AHEAD_LIMIT},
3351+
ra_update_size(undefined, ?DEFAULT_FILTER_SIZE, 100, Ra0)),
3352+
Ra1 = #ra{size = ?READ_AHEAD_LIMIT},
3353+
?assertMatch(#ra{size = DefSize},
3354+
ra_update_size(undefined, ?DEFAULT_FILTER_SIZE, 5000, Ra1)),
3355+
3356+
?assertMatch(#ra{size = ?READ_AHEAD_LIMIT},
3357+
ra_update_size(undefined, ?DEFAULT_FILTER_SIZE, 100, Ra1)),
3358+
3359+
?assertMatch(#ra{size = DefSize},
3360+
ra_update_size("a filter", ?DEFAULT_FILTER_SIZE, 100, Ra0)),
3361+
ok.
3362+
33423363
part_test() ->
33433364
[<<"ABCD">>] = part(4, [<<"ABCDEF">>]),
33443365
[<<"AB">>, <<"CD">>] = part(4, [<<"AB">>, <<"CDEF">>]),

test/osiris_log_SUITE.erl

Lines changed: 219 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,8 @@ all_tests() ->
9494
init_with_unexpected_file,
9595
overview_with_missing_segment,
9696
overview_with_missing_index_at_start,
97-
read_header_ahead_offset_reader
98-
% read_header_ahead_offset_reader_filter
97+
read_header_ahead_offset_reader,
98+
read_header_ahead_offset_reader_filter
9999
].
100100

101101
groups() ->
@@ -401,34 +401,34 @@ iterator_read_chunk_with_read_ahead(Config) ->
401401
{_, W1} = write_committed(EntriesRev, W0),
402402
W1;
403403
(read, #{r := R0, tracer := T}) ->
404-
%% first chunk, there won't be any data size hints in the reader
404+
%% small chunk, managed to read it with the filter read-ahead
405405
{ok, _, I0, R1} = osiris_log:chunk_iterator(R0),
406406
{{_, <<"ho">>}, I1} = osiris_log:iterator_next(I0),
407407
{{_, <<"hi">>}, I2} = osiris_log:iterator_next(I1),
408408
?assertMatch(end_of_chunk, osiris_log:iterator_next(I2)),
409-
?assertEqual(2, osiris_tracer:call_count(T)),
409+
?assertEqual(1, osiris_tracer:call_count(T)),
410410
R1
411411
end,
412412
fun(write, #{w := W0}) ->
413-
%% this one will be read ahead
414413
EntriesRev = [<<"foo">>, <<"bar">>],
415414
{_, W1} = write_committed(EntriesRev, W0),
416415
W1;
417416
(read, #{r := R0, tracer := T}) ->
417+
%% not enough read-ahead data, we have to read from file
418418
{ok, _, I0, R1} = osiris_log:chunk_iterator(R0),
419419
{{_, <<"bar">>}, I1} = osiris_log:iterator_next(I0),
420420
{{_, <<"foo">>}, I2} = osiris_log:iterator_next(I1),
421421
?assertMatch(end_of_chunk, osiris_log:iterator_next(I2)),
422-
?assertEqual(0, osiris_tracer:call_count(T)),
422+
?assertEqual(1, osiris_tracer:call_count(T)),
423423
R1
424424
end,
425425
fun(write, #{w := W0}) ->
426-
%% this one will be read ahead
427426
E1 = binary:copy(<<"b">>, RAL - 200),
428427
EntriesRev = [E1 , <<"aaa">>],
429428
{_, W1} = write_committed(EntriesRev, W0),
430429
W1;
431430
(read, #{r := R0, tracer := T}) ->
431+
%% this one has been read ahead
432432
E1 = binary:copy(<<"b">>, RAL - 200),
433433
{ok, _, I0, R1} = osiris_log:chunk_iterator(R0),
434434
{{_, <<"aaa">>}, I1} = osiris_log:iterator_next(I0),
@@ -1990,20 +1990,21 @@ read_header_ahead_offset_reader(Config) ->
19901990
{_, W1} = write_committed([<<"hi">>, <<"ho">>], W0),
19911991
W1;
19921992
(read, #{r := R0, cs := CS, ss := SS, tracer := T}) ->
1993-
%% small chunk, we read it ahead already
1993+
%% small chunk, we'll read it ahead, no sendfile
19941994
[_, _, D, _] = ra_fake_chunk([<<"hi">>, <<"ho">>]),
19951995
{ok, R1} = osiris_log:send_file(CS, R0),
19961996
{ok, Read} = recv(SS, byte_size(D) + HS),
19971997
?assertEqual(D, binary:part(Read, HS, byte_size(Read) - HS)),
1998-
?assertEqual(0, osiris_tracer:call_count(T)),
1998+
?assertEqual(1, osiris_tracer:call_count(T)),
19991999
R1
20002000
end,
20012001
fun(write, #{w := W0}) ->
20022002
Entries = [<<"foo">>, binary:copy(<<"b">>, RAL * 2)],
20032003
{_, W1} = write_committed(Entries, W0),
20042004
W1;
20052005
(read, #{r := R0, cs := CS, ss := SS, tracer := T}) ->
2006-
%% large chunk, we will need to read from the file system
2006+
%% large chunk, we read ahead the header before,
2007+
%% but we'll use sendfile for the data
20072008
Entries = [<<"foo">>, binary:copy(<<"b">>, RAL * 2)],
20082009
[_, _, D, _] = ra_fake_chunk(Entries),
20092010
{ok, R1} = osiris_log:send_file(CS, R0),
@@ -2033,8 +2034,10 @@ read_header_ahead_offset_reader(Config) ->
20332034
end,
20342035
{ok, Read} = recv(SS, byte_size(Expected) + HS),
20352036
?assertEqual(Expected, binary:part(Read, HS, byte_size(Read) - HS)),
2036-
?assertEqual(1, osiris_tracer:call_count(T)),
2037-
?assertEqual(1, osiris_tracer:call_count(T, file, pread)),
2037+
ct:pal("call_count ~p", [osiris_tracer:calls(T)]),
2038+
%% no read ahead from before, and need a second read for the filter
2039+
?assertEqual(2, osiris_tracer:call_count(T)),
2040+
?assertEqual(2, osiris_tracer:call_count(T, file, pread)),
20382041
R1
20392042
end,
20402043
fun(write, #{w := W0}) ->
@@ -2087,6 +2090,7 @@ read_header_ahead_offset_reader(Config) ->
20872090
],
20882091

20892092
FilterSizes = [?DEFAULT_FILTER_SIZE, ?DEFAULT_FILTER_SIZE * 2],
2093+
RTypes = [data, offset],
20902094
Conf0 = ?config(osiris_conf, Config),
20912095
#{dir := Dir0} = Conf0,
20922096
lists:foreach(fun({FSize, RType}) ->
@@ -2120,7 +2124,194 @@ read_header_ahead_offset_reader(Config) ->
21202124

21212125
osiris_log:close(Rd1),
21222126
osiris_log:close(Wr1)
2123-
end, [{FSize, RType} || FSize <- FilterSizes, RType <- [data, offset]]),
2127+
end, [{FSize, RType} || FSize <- FilterSizes, RType <- RTypes]),
2128+
ok.
2129+
2130+
read_header_ahead_offset_reader_filter(Config) ->
2131+
RAL = 4096, %% read ahead limit
2132+
HS = ?HEADER_SIZE_B,
2133+
%% we store the entry size on 4 bytes, so we must subtract them from the data size
2134+
MaxEntrySize = RAL - 4 - HS,
2135+
DFS = ?DEFAULT_FILTER_SIZE,
2136+
% FilterSizes = [DFS * 2],
2137+
FilterSizes = [DFS, DFS * 2],
2138+
Conf0 = ?config(osiris_conf, Config),
2139+
#{dir := Dir0} = Conf0,
2140+
2141+
lists:foreach(
2142+
fun(FSize) ->
2143+
Dir1 = filename:join(Dir0, integer_to_list(FSize)),
2144+
Conf1 = Conf0#{dir => Dir1,
2145+
filter_size => FSize},
2146+
Wr0 = osiris_log:init(Conf1),
2147+
Shared = osiris_log:get_shared(Wr0),
2148+
RConf = Conf1#{shared => Shared, transport => tcp},
2149+
{ok, Rd0} = osiris_log:init_offset_reader(first, RConf),
2150+
2151+
%% compute the max entry size
2152+
%% (meaning we don't read ahead enough above this entry size)
2153+
%% first we don't know the actual filter size in the stream,
2154+
%% so we assume the default filter size
2155+
%% this "reduces" the max size of data we can read in the case
2156+
%% of a larger-than-default filter size, because of the extra
2157+
%% bytes that belong to the filter
2158+
MES1 = MaxEntrySize - FSize,
2159+
%% then the max entry becomes accurate, whatever the actual filter size
2160+
MES2 = MaxEntrySize,
2161+
ct:pal("Using MES1 ~p, MES2 ~p for filter size ~p",
2162+
[MES1, MES2, FSize]),
2163+
2164+
Tests =
2165+
[
2166+
fun(write, #{w := W0}) ->
2167+
Entries = [{<<"banana">>, <<"aaa">>}],
2168+
{_, W1} = write_committed(Entries, W0),
2169+
W1;
2170+
(read, #{r := R0, cs := CS, ss := SS, tracer := T}) ->
2171+
Entries = [{<<"banana">>, <<"aaa">>}],
2172+
[_, _, D, _] = ra_fake_chunk(Entries),
2173+
{ok, R1} = osiris_log:send_file(CS, R0),
2174+
{ok, Read} = recv(SS, byte_size(D) + HS),
2175+
?assertEqual(D, binary:part(Read, HS, byte_size(Read) - HS)),
2176+
case FSize =:= DFS of
2177+
true ->
2178+
%% just read the header
2179+
?assertEqual(1, osiris_tracer:call_count(T, file, pread)),
2180+
%% no read ahead data yet, used sendfile
2181+
?assertEqual(1, osiris_tracer:call_count(T, file, sendfile));
2182+
false ->
2183+
%% read the header, but the filter data did not fit,
2184+
%% so read again
2185+
?assertEqual(2, osiris_tracer:call_count(T, file, pread)),
2186+
%% read the data in the second pread call
2187+
?assertEqual(0, osiris_tracer:call_count(T, file, sendfile))
2188+
end,
2189+
R1
2190+
end,
2191+
fun(write, #{w := W0}) ->
2192+
EData = binary:copy(<<"a">>, MES1),
2193+
Entries = [{<<"banana">>, EData}],
2194+
{_, W1} = write_committed(Entries, W0),
2195+
W1;
2196+
(read, #{r := R0, cs := CS, ss := SS, tracer := T}) ->
2197+
%% data just fit in the read ahead
2198+
EData = binary:copy(<<"a">>, MES1),
2199+
Entries = [{<<"banana">>, EData}],
2200+
[_, _, D, _] = ra_fake_chunk(Entries),
2201+
{ok, R1} = osiris_log:send_file(CS, R0),
2202+
{ok, Read} = recv(SS, byte_size(D) + HS),
2203+
?assertEqual(D, binary:part(Read, HS, byte_size(Read) - HS)),
2204+
case FSize =:= DFS of
2205+
true ->
2206+
%% just read the header
2207+
?assertEqual(1, osiris_tracer:call_count(T, file, pread)),
2208+
%% data just fit in the read ahead, no sendfile
2209+
?assertEqual(0, osiris_tracer:call_count(T, file, sendfile));
2210+
false ->
2211+
%% read the header ahead previously
2212+
%% (because it could not read the filter at first,
2213+
%% which triggered the read-ahead)
2214+
?assertEqual(0, osiris_tracer:call_count(T, file, pread)),
2215+
?assertEqual(1, osiris_tracer:call_count(T, file, sendfile))
2216+
end,
2217+
R1
2218+
end,
2219+
fun(write, #{w := W0}) ->
2220+
EData = binary:copy(<<"b">>, MES1 + 1),
2221+
Entries = [{<<"banana">>, EData}],
2222+
{_, W1} = write_committed(Entries, W0),
2223+
W1;
2224+
(read, #{r := R0, cs := CS, ss := SS, tracer := T}) ->
2225+
%% data does not fit in the read ahead
2226+
EData = binary:copy(<<"b">>, MES1 + 1),
2227+
Entries = [{<<"banana">>, EData}],
2228+
[_, _, D, _] = ra_fake_chunk(Entries),
2229+
{ok, R1} = osiris_log:send_file(CS, R0),
2230+
{ok, Read} = recv(SS, byte_size(D) + HS),
2231+
?assertEqual(D, binary:part(Read, HS, byte_size(Read) - HS)),
2232+
%% just read the header
2233+
?assertEqual(1, osiris_tracer:call_count(T, file, pread)),
2234+
%% large chunk, used sendfile
2235+
?assertEqual(1, osiris_tracer:call_count(T, file, sendfile)),
2236+
R1
2237+
end,
2238+
fun(write, #{w := W0}) ->
2239+
Entries = [{<<"banana">>, <<"aaa">>}],
2240+
{_, W1} = write_committed(Entries, W0),
2241+
W1;
2242+
(read, #{r := R0, cs := CS, ss := SS, tracer := T}) ->
2243+
%% we still read ahead, so we read a small chunk at once
2244+
Entries = [{<<"banana">>, <<"aaa">>}],
2245+
[_, _, D, _] = ra_fake_chunk(Entries),
2246+
{ok, R1} = osiris_log:send_file(CS, R0),
2247+
{ok, Read} = recv(SS, byte_size(D) + HS),
2248+
?assertEqual(D, binary:part(Read, HS, byte_size(Read) - HS)),
2249+
?assertEqual(1, osiris_tracer:call_count(T, file, pread)),
2250+
?assertEqual(0, osiris_tracer:call_count(T, file, sendfile)),
2251+
R1
2252+
end,
2253+
fun(write, #{w := W0}) ->
2254+
EData = binary:copy(<<"b">>, MES1 * 2),
2255+
Entries = [{<<"banana">>, EData}],
2256+
{_, W1} = write_committed(Entries, W0),
2257+
W1;
2258+
(read, #{r := R0, cs := CS, ss := SS, tracer := T}) ->
2259+
%% large chunk, we had the header already,
2260+
%% but we must use sendfile for the data
2261+
EData = binary:copy(<<"b">>, MES1 * 2),
2262+
Entries = [{<<"banana">>, EData}],
2263+
[_, _, D, _] = ra_fake_chunk(Entries),
2264+
{ok, R1} = osiris_log:send_file(CS, R0),
2265+
{ok, Read} = recv(SS, byte_size(D) + HS),
2266+
?assertEqual(D, binary:part(Read, HS, byte_size(Read) - HS)),
2267+
?assertEqual(0, osiris_tracer:call_count(T, file, pread)),
2268+
?assertEqual(1, osiris_tracer:call_count(T, file, sendfile)),
2269+
R1
2270+
end,
2271+
fun(write, #{w := W0}) ->
2272+
EData = binary:copy(<<"b">>, MES1 div 2),
2273+
Entries = [{<<"banana">>, EData}],
2274+
{_, W1} = write_committed(Entries, W0),
2275+
W1;
2276+
(read, #{r := R0, cs := CS, ss := SS, tracer := T}) ->
2277+
%% we don't read ahead anymore because the previous chunk was large
2278+
%% so we read the header and use sendfile,
2279+
%% even for a small chunk
2280+
EData = binary:copy(<<"b">>, MES1 div 2),
2281+
Entries = [{<<"banana">>, EData}],
2282+
[_, _, D, _] = ra_fake_chunk(Entries),
2283+
{ok, R1} = osiris_log:send_file(CS, R0),
2284+
{ok, Read} = recv(SS, byte_size(D) + HS),
2285+
?assertEqual(D, binary:part(Read, HS, byte_size(Read) - HS)),
2286+
?assertEqual(1, osiris_tracer:call_count(T, file, pread)),
2287+
?assertEqual(1, osiris_tracer:call_count(T, file, sendfile)),
2288+
R1
2289+
end
2290+
],
2291+
2292+
%% server, we will read stream data from this socket
2293+
{ok, SLS} = gen_tcp:listen(0, [binary, {packet, 0},
2294+
{active, false}]),
2295+
{ok, Port} = inet:port(SLS),
2296+
2297+
%% client, osiris will send to this socket
2298+
{ok, CS} = gen_tcp:connect("localhost", Port,
2299+
[binary, {packet, 0}]),
2300+
2301+
{ok, SS} = gen_tcp:accept(SLS),
2302+
2303+
#{w := Wr1, r := Rd1} = run_read_ahead_tests(Tests, offset, FSize,
2304+
Wr0, Rd0, #{ss => SS,
2305+
cs => CS}),
2306+
2307+
ok = gen_tcp:close(CS),
2308+
ok = gen_tcp:close(SS),
2309+
ok = gen_tcp:close(SLS),
2310+
2311+
osiris_log:close(Rd1),
2312+
osiris_log:close(Wr1)
2313+
end, FilterSizes),
2314+
21242315
ok.
21252316

21262317
% read_header_ahead_offset_reader_filter(Config) ->
@@ -2245,18 +2436,21 @@ init_reader(data, Conf) ->
22452436
run_read_ahead_tests(Tests, RType, FSize, Wr0, Rd0) ->
22462437
run_read_ahead_tests(Tests, RType, FSize, Wr0, Rd0, #{}).
22472438

2248-
run_read_ahead_tests(Tests, RType, FSize, Wr0, Rd0, Ctx) ->
2249-
W = lists:foldl(fun(F, Acc) ->
2250-
W = F(write, Acc),
2251-
Ctx#{w => W}
2252-
end, Ctx#{w => Wr0}, Tests),
2253-
R = lists:foldl(fun(F, Acc) ->
2254-
Tracer = osiris_tracer:start([{file, pread, '_'},
2255-
{file, sendfile, '_'}]),
2256-
R0 = F(read, Acc#{tracer => Tracer}),
2257-
osiris_tracer:stop(Tracer),
2258-
Ctx#{r => R0, rtype => RType, fsize => FSize}
2259-
end, Ctx#{r => Rd0, rtype => RType, fsize => FSize}, Tests),
2439+
run_read_ahead_tests(Tests, RType, FSize, Wr0, Rd0, Ctx0) ->
2440+
{_, W} = lists:foldl(fun(F, {N, Ctx}) ->
2441+
ct:pal("Read ahead test ~p, writing...", [N]),
2442+
W = F(write, Ctx),
2443+
{N + 1, Ctx#{w => W}}
2444+
end, {1, Ctx0#{w => Wr0}}, Tests),
2445+
{_, R} =
2446+
lists:foldl(fun(F, {N, Ctx}) ->
2447+
ct:pal("Read ahead test ~p, reading...", [N]),
2448+
Tracer = osiris_tracer:start([{file, pread, '_'},
2449+
{file, sendfile, '_'}]),
2450+
R0 = F(read, Ctx#{tracer => Tracer}),
2451+
osiris_tracer:stop(Tracer),
2452+
{N + 1, Ctx#{r => R0, rtype => RType, fsize => FSize}}
2453+
end, {1, Ctx0#{r => Rd0, rtype => RType, fsize => FSize}}, Tests),
22602454
maps:merge(W, R).
22612455

22622456
truncate_at(File, Pos) ->

0 commit comments

Comments
 (0)