Skip to content

Ra systems #2909

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions deps/rabbit/src/rabbit.erl
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,8 @@
%% 100 ms
-define(BOOT_STATUS_CHECK_INTERVAL, 100).

-define(COORD_WAL_MAX_SIZE_B, 64_000_000).

%%----------------------------------------------------------------------------

-type restart_type() :: 'permanent' | 'transient' | 'temporary'.
Expand Down Expand Up @@ -367,6 +369,23 @@ run_prelaunch_second_phase() ->
?LOG_DEBUG(""),
?LOG_DEBUG("== Prelaunch DONE =="),

?LOG_DEBUG("Starting Ra Systems"),
Default = ra_system:default_config(),
Quorum = Default#{name => quorum_queues},
% names => ra_system:derive_names(quorum)},
CoordDataDir = filename:join([rabbit_mnesia:dir(), "coordination", node()]),
Coord = Default#{name => coordination,
data_dir => CoordDataDir,
wal_data_dir => CoordDataDir,
wal_max_size_bytes => ?COORD_WAL_MAX_SIZE_B,
names => ra_system:derive_names(coordination)},

{ok, _} = ra_system:start(Quorum),
{ok, _} = ra_system:start(Coord),

?LOG_DEBUG(""),
?LOG_DEBUG("== Ra System Start done DONE =="),

case IsInitialPass of
true -> rabbit_prelaunch:initial_pass_finished();
false -> ok
Expand Down
8 changes: 4 additions & 4 deletions deps/rabbit/src/rabbit_fifo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -775,11 +775,11 @@ init_aux(Name) when is_atom(Name) ->
#aux{name = Name,
capacity = {inactive, Now, 1, 1.0}}.

handle_aux(leader, _, garbage_collection, State, Log, _MacState) ->
ra_log_wal:force_roll_over(ra_log_wal),
{no_reply, State, Log};
handle_aux(leader, _, garbage_collection, State, Log, MacState) ->
% ra_log_wal:force_roll_over(ra_log_wal),
{no_reply, force_eval_gc(Log, MacState, State), Log};
handle_aux(follower, _, garbage_collection, State, Log, MacState) ->
ra_log_wal:force_roll_over(ra_log_wal),
% ra_log_wal:force_roll_over(ra_log_wal),
{no_reply, force_eval_gc(Log, MacState, State), Log};
handle_aux(_RaState, cast, eval, Aux0, Log, _MacState) ->
{no_reply, Aux0, Log};
Expand Down
6 changes: 3 additions & 3 deletions deps/rabbit/src/rabbit_maintenance.erl
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ transfer_leadership_of_quorum_queues(_TransferCandidates) ->
%% by simply shutting its local QQ replica (Ra server)
RaLeader = amqqueue:get_pid(Q),
rabbit_log:debug("Will stop Ra server ~p", [RaLeader]),
case ra:stop_server(RaLeader) of
case rabbit_quorum_queue:stop_server(RaLeader) of
ok ->
rabbit_log:debug("Successfully stopped Ra server ~p", [RaLeader]);
{error, nodedown} ->
Expand Down Expand Up @@ -296,7 +296,7 @@ stop_local_quorum_queue_followers() ->
{RegisteredName, _LeaderNode} = amqqueue:get_pid(Q),
RaNode = {RegisteredName, node()},
rabbit_log:debug("Will stop Ra server ~p", [RaNode]),
case ra:stop_server(RaNode) of
case rabbit_quorum_queue:stop_server(RaNode) of
ok ->
rabbit_log:debug("Successfully stopped Ra server ~p", [RaNode]);
{error, nodedown} ->
Expand Down Expand Up @@ -339,7 +339,7 @@ revive_local_quorum_queue_replicas() ->
{Prefix, _Node} = amqqueue:get_pid(Q),
RaServer = {Prefix, node()},
rabbit_log:debug("Will start Ra server ~p", [RaServer]),
case ra:restart_server(RaServer) of
case rabbit_quorum_queue:restart_server(RaServer) of
ok ->
rabbit_log:debug("Successfully restarted Ra server ~p", [RaServer]);
{error, {already_started, _Pid}} ->
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_quorum_memory_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}.

