Skip to content

Commit

Permalink
rabbit_db_cluster: New module on top of databases clustering
Browse files Browse the repository at this point in the history
This new module sits on top of `rabbit_mnesia` and provide an API with
all cluster-related functions.

`rabbit_mnesia` should be called directly inside Mnesia-specific code
only, `rabbit_mnesia_rename` or classic mirrored queues for instance.
Otherwise, `rabbit_db_cluster` must be used.

Several modules, in particular in `rabbitmq_cli`, continue to call
`rabbit_mnesia` as a fallback option if the `rabbit_db_cluster` module
unavailable. This will be the case when the CLI will interact with an
older RabbitMQ version.

This will help with the introduction of a new database backend.
  • Loading branch information
dumbbell committed Feb 22, 2023
1 parent a826b57 commit 42bcd94
Show file tree
Hide file tree
Showing 19 changed files with 274 additions and 36 deletions.
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1026,7 +1026,7 @@ prep_stop(State) ->

stop(State) ->
ok = rabbit_alarm:stop(),
ok = case rabbit_mnesia:is_clustered() of
ok = case rabbit_db_cluster:is_clustered() of
true -> ok;
false -> rabbit_table:clear_ram_only_tables()
end,
Expand Down
49 changes: 47 additions & 2 deletions deps/rabbit/src/rabbit_db.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
-include_lib("rabbit_common/include/logging.hrl").

