Skip to content

Replace metrics by seshat counters #221

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ ESCRIPT_EMU_ARGS = -noinput -setcookie ra_fifo_cli

dep_gen_batch_server = hex 0.8.4
dep_aten = hex 0.5.6
DEPS = aten gen_batch_server
dep_seshat = git https://github.com/rabbitmq/seshat.git main
DEPS = aten gen_batch_server seshat

TEST_DEPS = proper meck eunit_formatters looking_glass inet_tcp_proxy

Expand Down
2 changes: 1 addition & 1 deletion src/ra.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@
{links,[{"github","https://github.com/rabbitmq/ra"}]},
{modules,[]},
{registered,[ra_sup]},
{applications,[kernel,stdlib,sasl,crypto,aten,gen_batch_server]},
{applications,[kernel,stdlib,sasl,crypto,aten,gen_batch_server,seshat]},
{mod,{ra_app,[]}},
{env,[]}]}.
2 changes: 1 addition & 1 deletion src/ra.erl
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ overview(System) ->
#{node => node(),
servers => ra_directory:overview(System),
%% TODO:filter counter keys by system
counters => ra_counters:overview(),
counters => seshat_counters:overview(ra),
wal => #{status => lists:nth(5, element(4, sys:get_status(ra_log_wal))),
open_mem_tables => ets:info(OpenTbls, size),
closed_mem_tables => ets:info(ClosedTbls, size)},
Expand Down
74 changes: 36 additions & 38 deletions src/ra.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -190,19 +190,6 @@

-define(DEFAULT_SNAPSHOT_MODULE, ra_log_snapshot).

-define(RA_LOG_COUNTER_FIELDS,
[write_ops,
write_resends,
read_ops,
read_cache,
read_open_mem_tbl,
read_closed_mem_tbl,
read_segment,
fetch_term,
snapshots_written,
snapshot_installed,
reserved_1
]).
-define(C_RA_LOG_WRITE_OPS, 1).
-define(C_RA_LOG_WRITE_RESENDS, 2).
-define(C_RA_LOG_READ_OPS, 3).
Expand All @@ -214,6 +201,42 @@
-define(C_RA_LOG_SNAPSHOTS_WRITTEN, 9).
-define(C_RA_LOG_SNAPSHOTS_INSTALLED, 10).
-define(C_RA_LOG_RESERVED, 11).
-define(RA_LOG_COUNTER_FIELDS,
[{write_ops, ?C_RA_LOG_WRITE_OPS, counter, "Total number of write ops"},
{write_resends, ?C_RA_LOG_WRITE_RESENDS, counter, "Total number of write resends"},
{read_ops, ?C_RA_LOG_READ_OPS, counter, "Total number of read ops"},
{read_cache, ?C_RA_LOG_READ_CACHE, counter, "Total number of cache reads"},
{read_open_mem_tbl, ?C_RA_LOG_READ_OPEN_MEM_TBL, counter, "Total number of opened memory tables"},
{read_closed_mem_tbl, ?C_RA_LOG_READ_CLOSED_MEM_TBL, counter, "Total number of closed memory tables"},
{read_segment, ?C_RA_LOG_READ_SEGMENT, counter, "Total number of read segments"},
{fetch_term, ?C_RA_LOG_FETCH_TERM, counter, "Total number of terms fetched"},
{snapshots_written, ?C_RA_LOG_SNAPSHOTS_WRITTEN, counter, "Total number of snapshots written"},
{snapshot_installed, ?C_RA_LOG_SNAPSHOTS_INSTALLED, counter, "Total number of snapshots installed"},
{reserved_1, ?C_RA_LOG_RESERVED, counter, "Reserved counter"}
]).