force_roll_over(State) ->
ra_log_wal:force_roll_over(ra_log_wal),
rabbit_quorum_queue:wal_force_roll_over(node()),
State#state{last_roll_over = erlang:system_time(millisecond)}.

interval() ->
Expand Down
65 changes: 48 additions & 17 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,14 @@
close/1,
update/2,
handle_event/2]).
-export([is_recoverable/1, recover/2, stop/1, delete/4, delete_immediately/2]).
-export([is_recoverable/1,
recover/2,
stop/1,
start_server/1,
restart_server/1,
stop_server/1,
delete/4,
delete_immediately/2]).
-export([state_info/1, info/2, stat/1, infos/1]).
-export([settle/4, dequeue/4, consume/3, cancel/5]).
-export([credit/4]).
Expand All @@ -38,16 +45,20 @@
-export([shrink_all/1,
grow/4]).
-export([transfer_leadership/2, get_replicas/1, queue_length/1]).
-export([file_handle_leader_reservation/1, file_handle_other_reservation/0]).
-export([file_handle_leader_reservation/1,
file_handle_other_reservation/0]).
-export([file_handle_release_reservation/0]).
-export([list_with_minimum_quorum/0, list_with_minimum_quorum_for_cli/0,
filter_quorum_critical/1, filter_quorum_critical/2,
-export([list_with_minimum_quorum/0,
list_with_minimum_quorum_for_cli/0,
filter_quorum_critical/1,
filter_quorum_critical/2,
all_replica_states/0]).
-export([capabilities/0]).
-export([repair_amqqueue_nodes/1,
repair_amqqueue_nodes/2
]).
-export([reclaim_memory/2]).
-export([reclaim_memory/2,
wal_force_roll_over/1]).
-export([notify_decorators/1,
notify_decorators/3,
spawn_notify_decorators/3]).
Expand All @@ -65,6 +76,9 @@
-type msg_id() :: non_neg_integer().
-type qmsg() :: {rabbit_types:r('queue'), pid(), msg_id(), boolean(), rabbit_types:message()}.

-define(RA_SYSTEM, quorum_queues).
-define(RA_WAL_NAME, ra_log_wal).

-define(STATISTICS_KEYS,
[policy,
operator_policy,
Expand Down Expand Up @@ -168,7 +182,7 @@ start_cluster(Q) ->
TickTimeout = application:get_env(rabbit, quorum_tick_interval, ?TICK_TIMEOUT),
RaConfs = [make_ra_conf(NewQ, ServerId, TickTimeout)
|| ServerId <- members(NewQ)],
case ra:start_cluster(RaConfs) of
case ra:start_cluster(?RA_SYSTEM, RaConfs) of
{ok, _, _} ->
%% ensure the latest config is evaluated properly
%% even when running the machine version from 0
Expand Down Expand Up @@ -506,7 +520,7 @@ recover(_Vhost, Queues) ->
QName = amqqueue:get_name(Q0),
Nodes = get_nodes(Q0),
Formatter = {?MODULE, format_ra_event, [QName]},
Res = case ra:restart_server({Name, node()},
Res = case ra:restart_server(?RA_SYSTEM, {Name, node()},
#{ra_event_formatter => Formatter}) of
ok ->
% queue was restarted, good
Expand All @@ -518,7 +532,8 @@ recover(_Vhost, Queues) ->
% so needs to be started from scratch.
Machine = ra_machine(Q0),
RaNodes = [{Name, Node} || Node <- Nodes],
case ra:start_server(Name, {Name, node()}, Machine, RaNodes) of
case ra:start_server(?RA_SYSTEM, Name, {Name, node()},
Machine, RaNodes) of
ok -> ok;
Err2 ->
rabbit_log:warning("recover: quorum queue ~w could not"
Expand Down Expand Up @@ -553,10 +568,22 @@ recover(_Vhost, Queues) ->
stop(VHost) ->
_ = [begin
Pid = amqqueue:get_pid(Q),
ra:stop_server(Pid)
ra:stop_server(?RA_SYSTEM, Pid)
end || Q <- find_quorum_queues(VHost)],
ok.

-spec stop_server({atom(), node()}) -> ok | {error, term()}.
stop_server({_, _} = Ref) ->
ra:stop_server(?RA_SYSTEM, Ref).

-spec start_server(map()) -> ok | {error, term()}.
start_server(Conf) when is_map(Conf) ->
ra:start_server(?RA_SYSTEM, Conf).

-spec restart_server({atom(), node()}) -> ok | {error, term()}.
restart_server({_, _} = Ref) ->
ra:restart_server(?RA_SYSTEM, Ref).

-spec delete(amqqueue:amqqueue(),
boolean(), boolean(),
rabbit_types:username()) ->
Expand Down Expand Up @@ -617,7 +644,7 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->

force_delete_queue(Servers) ->
[begin
case catch(ra:force_delete_server(S)) of
case catch(ra:force_delete_server(?RA_SYSTEM, S)) of
ok -> ok;
Err ->
rabbit_log:warning(
Expand Down Expand Up @@ -877,19 +904,19 @@ cleanup_data_dir() ->
|| Q <- rabbit_amqqueue:list_by_type(?MODULE),
lists:member(node(), get_nodes(Q))],
NoQQClusters = rabbit_ra_registry:list_not_quorum_clusters(),
Registered = ra_directory:list_registered(),
Registered = ra_directory:list_registered(?RA_SYSTEM),
Running = Names ++ NoQQClusters,
_ = [maybe_delete_data_dir(UId) || {Name, UId} <- Registered,
not lists:member(Name, Running)],
ok.

maybe_delete_data_dir(UId) ->
Dir = ra_env:server_data_dir(UId),
Dir = ra_env:server_data_dir(?RA_SYSTEM, UId),
{ok, Config} = ra_log:read_config(Dir),
case maps:get(machine, Config) of
{module, rabbit_fifo, _} ->
ra_lib:recursive_delete(Dir),
ra_directory:unregister_name(UId);
ra_directory:unregister_name(?RA_SYSTEM, UId);
_ ->
ok
end.
Expand Down Expand Up @@ -999,7 +1026,7 @@ add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) ->
TickTimeout = application:get_env(rabbit, quorum_tick_interval,
?TICK_TIMEOUT),
Conf = make_ra_conf(Q, ServerId, TickTimeout),
case ra:start_server(Conf) of
case ra:start_server(?RA_SYSTEM, Conf) of
ok ->
case ra:add_member(Members, ServerId, Timeout) of
{ok, _, Leader} ->
Expand All @@ -1014,11 +1041,11 @@ add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) ->
fun() -> rabbit_amqqueue:update(QName, Fun) end),
ok;
{timeout, _} ->
_ = ra:force_delete_server(ServerId),
_ = ra:force_delete_server(?RA_SYSTEM, ServerId),
_ = ra:remove_member(Members, ServerId),
{error, timeout};
E ->
_ = ra:force_delete_server(ServerId),
_ = ra:force_delete_server(?RA_SYSTEM, ServerId),
E
end;
E ->
Expand Down Expand Up @@ -1065,7 +1092,7 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
end,
rabbit_misc:execute_mnesia_transaction(
fun() -> rabbit_amqqueue:update(QName, Fun) end),
case ra:force_delete_server(ServerId) of
case ra:force_delete_server(?RA_SYSTEM, ServerId) of
ok ->
ok;
{error, {badrpc, nodedown}} ->
Expand Down Expand Up @@ -1199,6 +1226,10 @@ reclaim_memory(Vhost, QueueName) ->
E
end.

-spec wal_force_roll_over(node()) -> ok.
wal_force_roll_over(Node) ->
ra_log_wal:force_roll_over({?RA_WAL_NAME, Node}).

%%----------------------------------------------------------------------------
dlx_mfa(Q) ->
DLX = init_dlx(args_policy_lookup(<<"dead-letter-exchange">>,
Expand Down
48 changes: 23 additions & 25 deletions deps/rabbit/src/rabbit_stream_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@
start() ->
Nodes = rabbit_mnesia:cluster_nodes(all),
ServerId = {?MODULE, node()},
case ra:restart_server(ServerId) of
case ra:restart_server(?RA_SYSTEM, ServerId) of
{error, Reason} when Reason == not_started orelse
Reason == name_not_registered ->
case ra:start_server(make_ra_conf(node(), Nodes)) of
case ra:start_server(?RA_SYSTEM, make_ra_conf(node(), Nodes)) of
ok ->
global:set_lock(?STREAM_COORDINATOR_STARTUP),
case find_members(Nodes) of
Expand All @@ -116,7 +116,7 @@ start() ->
end.

recover() ->
ra:restart_server({?MODULE, node()}).
ra:restart_server(?RA_SYSTEM, {?MODULE, node()}).

%% new api

Expand Down Expand Up @@ -259,26 +259,24 @@ ensure_coordinator_started() ->
case whereis(?MODULE) of
undefined ->
global:set_lock(?STREAM_COORDINATOR_STARTUP),
Nodes =
case ra:restart_server(Local) of
{error, Reason} when Reason == not_started orelse
Reason == name_not_registered ->
OtherNodes = all_coord_members() -- [Local],
%% We can't use find_members/0 here as a process that timeouts means the cluster is up
case lists:filter(fun(N) -> global:whereis_name(N) =/= undefined end,
OtherNodes) of
[] ->
start_coordinator_cluster();
_ ->
OtherNodes
end;
ok ->
AllNodes;
{error, {already_started, _}} ->
AllNodes;
_ ->
AllNodes
end,
Nodes = case ra:restart_server(?RA_SYSTEM, Local) of
{error, Reason} when Reason == not_started orelse
Reason == name_not_registered ->
OtherNodes = all_coord_members() -- [Local],
%% We can't use find_members/0 here as a process that timeouts means the cluster is up
case lists:filter(fun(N) -> global:whereis_name(N) =/= undefined end, OtherNodes) of
[] ->
start_coordinator_cluster();
_ ->
OtherNodes
end;
ok ->
AllNodes;
{error, {already_started, _}} ->
AllNodes;
_ ->
AllNodes
end,
global:del_lock(?STREAM_COORDINATOR_STARTUP),
Nodes;
_ ->
Expand All @@ -288,7 +286,7 @@ ensure_coordinator_started() ->
start_coordinator_cluster() ->
Nodes = rabbit_mnesia:cluster_nodes(running),
rabbit_log:debug("Starting stream coordinator on nodes: ~w", [Nodes]),
case ra:start_cluster([make_ra_conf(Node, Nodes) || Node <- Nodes]) of
case ra:start_cluster(?RA_SYSTEM, [make_ra_conf(Node, Nodes) || Node <- Nodes]) of
{ok, Started, _} ->
rabbit_log:debug("Started stream coordinator on ~w", [Started]),
Started;
Expand Down Expand Up @@ -488,7 +486,7 @@ add_members(_, []) ->
ok;
add_members(Members, [Node | Nodes]) ->
Conf = make_ra_conf(Node, [N || {_, N} <- Members]),
case ra:start_server(Conf) of
case ra:start_server(?RA_SYSTEM, Conf) of
ok ->
case ra:add_member(Members, {?MODULE, Node}) of
{ok, NewMembers, _} ->
Expand Down
1 change: 1 addition & 0 deletions deps/rabbit/src/rabbit_stream_coordinator.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
-define(RESTART_TIMEOUT, 1000).
-define(PHASE_RETRY_TIMEOUT, 10000).
-define(CMD_TIMEOUT, 30000).
-define(RA_SYSTEM, coordination).

-type stream_id() :: string().
-type stream() :: #{conf := osiris:config(),
Expand Down
Loading