Skip to content

Commit 3827fac

Browse files
committed
Use read-ahead data in chunk iterator if available
1 parent 114d8f5 commit 3827fac

File tree

2 files changed

+99
-19
lines changed

2 files changed

+99
-19
lines changed

src/osiris_log.erl

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
%% License, v. 2.0. If a copy of the MPL was not distributed with this
33
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
44
%%
5-
%% Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries.
5+
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries.
66
%%
77

88
-module(osiris_log).
@@ -1443,8 +1443,9 @@ chunk_iterator(State) ->
14431443
{error, {invalid_chunk_header, term()}}.
14441444
chunk_iterator(#?MODULE{cfg = #cfg{},
14451445
mode = #read{type = RType,
1446-
chunk_selector = Selector}
1447-
} = State0, CreditHint)
1446+
chunk_selector = Selector,
1447+
filter_size = RaFs}} = State0,
1448+
CreditHint)
14481449
when (is_integer(CreditHint) andalso CreditHint > 0) orelse
14491450
is_atom(CreditHint) ->
14501451
%% reads the next chunk of unparsed chunk data
@@ -1458,15 +1459,20 @@ chunk_iterator(#?MODULE{cfg = #cfg{},
14581459
data_size := DataSize,
14591460
filter_size := FilterSize,
14601461
position := Pos,
1461-
next_position := NextPos} = Header, _,
1462-
#?MODULE{fd = Fd, mode = #read{next_offset = ChId} = Read} = State1} ->
1463-
State = State1#?MODULE{mode = Read#read{next_offset = ChId + NumRecords,
1464-
position = NextPos}},
1462+
next_position := NextPos} = Header, MaybeData,
1463+
#?MODULE{fd = Fd, mode = #read{next_offset = ChId} = Read1} = State1} ->
14651464
case needs_handling(RType, Selector, ChType) of
14661465
true ->
1466+
Read = Read1#read{next_offset = ChId + NumRecords,
1467+
position = NextPos,
1468+
last_data_size = DataSize,
1469+
filter_size = read_ahead_fsize(RaFs,
1470+
FilterSize)},
1471+
1472+
State = State1#?MODULE{mode = Read},
14671473
DataPos = Pos + ?HEADER_SIZE_B + FilterSize,
14681474
Data = iter_read_ahead(Fd, DataPos, ChId, Crc, CreditHint,
1469-
DataSize, NumEntries),
1475+
DataSize, NumEntries, MaybeData),
14701476
Iterator = #iterator{fd = Fd,
14711477
data = Data,
14721478
next_offset = ChId,
@@ -1475,6 +1481,9 @@ chunk_iterator(#?MODULE{cfg = #cfg{},
14751481
{ok, Header, Iterator, State};
14761482
false ->
14771483
%% skip
1484+
Read = Read1#read{next_offset = ChId + NumRecords,
1485+
position = NextPos},
1486+
State = State1#?MODULE{mode = Read},
14781487
chunk_iterator(State, CreditHint)
14791488
end;
14801489
Other ->
@@ -1681,13 +1690,13 @@ send_file(Sock,
16811690
header_data := HeaderData} = Header,
16821691
MaybeData,
16831692
#?MODULE{fd = Fd,
1684-
mode = #read{next_offset = ChId} = Read0} = State1} ->
1693+
mode = #read{next_offset = ChId} = Read1} = State1} ->
16851694

