Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 32 additions & 23 deletions src/osiris_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2723,31 +2723,40 @@ open(File, Options) ->
throw_missing(file:open(File, Options)).

chunk_location_for_timestamp(Idx, Ts) ->
%% TODO: optimise using skip search approach
Fd = open_index_read(Idx),
%% scan index file for nearest timestamp
{ChunkId, _Timestamp, _Epoch, FilePos} = timestamp_idx_scan(Fd, Ts),
{ok, IdxFd} = open(Idx, [read, raw, binary]),
_ = file:advise(IdxFd, 0, 0, random),
{Ts, {ChunkId, _Timestamp, _Epoch, FilePos}} =
idx_skip_search(IdxFd, ?IDX_HEADER_SIZE,
fun timestamp_search_fun/3,
{Ts, not_found}),
ok = file:close(IdxFd),
{ChunkId, FilePos}.

timestamp_idx_scan(Fd, Ts) ->
case file:read(Fd, ?INDEX_RECORD_SIZE_B) of
{ok,
<<ChunkId:64/unsigned,
Timestamp:64/signed,
Epoch:64/unsigned,
FilePos:32/unsigned,
_ChType:8/unsigned>>} ->
case Ts =< Timestamp of
true ->
ok = file:close(Fd),
{ChunkId, Timestamp, Epoch, FilePos};
false ->
timestamp_idx_scan(Fd, Ts)
end;
eof ->
ok = file:close(Fd),
eof
end.
%% NOTE: this is slightly different than offset_search_fun/3. When searching
%% for offsets, if the given offset is not a chunk ID then the search returns
%% the chunk ID less than the given offset. When searching for timestamps,
%% if the given timestamp is between the timestamp of two chunks, the later
%% chunk is returned.
timestamp_search_fun(scan, <<ChunkId:64/unsigned,
Timestamp:64/signed,
Epoch:64/unsigned,
FilePos:32/unsigned,
_ChType:8/unsigned>>, {Ts, _})
when Ts =< Timestamp ->
{return, {Ts, {ChunkId, Timestamp, Epoch, FilePos}}};
timestamp_search_fun(peek, <<_ChunkId:64/unsigned,
Timestamp:64/signed,
_Epoch:64/unsigned,
_FilePos:32/unsigned,
_ChType:8/unsigned>>, {Ts, _} = State)
when Ts =< Timestamp ->
{scan, State};
timestamp_search_fun(_Type, <<ChunkId:64/unsigned,
Timestamp:64/signed,
Epoch:64/unsigned,
FilePos:32/unsigned,
_ChType:8/unsigned>>, {Ts, _}) ->
{continue, {Ts, {ChunkId, Timestamp, Epoch, FilePos}}}.

validate_crc(ChunkId, Crc, IOData) ->
case erlang:crc32(IOData) of
Expand Down