Skip to content

Commit d68042d

Browse files
authored
Merge pull request #112 from rabbitmq/cache-reader-context
Cache reader context
2 parents 59f368e + 957125f commit d68042d

File tree

9 files changed

+229
-60
lines changed

9 files changed

+229
-60
lines changed

WORKSPACE.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ filegroup(
4040
visibility = ["//visibility:public"],
4141
)
4242
""",
43-
sha256 = "22d1f92c04cc41e19b2c332c958f2d5c364a1c7ae78549041187e9e0a0080bf3",
43+
# sha256 = "22d1f92c04cc41e19b2c332c958f2d5c364a1c7ae78549041187e9e0a0080bf3",
4444
strip_prefix = "tls-gen-main",
4545
urls = ["https://github.com/rabbitmq/tls-gen/archive/refs/heads/main.zip"],
4646
)

src/osiris.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ init_reader(Pid, OffsetSpec, CounterSpec) ->
196196
init_reader(Pid, OffsetSpec, {_, _} = CounterSpec, Options)
197197
when is_pid(Pid) andalso node(Pid) =:= node() ->
198198
?DEBUG("osiris: initialising reader. Spec: ~w", [OffsetSpec]),
199-
{ok, Ctx0} = gen:call(Pid, '$gen_call', get_reader_context),
199+
Ctx0 = osiris_util:get_reader_context(Pid),
200200
Ctx = Ctx0#{counter_spec => CounterSpec,
201201
options => Options},
202202
osiris_log:init_offset_reader(OffsetSpec, Ctx).
@@ -274,7 +274,7 @@ configure_logger(Module) ->
274274
first_chunk_id => integer()}.
275275
get_stats(Pid)
276276
when node(Pid) =:= node() ->
277-
{ok, #{shared := Shared}} = gen:call(Pid, '$gen_call', get_reader_context),
277+
#{shared := Shared} = osiris_util:get_reader_context(Pid),
278278
#{committed_chunk_id => osiris_log_shared:committed_chunk_id(Shared),
279279
first_chunk_id => osiris_log_shared:first_chunk_id(Shared),
280280
last_chunk_id => osiris_log_shared:last_chunk_id(Shared)};

src/osiris_ets.erl

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2022 VMware, Inc. or its affiliates. All rights reserved.
6+
%%
7+
8+
-module(osiris_ets).
9+
10+
-behaviour(gen_server).
11+
12+
%% API functions
13+
-export([start_link/0]).
14+
%% gen_server callbacks
15+
-export([init/1,
16+
handle_call/3,
17+
handle_cast/2,
18+
handle_info/2,
19+
terminate/2,
20+
code_change/3]).
21+
22+
-record(state, {}).
23+
24+
%%%===================================================================
25+
%%% API functions
26+
%%%===================================================================
27+
28+
%% @spec start_link() -> {ok, Pid} | ignore | {error, Error}
29+
start_link() ->
30+
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
31+
32+
%%%===================================================================
33+
%%% gen_server callbacks
34+
%%%===================================================================
35+
36+
% @spec init(Args) -> {ok, State} |
37+
%% {ok, State, Timeout} |
38+
%% ignore |
39+
%% {stop, Reason}
40+
init(_) ->
41+
_ = ets:new(osiris_reader_context_cache, [set, named_table, public]),
42+
{ok, #state{}}.
43+
44+
%% @spec handle_call(Request, From, State) ->
45+
%% {reply, Reply, State} |
46+
%% {reply, Reply, State, Timeout} |
47+
%% {noreply, State} |
48+
%% {noreply, State, Timeout} |
49+
%% {stop, Reason, Reply, State} |
50+
%% {stop, Reason, State}
51+
handle_call(_Request, _From, State) ->
52+
Reply = ok,
53+
{reply, Reply, State}.
54+
55+
%% @spec handle_cast(Msg, State) -> {noreply, State} |
56+
%% {noreply, State, Timeout} |
57+
%% {stop, Reason, State}
58+
handle_cast(_Msg, State) ->
59+
{noreply, State}.
60+
61+
%% @spec handle_info(Info, State) -> {noreply, State} |
62+
%% {noreply, State, Timeout} |
63+
%% {stop, Reason, State}
64+
handle_info(_Info, State) ->
65+
{noreply, State}.
66+
67+
%% @spec terminate(Reason, State) -> void()
68+
terminate(_Reason, _State) ->
69+
ok.
70+
71+
%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
72+
code_change(_OldVsn, State, _Extra) ->
73+
{ok, State}.
74+
75+
%%%===================================================================
76+
%%% Internal functions
77+
%%%===================================================================

src/osiris_log.erl

Lines changed: 42 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353

5454
% maximum size of a segment in bytes
5555
-define(DEFAULT_MAX_SEGMENT_SIZE_B, 500 * 1000 * 1000).
56+
% -define(MIN_SEGMENT_SIZE_B, 4_000_000).
5657
% maximum number of chunks per segment
5758
-define(DEFAULT_MAX_SEGMENT_SIZE_C, 256_000).
5859
-define(INDEX_RECORD_SIZE_B, 29).
@@ -442,8 +443,8 @@ init(#{dir := Dir,
442443
epoch := Epoch} = Config,
443444
WriterType) ->
444445
%% scan directory for segments if in write mode
445-
MaxSizeBytes =
446-
maps:get(max_segment_size_bytes, Config, ?DEFAULT_MAX_SEGMENT_SIZE_B),
446+
MaxSizeBytes = maps:get(max_segment_size_bytes, Config,
447+
?DEFAULT_MAX_SEGMENT_SIZE_B),
447448
MaxSizeChunks = application:get_env(osiris, max_segment_size_chunks,
448449
?DEFAULT_MAX_SEGMENT_SIZE_C),
449450
Retention = maps:get(retention, Config, []),
@@ -479,20 +480,21 @@ init(#{dir := Dir,
479480
counter_id = counter_id(Config),
480481
shared = Shared},
481482
ok = maybe_fix_corrupted_files(Config),
483+
DefaultNextOffset = case Config of
484+
#{initial_offset := IO}
485+
when WriterType == acceptor ->
486+
IO;
487+
_ ->
488+
0
489+
end,
482490
case first_and_last_seginfos(Config) of
483491
none ->
484-
NextOffset = case Config of
485-
#{initial_offset := IO}
486-
when WriterType == acceptor ->
487-
IO;
488-
_ ->
489-
0
490-
end,
491-
osiris_log_shared:set_first_chunk_id(Shared, NextOffset - 1),
492+
osiris_log_shared:set_first_chunk_id(Shared, DefaultNextOffset - 1),
492493
open_new_segment(#?MODULE{cfg = Cfg,
493494
mode =
494495
#write{type = WriterType,
495-
tail_info = {NextOffset, empty},
496+
tail_info = {DefaultNextOffset,
497+
empty},
496498
current_epoch = Epoch}});
497499
{NumSegments,
498500
#seg_info{first = #chunk_info{id = FstChId,
@@ -555,11 +557,11 @@ init(#{dir := Dir,
555557
%% here too?
556558
{ok, _} = file:position(SegFd, eof),
557559
{ok, _} = file:position(IdxFd, eof),
558-
osiris_log_shared:set_first_chunk_id(Shared, -1),
560+
osiris_log_shared:set_first_chunk_id(Shared, DefaultNextOffset - 1),
559561
#?MODULE{cfg = Cfg,
560562
mode =
561563
#write{type = WriterType,
562-
tail_info = {0, empty},
564+
tail_info = {DefaultNextOffset, empty},
563565
current_epoch = Epoch},
564566
current_file = filename:basename(Filename),
565567
fd = SegFd,
@@ -847,16 +849,19 @@ truncate_to(_Name, _Range, [], IdxFiles) ->
847849
[];
848850
truncate_to(Name, RemoteRange, [{E, ChId} | NextEOs], IdxFiles) ->
849851
case find_segment_for_offset(ChId, IdxFiles) of
850-
not_found ->
852+
Result when Result == not_found orelse
853+
element(1, Result) == end_of_log ->
854+
%% both not_found and end_of_log needs to be treated as not found
855+
%% as they are...
851856
case build_seg_info(lists:last(IdxFiles)) of
852857
{ok, #seg_info{last = #chunk_info{epoch = E,
853858
id = LastChId,
854859
num = Num}}}
855-
when ChId > LastChId + Num ->
860+
when ChId > LastChId ->
856861
%% the last available local chunk id is smaller than the
857-
%% sources last chunk id but is in the same epoch
862+
%% source's last chunk id but is in the same epoch
858863
%% check if there is any overlap
859-
LastOffsLocal = LastChId + Num,
864+
LastOffsLocal = LastChId + Num - 1,
860865
FstOffsetRemote = case RemoteRange of
861866
empty -> 0;
862867
{F, _} -> F
@@ -878,8 +883,6 @@ truncate_to(Name, RemoteRange, [{E, ChId} | NextEOs], IdxFiles) ->
878883
%% TODO: what to do if error is returned from
879884
%% build_seg_info/1?
880885
end;
881-
{end_of_log, _Info} ->
882-
IdxFiles;
883886
{found, #seg_info{file = File, index = IdxFile}} ->
884887
?DEBUG("osiris_log: ~s on node ~s truncating to chunk "
885888
"id ~b in epoch ~b",
@@ -1502,18 +1505,21 @@ send_file(Sock,
15021505
true ->
15031506
%% this avoids any data sent in the Callback to be dispatched
15041507
%% in it's own TCP frame
1505-
ok = setopts(Transport, Sock, [{nopush, true}]),
1506-
_ = Callback(Header, ToSend),
1507-
case sendfile(Transport, Fd, Sock, Pos, ToSend) of
1508+
case setopts(Transport, Sock, [{nopush, true}]) of
15081509
ok ->
1509-
ok = setopts(Transport, Sock, [{nopush, false}]),
1510-
{ok, _} = file:position(Fd, NextFilePos),
1511-
{ok, State};
1512-
Err ->
1513-
%% reset the position to the start of the current
1514-
%% chunk so that subsequent reads won't error
1515-
{ok, _} = file:position(Fd, Pos),
1516-
Err
1510+
_ = Callback(Header, ToSend),
1511+
case sendfile(Transport, Fd, Sock, Pos, ToSend) of
1512+
ok ->
1513+
ok = setopts(Transport, Sock, [{nopush, false}]),
1514+
{ok, _} = file:position(Fd, NextFilePos),
1515+
{ok, State};
1516+
Err ->
1517+
%% reset the position to the start of the current
1518+
%% chunk so that subsequent reads won't error
1519+
{ok, _} = file:position(Fd, Pos),
1520+
Err
1521+
end;
1522+
Err -> Err
15171523
end;
15181524
false ->
15191525
{ok, _} = file:position(Fd, NextFilePos),
@@ -1997,10 +2003,10 @@ last_epoch_offsets([IdxFile]) ->
19972003
last_epoch_offsets([FstIdxFile | _] = IdxFiles) ->
19982004
F = fun() ->
19992005
{ok, FstFd} = open(FstIdxFile, [read, raw, binary]),
2000-
%% on linux this disables read-ahead so should only
2001-
%% bring a single block into memory
2002-
%% having the first block of index files in page cache
2003-
%% should generally be a good thing
2006+
%% on linux this disables read-ahead so should only
2007+
%% bring a single block into memory
2008+
%% having the first block of index files in page cache
2009+
%% should generally be a good thing
20042010
_ = file:advise(FstFd, 0, 0, random),
20052011
{ok, <<FstO:64/unsigned,
20062012
_FstTimestamp:64/signed,
@@ -2186,9 +2192,9 @@ max_segment_size_reached(
21862192
CurrentSizeChunks >= MaxSizeChunks.
21872193

21882194
setopts(tcp, Sock, Opts) ->
2189-
ok = inet:setopts(Sock, Opts);
2195+
inet:setopts(Sock, Opts);
21902196
setopts(ssl, Sock, Opts) ->
2191-
ok = ssl:setopts(Sock, Opts).
2197+
ssl:setopts(Sock, Opts).
21922198

21932199
sendfile(_Transport, _Fd, _Sock, _Pos, 0) ->
21942200
ok;

src/osiris_replica.erl

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ handle_continue(#{name := Name0,
197197
%% re-discover the committed offset
198198
osiris_writer:ack(LeaderPid, {LastChId, LastTs})
199199
end,
200-
?INFO_(Name, "osiris replicas starting in epoch ~b, next offset ~b, tail info ~w",
200+
?INFO_(Name, "osiris replica starting in epoch ~b, next offset ~b, tail info ~w",
201201
[Epoch, NextOffset, TailInfo]),
202202

203203
%% HostName: append the HostName to the Ip(s) list: in some cases
@@ -246,8 +246,8 @@ handle_continue(#{name := Name0,
246246
RRPid = osiris_replica_reader:start(Node, ReplicaReaderConf),
247247
true = link(RRPid),
248248
GcInterval0 = application:get_env(osiris,
249-
replica_forced_gc_default_interval,
250-
4999),
249+
replica_forced_gc_default_interval,
250+
4999),
251251

252252
GcInterval1 = case is_integer(GcInterval0) of
253253
true ->
@@ -258,6 +258,11 @@ handle_continue(#{name := Name0,
258258
end,
259259
counters:put(CntRef, ?C_COMMITTED_OFFSET, -1),
260260
counters:put(CntRef, ?C_EPOCH, Epoch),
261+
Shared = osiris_log:get_shared(Log),
262+
osiris_util:cache_reader_context(self(), Dir, Name, Shared, ExtRef,
263+
fun(Inc) ->
264+
counters:add(CntRef, ?C_READERS, Inc)
265+
end),
261266
EvtFmt = maps:get(event_formatter, Config, undefined),
262267
{noreply,
263268
#?MODULE{cfg =
@@ -570,6 +575,7 @@ terminate(_Reason, undefined) ->
570575
terminate(Reason, #?MODULE{cfg = #cfg{name = Name,
571576
socket = Sock}, log = Log}) ->
572577
?DEBUG_(Name, "terminating with ~w ", [Reason]),
578+
_ = ets:delete(osiris_reader_context_cache, self()),
573579
ok = osiris_log:close(Log),
574580
ok = gen_tcp:close(Sock),
575581
ok.

src/osiris_sup.erl

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ init([]) ->
2222
intensity => 5,
2323
period => 5},
2424
%% todo put under own sup
25+
Ets =
26+
#{id => osiris_ets,
27+
type => worker,
28+
start => {osiris_ets, start_link, []}},
2529
Retention =
2630
#{id => osiris_retention,
2731
type => worker,
@@ -34,4 +38,4 @@ init([]) ->
3438
#{id => osiris_replica_reader_sup,
3539
type => supervisor,
3640
start => {osiris_replica_reader_sup, start_link, []}},
37-
{ok, {SupFlags, [Retention, ServerSup, ReplicaReader]}}.
41+
{ok, {SupFlags, [Ets, Retention, ServerSup, ReplicaReader]}}.

src/osiris_util.erl

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
get_replication_configuration_from_tls_dist/1,
1919
get_replication_configuration_from_tls_dist/2,
2020
partition_parallel/3,
21-
normalise_name/1
21+
normalise_name/1,
22+
get_reader_context/1,
23+
cache_reader_context/6
2224
]).
2325

2426
%% For testing
@@ -267,3 +269,27 @@ normalise_name(Name) when is_binary(Name) ->
267269
Name;
268270
normalise_name(Name) when is_list(Name) ->
269271
list_to_binary(Name).
272+
273+
get_reader_context(Pid)
274+
when is_pid(Pid) andalso node(Pid) == node() ->
275+
case ets:lookup(osiris_reader_context_cache, Pid) of
276+
[] ->
277+
{ok, Ctx0} = gen:call(Pid, '$gen_call', get_reader_context),
278+
Ctx0;
279+
[{_Pid, Dir, Name, Shared, Ref, ReadersCountersFun}] ->
280+
#{dir => Dir,
281+
name => Name,
282+
shared => Shared,
283+
reference => Ref,
284+
readers_counter_fun => ReadersCountersFun}
285+
end.
286+
287+
cache_reader_context(Pid, Dir, Name, Shared, Ref, ReadersCounterFun)
288+
when is_pid(Pid) andalso
289+
?IS_STRING(Dir) andalso
290+
is_function(ReadersCounterFun) ->
291+
true = ets:insert(osiris_reader_context_cache,
292+
{Pid, Dir, Name, Shared, Ref, ReadersCounterFun}),
293+
ok.
294+
295+

0 commit comments

Comments
 (0)