16861695
%% only sendfile if either the reader is a data reader
16871696
%% or the chunk is a user type (for offset readers)
16881697
case needs_handling(RType, Selector, ChType) of
16891698
true ->
1690-
Read = Read0#read{next_offset = ChId + NumRecords,
1699+
Read = Read1#read{next_offset = ChId + NumRecords,
16911700
position = NextPos,
16921701
last_data_size = DataSize,
16931702
filter_size = read_ahead_fsize(RaFs,
@@ -1733,7 +1742,7 @@ send_file(Sock,
17331742
end
17341743
end;
17351744
false ->
1736-
Read = Read0#read{next_offset = ChId + NumRecords,
1745+
Read = Read1#read{next_offset = ChId + NumRecords,
17371746
position = NextPos},
17381747
State = State1#?MODULE{mode = Read},
17391748
%% skip chunk and recurse
@@ -3362,16 +3371,21 @@ dump_crc_check(Fd) ->
33623371
dump_crc_check(Fd)
33633372
end.
33643373

3365-
iter_read_ahead(_Fd, _Pos, _ChunkId, _Crc, 1, _DataSize, _NumEntries) ->
3374+
iter_read_ahead(_Fd, _Pos, _ChunkId, _Crc, _Credit, DataSize, _NumEntries, RAD)
3375+
when RAD =/= undefined ->
3376+
<<Data:DataSize/binary, _/binary>> = RAD,
3377+
Data;
3378+
iter_read_ahead(_Fd, _Pos, _ChunkId, _Crc, 1, _DataSize, _NumEntries, undefined) ->
3379+
%% FIXME: is this about credit = 1 or about number of entries = 1?
33663380
%% no point reading ahead if there is only one entry to be read at this
33673381
%% time
33683382
undefined;
3369-
iter_read_ahead(Fd, Pos, ChunkId, Crc, Credit, DataSize, NumEntries)
3383+
iter_read_ahead(Fd, Pos, ChunkId, Crc, Credit, DataSize, NumEntries, undefined)
33703384
when Credit == all orelse NumEntries == 1 ->
33713385
{ok, Data} = file:pread(Fd, Pos, DataSize),
33723386
validate_crc(ChunkId, Crc, Data),
33733387
Data;
3374-
iter_read_ahead(Fd, Pos, _ChunkId, _Crc, Credit0, DataSize, NumEntries) ->
3388+
iter_read_ahead(Fd, Pos, _ChunkId, _Crc, Credit0, DataSize, NumEntries, undefined) ->
33753389
%% read ahead, assumes roughly equal entry sizes which may not be the case
33763390
%% TODO round up to nearest block?
33773391
%% We can only practically validate CRC if we read the whole data

test/osiris_log_SUITE.erl

Lines changed: 71 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ all_tests() ->
3838
subbatch,
3939
subbatch_compressed,
4040
iterator_read_chunk,
41+
iterator_read_chunk_with_read_ahead,
4142
iterator_read_chunk_mixed_sizes_with_credit,
4243
read_chunk_parsed,
4344
read_chunk_parsed_2,
@@ -344,14 +345,16 @@ iterator_read_chunk(Config) ->
344345
EntriesRev = [Batch,
345346
<<"ho">>,
346347
{<<"filter">>, <<"hi">>}],
347-
{ChId, _S1} = write_committed(EntriesRev, S0),
348-
{ok, _H, I0, _R2} = osiris_log:chunk_iterator(R1),
348+
{ChId, S1} = write_committed(EntriesRev, S0),
349+
{ok, _H, I0, R2} = osiris_log:chunk_iterator(R1),
349350
HoOffs = ChId + 1,
350351
BatchOffs = ChId + 2,
351352
{{ChId, <<"hi">>}, I1} = osiris_log:iterator_next(I0),
352353
{{HoOffs, <<"ho">>}, I2} = osiris_log:iterator_next(I1),
353354
{{BatchOffs, Batch}, I} = osiris_log:iterator_next(I2),
354355
?assertMatch(end_of_chunk, osiris_log:iterator_next(I)),
356+
osiris_log:close(R2),
357+
osiris_log:close(S1),
355358
ok.
356359

357360
iterator_read_chunk_mixed_sizes_with_credit(Config) ->
@@ -365,16 +368,79 @@ iterator_read_chunk_mixed_sizes_with_credit(Config) ->
365368
EntriesRev = [Big,
366369
<<"ho">>,
367370
{<<"filter">>, <<"hi">>}],
368-
{ChId, _S1} = write_committed(EntriesRev, S0),
371+
{ChId, S1} = write_committed(EntriesRev, S0),
369372
%% this is a less than ideal case where we have one large and two very
370-
%% small entries inthe same batch. The read ahead only
371-
{ok, _H, I0, _R2} = osiris_log:chunk_iterator(R1, 2),
373+
%% small entries in the same batch. We read ahead only the first 2 entries.
374+
{ok, _H, I0, R2} = osiris_log:chunk_iterator(R1, 2),
372375
HoOffs = ChId + 1,
373376
BigOffs = ChId + 2,
374377
{{ChId, <<"hi">>}, I1} = osiris_log:iterator_next(I0),
375378
{{HoOffs, <<"ho">>}, I2} = osiris_log:iterator_next(I1),
376379
{{BigOffs, Big}, I} = osiris_log:iterator_next(I2),
377380
?assertMatch(end_of_chunk, osiris_log:iterator_next(I)),
381+
osiris_log:close(R2),
382+
osiris_log:close(S1),
383+
ok.
384+
385+
iterator_read_chunk_with_read_ahead(Config) ->
386+
%% the test makes sure that read-ahead during header reading does not break
387+
%% the iterator
388+
RAL = 4096, %% read ahead limit
389+
Conf = ?config(osiris_conf, Config),
390+
W = osiris_log:init(Conf),
391+
Shared = osiris_log:get_shared(W),
392+
RConf = Conf#{shared => Shared},
393+
{ok, R} = osiris_log:init_offset_reader(0, RConf),
394+
Tests =
395+
[
396+
fun(#{w := W0, r := R0}) ->
397+
%% first chunk, there won't be any data size hints in the reader
398+
EntriesRev = [<<"hi">>, <<"ho">>],
399+
{_, W1} = write_committed(EntriesRev, W0),
400+
{ok, H, I0, R1} = osiris_log:chunk_iterator(R0),
401+
{{_, <<"ho">>}, I1} = osiris_log:iterator_next(I0),
402+
{{_, <<"hi">>}, I2} = osiris_log:iterator_next(I1),
403+
?assertMatch(end_of_chunk, osiris_log:iterator_next(I2)),
404+
{H, W1, R1}
405+
end,
406+
fun(#{w := W0, r := R0}) ->
407+
%% this one will be read ahead
408+
EntriesRev = [<<"foo">>, <<"bar">>],
409+
{_, W1} = write_committed(EntriesRev, W0),
410+
{ok, H, I0, R1} = osiris_log:chunk_iterator(R0),
411+
{{_, <<"bar">>}, I1} = osiris_log:iterator_next(I0),
412+
{{_, <<"foo">>}, I2} = osiris_log:iterator_next(I1),
413+
?assertMatch(end_of_chunk, osiris_log:iterator_next(I2)),
414+
{H, W1, R1}
415+
end,
416+
fun(#{w := W0, r := R0}) ->
417+
%% this one will be read ahead
418+
E1 = rand:bytes(RAL - 100),
419+
EntriesRev = [E1 , <<"aaa">>],
420+
{_, W1} = write_committed(EntriesRev, W0),
421+
{ok, H, I0, R1} = osiris_log:chunk_iterator(R0),
422+
{{_, <<"aaa">>}, I1} = osiris_log:iterator_next(I0),
423+
{{_, E1}, I2} = osiris_log:iterator_next(I1),
424+
?assertMatch(end_of_chunk, osiris_log:iterator_next(I2)),
425+
{H, W1, R1}
426+
end,
427+
fun(#{w := W0, r := R0}) ->
428+
%% this one is too big to be read ahead
429+
E1 = rand:bytes(RAL * 2),
430+
EntriesRev = [E1 , <<"aaa">>],
431+
{_, W1} = write_committed(EntriesRev, W0),
432+
{ok, H, I0, R1} = osiris_log:chunk_iterator(R0),
433+
{{_, <<"aaa">>}, I1} = osiris_log:iterator_next(I0),
434+
{{_, E1}, I2} = osiris_log:iterator_next(I1),
435+
?assertMatch(end_of_chunk, osiris_log:iterator_next(I2)),
436+
{H, W1, R1}
437+
end
438+
],
439+
440+
#{w := Wr1, r := Rd1} = run_read_ahead_tests(Tests, offset,
441+
?DEFAULT_FILTER_SIZE, W, R),
442+
osiris_log:close(Rd1),
443+
osiris_log:close(Wr1),
378444
ok.
379445

380446
read_chunk_parsed(Config) ->

0 commit comments

Comments
 (0)