-define(RA_SRV_COUNTER_FIELDS,
[
{aer_received_follower, ?C_RA_SRV_AER_RECEIVED_FOLLOWER, counter, "Total number of append entries received"},
{aer_replies_success, ?C_RA_SRV_AER_REPLIES_SUCCESS, counter, "Total number of successful append entries"},
{aer_replies_fail, ?C_RA_SRV_AER_REPLIES_FAILED, counter, "Total number of failed append entries"},
{commands, ?C_RA_SRV_COMMANDS, counter, "Total number of commands"},
{command_flushes, ?C_RA_SRV_COMMAND_FLUSHES, counter, "Total number of command batches"},
{aux_commands, ?C_RA_SRV_AUX_COMMANDS, counter, "Total number of aux commands"},
{consistent_queries, ?C_RA_SRV_CONSISTENT_QUERIES, counter, "Total number of consistent queries"},
{rpcs_sent, ?C_RA_SRV_RPCS_SENT, counter, "Total number of rpcs"},
{msgs_sent, ?C_RA_SRV_MSGS_SENT, counter, "All messages sent (exept messages sent to wal)"},
{dropped_sends, ?C_RA_SRV_DROPPED_SENDS, counter, "Total number of message sends that return noconnect or nosuspend are dropped"},
{send_msg_effects_sent, ?C_RA_SRV_SEND_MSG_EFFS_SENT, counter, "Total number of message effects sent"},
{pre_vote_elections, ?C_RA_SRV_PRE_VOTE_ELECTIONS, counter, "Total number of pre-vote elections"},
{elections, ?C_RA_SRV_ELECTIONS, counter, "Total number of elections"},
{forced_gcs, ?C_RA_SRV_GCS, counter, "Number of garbage collection runs"},
{snapshots_sent, ?C_RA_SRV_SNAPSHOTS_SENT, counter, "Total number of snapshots sent"},
{release_cursors, ?C_RA_SRV_RELEASE_CURSORS, counter, "Total number of updates of the release cursor"},
{aer_received_follower_empty, ?C_RA_SRV_AER_RECEIVED_FOLLOWER_EMPTY, counter, "Total number of empty append entries received"},
{term_and_voted_for_updates, ?C_RA_SRV_TERM_AND_VOTED_FOR_UPDATES, counter, "Total number of updates of term and voted for"},
{local_queries, ?C_RA_SRV_LOCAL_QUERIES, counter, "Total number of local queries"}
]).

-define(C_RA_SRV_AER_RECEIVED_FOLLOWER, ?C_RA_LOG_RESERVED + 1).
-define(C_RA_SRV_AER_REPLIES_SUCCESS, ?C_RA_LOG_RESERVED + 2).
Expand All @@ -235,29 +258,4 @@
-define(C_RA_SRV_TERM_AND_VOTED_FOR_UPDATES, ?C_RA_LOG_RESERVED + 18).
-define(C_RA_SRV_LOCAL_QUERIES, ?C_RA_LOG_RESERVED + 19).


-define(RA_SRV_COUNTER_FIELDS,
[
aer_received_follower,
aer_replies_success,
aer_replies_fail,
commands,
command_flushes,
aux_commands,
consistent_queries,
rpcs_sent,
msgs_sent, %% all messages sent (exept messages sent to wal)
dropped_sends, %% any message sends that return noconnect or nosuspend are dropped
send_msg_effects_sent,
pre_vote_elections,
elections,
forced_gcs,
snapshots_sent,
release_cursors,
aer_received_follower_empty,
term_and_voted_for_updates,
local_queries

]).

-define(RA_COUNTER_FIELDS, ?RA_LOG_COUNTER_FIELDS ++ ?RA_SRV_COUNTER_FIELDS).
2 changes: 1 addition & 1 deletion src/ra_bench.erl
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ spawn_client(Parent, Leader, Num, DataSize) when Num >= ?PIPE_SIZE ->
print_metrics(Name) ->
io:format("Node ~w:", [node()]),
io:format("metrics ~p~n", [ets:lookup(ra_metrics, Name)]),
io:format("counters ~p", [ra_counters:overview()]).
io:format("counters ~p", [seshat_counters:overview(ra)]).



Expand Down
87 changes: 0 additions & 87 deletions src/ra_counters.erl

This file was deleted.

15 changes: 8 additions & 7 deletions src/ra_log_segment_writer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,16 @@

-define(AWAIT_TIMEOUT, 30000).

-define(COUNTER_FIELDS,
[mem_tables,
segments,
entries
]).

-define(C_MEM_TABLES, 1).
-define(C_SEGMENTS, 2).
-define(C_ENTRIES, 3).
-define(COUNTER_FIELDS,
[
{mem_tables, ?C_MEM_TABLES, counter, "Number of in-memory tables"},
{segments, ?C_SEGMENTS, counter, "Number of segments"},
{entries, ?C_ENTRIES, counter, "Number of entries"}
]
).

