Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/osiris.erl
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@
{error, term()} |
{error, term(), config()}.
start_cluster(Config00 = #{name := Name}) ->
?DEBUG("osiris: starting new cluster ~s", [Name]),
?DEBUG("osiris: starting new cluster ~ts", [Name]),
true = osiris_util:validate_base64uri(Name),
%% ensure reference is set
Config0 = maps:merge(#{reference => Name}, Config00),
Expand Down Expand Up @@ -254,7 +254,7 @@ start_replicas(Config, [Node | Nodes], ReplicaPids) ->
start_replicas(Config, Nodes, [Pid | ReplicaPids]);
{error, Reason} ->
Name = maps:get(name, Config, undefined),
error_logger:info_msg("osiris:start_replicas for ~s failed to start replica "
error_logger:info_msg("osiris:start_replicas for ~ts failed to start replica "
"on ~w, reason: ~w",
[Name, Node, Reason]),
%% coordinator might try to start this replica in the future
Expand Down
27 changes: 13 additions & 14 deletions src/osiris_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -448,9 +448,9 @@ init(#{dir := Dir,
MaxSizeChunks = application:get_env(osiris, max_segment_size_chunks,
?DEFAULT_MAX_SEGMENT_SIZE_C),
Retention = maps:get(retention, Config, []),
?INFO("Stream: ~s will use ~s for osiris log data directory",
?INFO("Stream: ~ts will use ~ts for osiris log data directory",
[Name, Dir]),
?DEBUG("osiris_log:init/1 stream ~s max_segment_size_bytes: ~b,
?DEBUG("osiris_log:init/1 stream ~ts max_segment_size_bytes: ~b,
max_segment_size_chunks ~b, retention ~w",
[Name, MaxSizeBytes, MaxSizeChunks, Retention]),
ok = filelib:ensure_dir(Dir),
Expand Down Expand Up @@ -523,7 +523,7 @@ init(#{dir := Dir,
counters:put(Cnt, ?C_SEGMENTS, NumSegments),
osiris_log_shared:set_first_chunk_id(Shared, FstChId),
osiris_log_shared:set_last_chunk_id(Shared, LastChId),
?DEBUG("~s:~s/~b: ~s next offset ~b first offset ~b",
?DEBUG("~s:~s/~b: ~ts next offset ~b first offset ~b",
[?MODULE,
?FUNCTION_NAME,
?FUNCTION_ARITY,
Expand Down Expand Up @@ -799,7 +799,7 @@ init_acceptor(Range, EpochOffsets0,

%% then truncate to
IdxFiles = sorted_index_files(Dir),
?DEBUG("~s: ~s ~s from epoch offsets: ~w range ~w",
?DEBUG("~s: ~s ~ts from epoch offsets: ~w range ~w",
[?MODULE, ?FUNCTION_NAME, Name, EpochOffsets, Range]),
RemIdxFiles = truncate_to(Name, Range, EpochOffsets, IdxFiles),
%% after truncation we can do normal init
Expand Down Expand Up @@ -835,7 +835,7 @@ chunk_id_index_scan0(Fd, ChunkId) ->

delete_segment_from_index(Index) ->
File = segment_from_index_file(Index),
?DEBUG("osiris_log: deleting segment ~s", [File]),
?DEBUG("osiris_log: deleting segment ~ts", [File]),
ok = prim_file:delete(Index),
ok = prim_file:delete(File),
ok.
Expand Down Expand Up @@ -884,7 +884,7 @@ truncate_to(Name, RemoteRange, [{E, ChId} | NextEOs], IdxFiles) ->
%% build_seg_info/1?
end;
{found, #seg_info{file = File, index = IdxFile}} ->
?DEBUG("osiris_log: ~s on node ~s truncating to chunk "
?DEBUG("osiris_log: ~ts on node ~ts truncating to chunk "
"id ~b in epoch ~b",
[Name, node(), ChId, E]),
%% this is the inclusive case
Expand Down Expand Up @@ -937,7 +937,7 @@ init_data_reader({StartChunkId, PrevEOT}, #{dir := Dir,
name := Name} = Config) ->
IdxFiles = sorted_index_files(Dir),
Range = offset_range_from_idx_files(IdxFiles),
?DEBUG("osiris_segment:init_data_reader/2 ~s at ~b prev "
?DEBUG("osiris_segment:init_data_reader/2 ~ts at ~b prev "
"~w local range: ~w",
[Name, StartChunkId, PrevEOT, Range]),
%% Invariant: there is always at least one segment left on disk
Expand Down Expand Up @@ -1258,7 +1258,7 @@ last_user_chunk_id0([IdxFile | Rest]) ->
{ok, Id, Pos} ->
{Id, Pos, IdxFile};
{error, Reason} ->
?DEBUG("Could not find user chunk in index file ~s (~p)", [IdxFile, Reason]),
?DEBUG("Could not find user chunk in index file ~ts (~p)", [IdxFile, Reason]),
last_user_chunk_id0(Rest)
end.

Expand Down Expand Up @@ -1564,7 +1564,7 @@ delete_directory(#{name := Name} = Config) when is_map(Config) ->
delete_directory(Name);
delete_directory(Name) when ?IS_STRING(Name) ->
Dir = directory(Name),
?DEBUG("osiris_log: deleting directory ~s", [Dir]),
?DEBUG("osiris_log: deleting directory ~ts", [Dir]),
case file:list_dir(Dir) of
{ok, Files} ->
[ok =
Expand Down Expand Up @@ -1688,7 +1688,7 @@ first_and_last_seginfos0([FstIdxFile | Rem] = IdxFiles) ->
{ok, LastSegInfo} ->
{length(Rem) + 1, FstSegInfo, LastSegInfo};
{error, Err} ->
?ERROR("~s: failed to build seg_info from file ~s, error: ~w",
?ERROR("~s: failed to build seg_info from file ~ts, error: ~w",
[?MODULE, LastIdxFile, Err]),
error(Err)
end;
Expand Down Expand Up @@ -1804,7 +1804,7 @@ build_segment_info(SegFile, LastChunkPos, IdxFile) ->
_Reserved:32>>} ->
Size = LastChunkPos + LastSize + LastTSize + ?HEADER_SIZE_B,
{ok, Eof} = file:position(Fd, eof),
?DEBUG_IF("~s: segment ~s has trailing data ~w ~w",
?DEBUG_IF("~s: segment ~ts has trailing data ~w ~w",
[?MODULE, filename:basename(SegFile),
Size, Eof], Size =/= Eof),
_ = file:close(Fd),
Expand Down Expand Up @@ -1903,7 +1903,7 @@ update_retention(Retention,
evaluate_retention(Dir, Specs) when is_list(Dir) ->
% convert to binary for faster operations later
% mostly in segment_from_index_file/1
evaluate_retention(list_to_binary(Dir), Specs);
evaluate_retention(unicode:characters_to_binary(Dir), Specs);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You always convert to unicode = utf8 even if the VM is running in latin1 mode. I think it's probably fine here. Just want to add a note that
https://github.com/erlang/otp/blob/a1f235fc05653fa524fc7c4a352398af6ce0b364/lib/stdlib/src/filename.erl#L845-L851 uses file:native_name_encoding() as OutEncoding.

evaluate_retention(Dir, Specs) when is_binary(Dir) ->

{Time, Result} = timer:tc(
Expand Down Expand Up @@ -2354,7 +2354,7 @@ open_new_segment(#?MODULE{cfg = #cfg{name = Name,
_ = close_fd(OldIdxFd),
Filename = make_file_name(NextOffset, "segment"),
IdxFilename = make_file_name(NextOffset, "index"),
?DEBUG("~s: ~s ~s: ~s", [?MODULE, ?FUNCTION_NAME, Name, Filename]),
?DEBUG("~s: ~s ~ts: ~ts", [?MODULE, ?FUNCTION_NAME, Name, Filename]),
{ok, IdxFd} =
file:open(
filename:join(Dir, IdxFilename), ?FILE_OPTS_WRITE),
Expand Down Expand Up @@ -2784,7 +2784,6 @@ close_fd(Fd) ->

-ifdef(TEST).

% -include_lib("eunit/include/eunit.hrl").

part_test() ->
[<<"ABCD">>] = part(4, [<<"ABCDEF">>]),
Expand Down
10 changes: 5 additions & 5 deletions src/osiris_replica.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,19 @@
-include("osiris.hrl").

-define(INFO_(Name, Str, Args),
?INFO("~s [~s:~s/~b] " Str,
?INFO("~ts [~s:~s/~b] " Str,
[Name, ?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY | Args])).

-define(WARN_(Name, Str, Args),
?WARN("~s [~s:~s/~b] " Str,
?WARN("~ts [~s:~s/~b] " Str,
[Name, ?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY | Args])).

-define(ERROR_(Name, Str, Args),
?ERROR("~s [~s:~s/~b] " Str,
?ERROR("~ts [~s:~s/~b] " Str,
[Name, ?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY | Args])).

-define(DEBUG_(Name, Str, Args),
?DEBUG("~s [~s:~s/~b] " Str,
?DEBUG("~ts [~s:~s/~b] " Str,
[Name, ?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY | Args])).
%% osiris replica, starts TCP listener ("server side" of the link),
%% spawns remote reader, TCP listener replicates and
Expand Down Expand Up @@ -486,7 +486,7 @@ handle_info({ssl_closed, Socket},
{stop, normal, State};
handle_info({tcp_error, Socket, Error},
#?MODULE{cfg = #cfg{name = Name, socket = Socket}} = State) ->
?DEBUG_(Name, "osiris_replica: ~s Socket error ~p. Exiting...",
?DEBUG_(Name, "osiris_replica: ~ts Socket error ~p. Exiting...",
[Error]),
{stop, {tcp_error, Error}, State};
handle_info({ssl_error, Socket, Error},
Expand Down
18 changes: 9 additions & 9 deletions src/osiris_replica_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,12 @@ init(#{hosts := Hosts,
connection_token := Token}) ->
process_flag(trap_exit, true),

?DEBUG("~s: trying to connect to replica at ~p", [Name, Hosts]),
?DEBUG("~ts: trying to connect to replica at ~p", [Name, Hosts]),

case maybe_connect(Transport, Hosts, Port, connect_options())
of
{ok, Sock, Host} ->
?DEBUG("~s: successfully connected to host ~p", [Name, Host]),
?DEBUG("~ts: successfully connected to host ~p", [Name, Host]),
CntId = {?MODULE, ExtRef, Host, Port},
CntSpec = {CntId, ?COUNTER_FIELDS},
Config = #{counter_spec => CntSpec, transport => Transport},
Expand All @@ -166,7 +166,7 @@ init(#{hosts := Hosts,
{ok, Log} =
osiris_writer:init_data_reader(LeaderPid, TailInfo, Config),
CntRef = osiris_log:counters_ref(Log),
?INFO("~s: starting osiris replica reader at offset ~b",
?INFO("~ts: starting osiris replica reader at offset ~b",
[Name, osiris_log:next_offset(Log)]),

ok = send(Transport, Sock, Token),
Expand Down Expand Up @@ -267,31 +267,31 @@ handle_info({'DOWN', Ref, _, _, Info},
leader_monitor_ref = Ref} =
State) ->
%% leader is down, exit
?ERROR("osiris_replica_reader: '~s' detected leader down "
?ERROR("osiris_replica_reader: '~ts' detected leader down "
"with ~W - exiting...",
[Name, Info, 10]),
%% this should be enough to make the replica shut down
ok = close(Transport, Sock),
{stop, Info, State};
handle_info({tcp_closed, Socket},
#state{name = Name, socket = Socket} = State) ->
?DEBUG("osiris_replica_reader: '~s' Socket closed. Exiting...",
?DEBUG("osiris_replica_reader: '~ts' Socket closed. Exiting...",
[Name]),
{stop, normal, State};
handle_info({ssl_closed, Socket},
#state{name = Name, socket = Socket} = State) ->
?DEBUG("osiris_replica_reader: '~s' TLS socket closed. Exiting...",
?DEBUG("osiris_replica_reader: '~ts' TLS socket closed. Exiting...",
[Name]),
{stop, normal, State};
handle_info({tcp_error, Socket, Error},
#state{name = Name, socket = Socket} = State) ->
?DEBUG("osiris_replica_reader: '~s' Socket error ~p. "
?DEBUG("osiris_replica_reader: '~ts' Socket error ~p. "
"Exiting...",
[Name, Error]),
{stop, {tcp_error, Error}, State};
handle_info({ssl_error, Socket, Error},
#state{name = Name, socket = Socket} = State) ->
?DEBUG("osiris_replica_reader: '~s' TLS socket error ~p. "
?DEBUG("osiris_replica_reader: '~ts' TLS socket error ~p. "
"Exiting...",
[Name, Error]),
{stop, {ssl_error, Error}, State};
Expand All @@ -301,7 +301,7 @@ handle_info({'EXIT', Ref, Info}, State) ->
[Ref, Info]),
{stop, normal, State};
handle_info(Info, #state{name = Name} = State) ->
?DEBUG("osiris_replica_reader: '~s' unhandled message ~W",
?DEBUG("osiris_replica_reader: '~ts' unhandled message ~W",
[Name, Info, 10]),
{noreply, State}.

Expand Down
4 changes: 2 additions & 2 deletions src/osiris_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ replication_over_tls_configuration(InitArgs, FileConsultFun, LogFun) ->
{error, Error} ->
LogFun(warn,
"Error while reading TLS "
++ "distributon option file ~s: ~p",
++ "distributon option file ~ts: ~p",
[OptFile, Error]),
LogFun(warn,
"Stream replication over TLS will NOT be enabled",
Expand All @@ -148,7 +148,7 @@ replication_over_tls_configuration(InitArgs, FileConsultFun, LogFun) ->
R ->
LogFun(warn,
"Unexpected result while reading TLS distributon "
"option file ~s: ~p",
"option file ~ts: ~p",
[OptFile, R]),
LogFun(warn,
"Stream replication over TLS will NOT be enabled",
Expand Down
4 changes: 2 additions & 2 deletions src/osiris_writer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ handle_continue(#{name := Name0,
counters:put(CntRef, ?C_COMMITTED_OFFSET, CommittedOffset),
counters:put(CntRef, ?C_EPOCH, Epoch),
EvtFmt = maps:get(event_formatter, Config, undefined),
?INFO("osiris_writer:init/1: name: ~s last offset: ~b "
?INFO("osiris_writer:init/1: name: ~ts last offset: ~b "
"committed chunk id: ~b epoch: ~b",
[Name, LastOffs, CommittedOffset, Epoch]),
Shared = osiris_log:get_shared(Log),
Expand Down Expand Up @@ -286,7 +286,7 @@ terminate(Reason,
#?MODULE{log = Log,
data_listeners = Listeners,
cfg = #cfg{name = Name}}) ->
?INFO("osiris_writer:terminate/2: name ~s reason: ~w",
?INFO("osiris_writer:terminate/2: name ~ts reason: ~w",
[Name, Reason]),
_ = ets:delete(osiris_reader_context_cache, self()),
ok = osiris_log:close(Log),
Expand Down
Loading