-export([init/0,
reset/0,
force_reset/0,
force_load_on_next_boot/0,
is_virgin_node/0, is_virgin_node/1,
dir/0,
ensure_dir_exists/0]).
Expand Down Expand Up @@ -40,7 +43,7 @@ init() ->
"DB: this node is virgin: ~ts", [IsVirgin],
#{domain => ?RMQLOG_DOMAIN_DB}),
ensure_dir_exists(),
case init_mnesia() of
case init_using_mnesia() of
ok ->
?LOG_DEBUG(
"DB: initialization successeful",
Expand All @@ -53,14 +56,56 @@ init() ->
Error
end.

init_mnesia() ->
init_using_mnesia() ->
?LOG_DEBUG(
"DB: initialize Mnesia",
#{domain => ?RMQLOG_DOMAIN_DB}),
ok = rabbit_mnesia:init(),
?assertEqual(rabbit:data_dir(), mnesia_dir()),
rabbit_sup:start_child(mnesia_sync).

-spec reset() -> Ret when
Ret :: ok.
%% @doc Resets the database and the node.

reset() ->
reset_using_mnesia().

reset_using_mnesia() ->
?LOG_DEBUG(
"DB: resetting node",
#{domain => ?RMQLOG_DOMAIN_DB}),
rabbit_mnesia:reset().

-spec force_reset() -> Ret when
Ret :: ok.
%% @doc Resets the database and the node.

force_reset() ->
force_reset_using_mnesia().

force_reset_using_mnesia() ->
?LOG_DEBUG(
"DB: resetting node forcefully",
#{domain => ?RMQLOG_DOMAIN_DB}),
rabbit_mnesia:force_reset().

-spec force_load_on_next_boot() -> Ret when
Ret :: ok.
%% @doc Requests that the database to be forcefully loaded during next boot.
%%
%% This is necessary when a node refuses to boot when the cluster is in a bad
%% state, like if critical members are MIA.

force_load_on_next_boot() ->
force_load_on_next_boot_using_mnesia().

force_load_on_next_boot_using_mnesia() ->
?LOG_DEBUG(
"DB: resetting node forcefully",
#{domain => ?RMQLOG_DOMAIN_DB}),
rabbit_mnesia:force_load_next_boot().

-spec is_virgin_node() -> IsVirgin when
IsVirgin :: boolean().
%% @doc Indicates if this RabbitMQ node is virgin.
Expand Down
150 changes: 150 additions & 0 deletions deps/rabbit/src/rabbit_db_cluster.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
%%

-module(rabbit_db_cluster).

-include_lib("kernel/include/logger.hrl").
-include_lib("stdlib/include/assert.hrl").

-include_lib("rabbit_common/include/logging.hrl").

-export([join/2,
forget_member/2]).
-export([change_node_type/1]).
-export([is_clustered/0,
members/0,
disc_members/0,
node_type/0,
check_consistency/0,
cli_cluster_status/0]).

-type node_type() :: disc_node_type() | ram_node_type().
-type disc_node_type() :: disc.
-type ram_node_type() :: ram.

-export_type([node_type/0, disc_node_type/0, ram_node_type/0]).

-define(
IS_NODE_TYPE(NodeType),
((NodeType) =:= disc orelse (NodeType) =:= ram)).

%% -------------------------------------------------------------------
%% Cluster formation.
%% -------------------------------------------------------------------

-spec join(RemoteNode, NodeType) -> Ret when
RemoteNode :: node(),
NodeType :: rabbit_db_cluster:node_type(),
Ret :: Ok | Error,
Ok :: ok | {ok, already_member},
Error :: {error, {inconsistent_cluster, string()}}.
%% @doc Adds this node to a cluster using `RemoteNode' to reach it.

join(RemoteNode, NodeType)
when is_atom(RemoteNode) andalso ?IS_NODE_TYPE(NodeType) ->
?LOG_DEBUG(
"DB: joining cluster using remote node `~ts`", [RemoteNode],
#{domain => ?RMQLOG_DOMAIN_DB}),
join_using_mnesia(RemoteNode, NodeType).

join_using_mnesia(RemoteNode, NodeType) ->
rabbit_mnesia:join_cluster(RemoteNode, NodeType).

-spec forget_member(Node, RemoveWhenOffline) -> ok when
Node :: node(),
RemoveWhenOffline :: boolean().
%% @doc Removes `Node' from the cluster.

forget_member(Node, RemoveWhenOffline) ->
forget_member_using_mnesia(Node, RemoveWhenOffline).

forget_member_using_mnesia(Node, RemoveWhenOffline) ->
rabbit_mnesia:forget_cluster_node(Node, RemoveWhenOffline).

%% -------------------------------------------------------------------
%% Cluster update.
%% -------------------------------------------------------------------

-spec change_node_type(NodeType) -> ok when
NodeType :: rabbit_db_cluster:node_type().
%% @doc Changes the node type to `NodeType'.
%%
%% Node types may not all be valid with all databases.

change_node_type(NodeType) ->
change_node_type_using_mnesia(NodeType).

change_node_type_using_mnesia(NodeType) ->
rabbit_mnesia:change_cluster_node_type(NodeType).

%% -------------------------------------------------------------------
%% Cluster status.
%% -------------------------------------------------------------------

-spec is_clustered() -> IsClustered when
IsClustered :: boolean().
%% @doc Indicates if this node is clustered with other nodes or not.

is_clustered() ->
is_clustered_using_mnesia().

is_clustered_using_mnesia() ->
rabbit_mnesia:is_clustered().

-spec members() -> Members when
Members :: [node()].
%% @doc Returns the list of cluster members.

members() ->
members_using_mnesia().

members_using_mnesia() ->
mnesia:system_info(db_nodes).

-spec disc_members() -> Members when
Members :: [node()].
%% @private

disc_members() ->
disc_members_using_mnesia().

disc_members_using_mnesia() ->
rabbit_mnesia:cluster_nodes(disc).

-spec node_type() -> NodeType when
NodeType :: rabbit_db_cluster:node_type().
%% @doc Returns the type of this node, `disc' or `ram'.
%%
%% Node types may not all be relevant with all databases.

node_type() ->
node_type_using_mnesia().

node_type_using_mnesia() ->
rabbit_mnesia:node_type().

-spec check_consistency() -> ok.
%% @doc Ensures the cluster is consistent.

check_consistency() ->
check_consistency_using_mnesia().

check_consistency_using_mnesia() ->
rabbit_mnesia:check_cluster_consistency().

-spec cli_cluster_status() -> ClusterStatus when
ClusterStatus :: [{nodes, [{rabbit_db_cluster:node_type(), [node()]}]} |
{running_nodes, [node()]} |
{partitions, [{node(), [node()]}]}].
%% @doc Returns information from the cluster for the `cluster_status' CLI
%% command.

cli_cluster_status() ->
cli_cluster_status_using_mnesia().

cli_cluster_status_using_mnesia() ->
rabbit_mnesia:status().
17 changes: 8 additions & 9 deletions deps/rabbit/src/rabbit_mnesia.erl
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,8 @@

%%----------------------------------------------------------------------------

-export_type([node_type/0, cluster_status/0]).
-export_type([cluster_status/0]).

-type node_type() :: disc | ram.
-type cluster_status() :: {[node()], [node()], [node()]}.

%%----------------------------------------------------------------------------
Expand Down Expand Up @@ -137,12 +136,12 @@ init_with_lock(Retries, Timeout, RunPeerDiscovery) ->
init_with_lock(Retries - 1, Timeout, RunPeerDiscovery)
end.

-spec run_peer_discovery() -> ok | {[node()], node_type()}.
-spec run_peer_discovery() -> ok | {[node()], rabbit_db_cluster:node_type()}.
run_peer_discovery() ->
{RetriesLeft, DelayInterval} = rabbit_peer_discovery:discovery_retries(),
run_peer_discovery_with_retries(RetriesLeft, DelayInterval).

-spec run_peer_discovery_with_retries(non_neg_integer(), non_neg_integer()) -> ok | {[node()], node_type()}.
-spec run_peer_discovery_with_retries(non_neg_integer(), non_neg_integer()) -> ok | {[node()], rabbit_db_cluster:node_type()}.
run_peer_discovery_with_retries(0, _DelayInterval) ->
ok;
run_peer_discovery_with_retries(RetriesLeft, DelayInterval) ->
Expand Down Expand Up @@ -228,7 +227,7 @@ join_discovered_peers_with_retries(TryNodes, NodeType, RetriesLeft, DelayInterva
%% all in the same cluster, we simply pick the first online node and
%% we cluster to its cluster.

-spec join_cluster(node(), node_type())
-spec join_cluster(node(), rabbit_db_cluster:node_type())
-> ok | {ok, already_member} | {error, {inconsistent_cluster, string()}}.

join_cluster(DiscoveryNode, NodeType) ->
Expand Down Expand Up @@ -317,7 +316,7 @@ wipe() ->
ok = rabbit_node_monitor:reset_cluster_status(),
ok.

-spec change_cluster_node_type(node_type()) -> 'ok'.
-spec change_cluster_node_type(rabbit_db_cluster:node_type()) -> 'ok'.

change_cluster_node_type(Type) ->
ensure_mnesia_not_running(),
Expand Down Expand Up @@ -421,7 +420,7 @@ remove_node_offline_node(Node) ->
%% Queries
%%----------------------------------------------------------------------------

-spec status() -> [{'nodes', [{node_type(), [node()]}]} |
-spec status() -> [{'nodes', [{rabbit_db_cluster:node_type(), [node()]}]} |
{'running_nodes', [node()]} |
{'partitions', [{node(), [node()]}]}].

Expand Down Expand Up @@ -539,7 +538,7 @@ node_info() ->
mnesia:system_info(protocol_version),
cluster_status_from_mnesia()}.

-spec node_type() -> node_type().
-spec node_type() -> rabbit_db_cluster:node_type().

node_type() ->
{_AllNodes, DiscNodes, _RunningNodes} =
Expand Down Expand Up @@ -607,7 +606,7 @@ init_db(ClusterNodes, NodeType, CheckOtherNodes) ->
rabbit_node_monitor:update_cluster_status(),
ok.

-spec init_db_unchecked([node()], node_type()) -> 'ok'.
-spec init_db_unchecked([node()], rabbit_db_cluster:node_type()) -> 'ok'.

init_db_unchecked(ClusterNodes, NodeType) ->
init_db(ClusterNodes, NodeType, false).
Expand Down
6 changes: 3 additions & 3 deletions deps/rabbit/src/rabbit_node_monitor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ notify_joined_cluster() ->
NewMember = node(),
Nodes = rabbit_nodes:list_running() -- [NewMember],
gen_server:abcast(Nodes, ?SERVER,
{joined_cluster, node(), rabbit_mnesia:node_type()}),
{joined_cluster, node(), rabbit_db_cluster:node_type()}),

ok.

Expand Down Expand Up @@ -415,9 +415,9 @@ handle_call(_Request, _From, State) ->
handle_cast(notify_node_up, State = #state{guid = GUID}) ->
Nodes = rabbit_nodes:list_running() -- [node()],
gen_server:abcast(Nodes, ?SERVER,
{node_up, node(), rabbit_mnesia:node_type(), GUID}),
{node_up, node(), rabbit_db_cluster:node_type(), GUID}),
%% register other active rabbits with this rabbit
DiskNodes = rabbit_mnesia:cluster_nodes(disc),
DiskNodes = rabbit_db_cluster:disc_members(),
[gen_server:cast(?SERVER, {node_up, N, case lists:member(N, DiskNodes) of
true -> disc;
false -> ram
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_nodes.erl
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ is_member(Node) when is_atom(Node) ->
%% @see filter_members/1.

list_members() ->
mnesia:system_info(db_nodes).
rabbit_db_cluster:members().

-spec filter_members(Nodes) -> Nodes when
Nodes :: [node()].
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_prelaunch_cluster.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,5 @@ setup(Context) ->
?LOG_DEBUG(
"Checking cluster consistency", [],
#{domain => ?RMQLOG_DOMAIN_PRELAUNCH}),
rabbit_mnesia:check_cluster_consistency(),
rabbit_db_cluster:check_consistency(),
ok.
2 changes: 1 addition & 1 deletion deps/rabbit/test/clustering_management_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -780,7 +780,7 @@ wait_for_cluster_status(N, Max, Status, AllNodes, Nodes) ->

verify_status_equal(Node, Status, AllNodes) ->
NodeStatus = sort_cluster_status(cluster_status(Node)),
(AllNodes =/= [Node]) =:= rpc:call(Node, rabbit_mnesia, is_clustered, [])
(AllNodes =/= [Node]) =:= rpc:call(Node, rabbit_db_cluster, is_clustered, [])
andalso NodeStatus =:= Status.

cluster_status(Node) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ChangeClusterNodeTypeCommand do

def run([node_type_arg], %{node: node_name}) do
normalized_type = normalize_type(String.to_atom(node_type_arg))
current_type = :rabbit_misc.rpc_call(node_name, :rabbit_mnesia, :node_type, [])
current_type = :rabbit_misc.rpc_call(node_name, :rabbit_db_cluster, :node_type, [])

case current_type do
^normalized_type ->
{:ok, "Node type is already #{normalized_type}"}

_ ->
:rabbit_misc.rpc_call(node_name, :rabbit_mnesia, :change_cluster_node_type, [
:rabbit_misc.rpc_call(node_name, :rabbit_db_cluster, :change_node_type, [
normalized_type
])
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,19 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ClusterStatusCommand do
use RabbitMQ.CLI.Core.RequiresRabbitAppRunning

def run([], %{node: node_name, timeout: timeout}) do
case :rabbit_misc.rpc_call(node_name, :rabbit_mnesia, :status, []) do
status =
case :rabbit_misc.rpc_call(node_name, :rabbit_db_cluster, :cli_cluster_status, []) do
{:badrpc, {:EXIT, {:undef, _}}} ->
:rabbit_misc.rpc_call(node_name, :rabbit_mnesia, :status, [])

{:badrpc, _} = err ->
err

status ->
status
end

case status do
{:badrpc, _} = err ->
err

Expand Down
Loading

0 comments on commit 42bcd94

Please sign in to comment.