%%% ra_log_segment_writer
%%% receives a set of closed mem_segments from the wal
Expand Down Expand Up @@ -97,7 +98,7 @@ await(SegWriter) ->
init([#{data_dir := DataDir,
system := System} = Conf]) ->
process_flag(trap_exit, true),
CRef = ra_counters:new(?MODULE, ?COUNTER_FIELDS),
CRef = seshat_counters:new(ra, ?MODULE, ?COUNTER_FIELDS),
SegmentConf = maps:get(segment_conf, Conf, #{}),
{ok, #state{system = System,
data_dir = DataDir,
Expand Down
14 changes: 7 additions & 7 deletions src/ra_log_wal.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@
-define(MAGIC, "RAWA").
-define(HEADER_SIZE, 5).

-define(COUNTER_FIELDS,
[wal_files,
batches,
writes
]).

-define(C_WAL_FILES, 1).
-define(C_BATCHES, 2).
-define(C_WRITES, 3).
-define(COUNTER_FIELDS,
[{wal_files, ?C_WAL_FILES, counter, "Number of write-ahead log files"},
{batches, ?C_BATCHES, counter, "Number of batches"},
{writes, ?C_WRITES, counter, "Number of writes"}
]).

% a writer_id consists of a unqique local name (see ra_directory) and a writer's
% current pid().
% The pid is used for the immediate writer notification
Expand Down Expand Up @@ -235,7 +235,7 @@ init(#{dir := Dir} = Conf0) ->
% at times receive large number of messages from a large number of
% writers
process_flag(message_queue_data, off_heap),
CRef = ra_counters:new(WalName, ?COUNTER_FIELDS),
CRef = seshat_counters:new(ra, WalName, ?COUNTER_FIELDS),
% wait for the segment writer to process anything in flight
ok = ra_log_segment_writer:await(SegWriter),
%% TODO: recover wal should return {stop, Reason} if it fails
Expand Down
2 changes: 1 addition & 1 deletion src/ra_metrics_ets.erl
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ init([]) ->
{write_concurrency, true},
public],
_ = ets:new(ra_log_metrics, [set | TableFlags]),
_ = ra_counters:init(),
seshat_counters:new_group(ra),
_ = ra_leaderboard:init(),

%% Table for ra processes to record their current snapshot index so that
Expand Down
4 changes: 2 additions & 2 deletions src/ra_server_proc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -876,7 +876,7 @@ terminate(Reason, StateName,
end,
_ = ets:delete(ra_metrics, MetricsKey),
_ = ets:delete(ra_state, Key),
ok = ra_counters:delete({Key, self()}),
ok = seshat_counters:delete(ra, {Key, self()}),
ok.

code_change(_OldVsn, StateName, State, _Extra) ->
Expand Down Expand Up @@ -1367,7 +1367,7 @@ config_defaults(RegName) ->
tick_timeout => ?TICK_INTERVAL_MS,
await_condition_timeout => ?DEFAULT_AWAIT_CONDITION_TIMEOUT,
initial_members => [],
counter => ra_counters:new({RegName, self()}, ?RA_COUNTER_FIELDS),
counter => seshat_counters:new(ra, {RegName, self()}, ?RA_COUNTER_FIELDS),
system_config => ra_system:default_config()
}.

Expand Down
28 changes: 14 additions & 14 deletions test/ra_log_2_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ receive_segment(Config) ->
ok.

read_one(Config) ->
ra_counters:new(?FUNCTION_NAME, ?RA_COUNTER_FIELDS),
Log0 = ra_log_init(Config, #{counter => ra_counters:fetch(?FUNCTION_NAME)}),
seshat_counters:new(ra, ?FUNCTION_NAME, ?RA_COUNTER_FIELDS),
Log0 = ra_log_init(Config, #{counter => seshat_counters:fetch(ra, ?FUNCTION_NAME)}),
Log1 = append_n(1, 2, 1, Log0),
% Log1 = ra_log:append({1, 1, <<1:64/integer>>}, Log0),
% ensure the written event is delivered
Expand All @@ -153,7 +153,7 @@ read_one(Config) ->
#{?FUNCTION_NAME := #{read_cache := M1,
read_open_mem_tbl := M2,
read_closed_mem_tbl := M3,
read_segment := M4}} = ra_counters:overview(),
read_segment := M4}} = seshat_counters:overview(ra),
% read two entries
?assertEqual(1, M1 + M2 + M3 + M4),
ra_log:close(Log),
Expand All @@ -176,9 +176,10 @@ take_after_overwrite_and_init(Config) ->


validate_sequential_reads(Config) ->
ra_counters:new(?FUNCTION_NAME, ?RA_COUNTER_FIELDS),
Log0 = ra_log_init(Config, #{counter => ra_counters:fetch(?FUNCTION_NAME),
max_open_segments => 100}),
seshat_counters:new(ra, ?FUNCTION_NAME, ?RA_COUNTER_FIELDS),
Log0 = ra_log_init(Config,
#{counter => seshat_counters:fetch(ra, ?FUNCTION_NAME),
max_open_segments => 100}),
% write a few entries
Log1 = append_and_roll(1, 500, 1, Log0),
Log2 = append_and_roll(500, 1001, 1, Log1),
Expand All @@ -196,7 +197,7 @@ validate_sequential_reads(Config) ->
#{?FUNCTION_NAME := #{read_cache := M1,
read_open_mem_tbl := M2,
read_closed_mem_tbl := M3,
read_segment := M4} = O} = ra_counters:overview(),
read_segment := M4} = O} = seshat_counters:overview(ra),
?assertEqual(1000, M1 + M2 + M3 + M4),

ct:pal("validate_sequential_reads COLD took ~pms Reductions: ~p~nMetrics: ~p",
Expand Down Expand Up @@ -227,9 +228,8 @@ validate_sequential_reads(Config) ->
ok.

validate_reads_for_overlapped_writes(Config) ->
ra_counters:new(?FUNCTION_NAME, ?RA_COUNTER_FIELDS),
Log0 = ra_log_init(Config, #{counter => ra_counters:fetch(?FUNCTION_NAME)
}),
seshat_counters:new(ra, ?FUNCTION_NAME, ?RA_COUNTER_FIELDS),
Log0 = ra_log_init(Config, #{counter => seshat_counters:fetch(ra, ?FUNCTION_NAME)}),
% write a segment and roll 1 - 299 - term 1
Log1 = write_and_roll(1, 300, 1, Log0),
% write 300 - 399 in term 1 - no roll
Expand All @@ -247,7 +247,7 @@ validate_reads_for_overlapped_writes(Config) ->
#{?FUNCTION_NAME := #{read_cache := M1,
read_open_mem_tbl := M2,
read_closed_mem_tbl := M3,
read_segment := M4}} = ra_counters:overview(),
read_segment := M4}} = seshat_counters:overview(ra),
?assertEqual(550, M1 + M2 + M3 + M4),
ra_log:close(Log8),
ok.
Expand Down Expand Up @@ -318,9 +318,9 @@ written_event_after_snapshot(Config) ->
ok.

updated_segment_can_be_read(Config) ->
ra_counters:new(?FUNCTION_NAME, ?RA_COUNTER_FIELDS),
seshat_counters:new(ra, ?FUNCTION_NAME, ?RA_COUNTER_FIELDS),
Log0 = ra_log_init(Config,
#{counter => ra_counters:fetch(?FUNCTION_NAME),
#{counter => seshat_counters:fetch(ra, ?FUNCTION_NAME),
snapshot_interval => 1}),
%% append a few entrie
Log2 = append_and_roll(1, 5, 1, Log0),
Expand All @@ -336,7 +336,7 @@ updated_segment_can_be_read(Config) ->
?assertEqual(length(Entries1), C1),
ct:pal("Entries: ~p", [Entries]),
ct:pal("Entries1: ~p", [Entries1]),
ct:pal("Counters ~p", [ra_counters:overview(?FUNCTION_NAME)]),
ct:pal("Counters ~p", [seshat_counters:overview(ra)]),
ct:pal("closed ~p", [ets:tab2list(ra_log_closed_mem_tables)]),
?assertEqual(15, length(Entries1)),
% l18 = length(Entries1),
Expand Down
Loading