Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions deps/rabbit/src/rabbit_upgrade_preparation.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
%%

-define(SAMPLING_INTERVAL, 200).
-define(LOGGING_FREQUENCY, ?SAMPLING_INTERVAL * 100).

await_online_quorum_plus_one(Timeout) ->
Iterations = ceil(Timeout / ?SAMPLING_INTERVAL),
Expand All @@ -38,7 +39,11 @@ online_members(Component) ->
erlang, whereis, [Component])).

endangered_critical_components() ->
CriticalComponents = [rabbit_stream_coordinator],
CriticalComponents = [rabbit_stream_coordinator] ++
case rabbit_feature_flags:is_enabled(khepri_db) of
true -> [rabbitmq_metadata];
false -> []
end,
Nodes = rabbit_nodes:list_members(),
lists:filter(fun (Component) ->
NumAlive = length(online_members(Component)),
Expand All @@ -65,6 +70,21 @@ do_await_safe_online_quorum(IterationsLeft) ->
case EndangeredQueues =:= [] andalso endangered_critical_components() =:= [] of
true -> true;
false ->
case IterationsLeft rem ?LOGGING_FREQUENCY of
0 ->
case length(EndangeredQueues) of
0 -> ok;
N -> rabbit_log:info("Waiting for ~ts queues and streams to have quorum+1 replicas online."
"You can list them with `rabbitmq-diagnostics check_if_node_is_quorum_critical`", [N])
end,
case endangered_critical_components() of
[] -> ok;
_ -> rabbit_log:info("Waiting for the following critical components to have quorum+1 replicas online: ~p.",
[endangered_critical_components()])
end;
_ ->
ok
end,
timer:sleep(?SAMPLING_INTERVAL),
do_await_safe_online_quorum(IterationsLeft - 1)
end.
Expand All @@ -89,6 +109,6 @@ list_with_minimum_quorum_for_cli() ->
[#{
<<"readable_name">> => C,
<<"name">> => C,
<<"virtual_host">> => "-",
<<"virtual_host">> => <<"(not applicable)">>,
<<"type">> => process
} || C <- endangered_critical_components()].
65 changes: 36 additions & 29 deletions deps/rabbit/test/upgrade_preparation_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,16 @@

all() ->
[
{group, quorum_queue},
{group, stream}
{group, clustered}
].

groups() ->
[
{quorum_queue, [], [
await_quorum_plus_one_qq
]},
{stream, [], [
await_quorum_plus_one_stream
]},
{stream_coordinator, [], [
await_quorum_plus_one_stream_coordinator
{clustered, [], [
await_quorum_plus_one_qq,
await_quorum_plus_one_stream,
await_quorum_plus_one_stream_coordinator,
await_quorum_plus_one_rabbitmq_metadata
]}
].

Expand All @@ -45,31 +41,30 @@ end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).

init_per_group(Group, Config) ->
case rabbit_ct_helpers:is_mixed_versions() of
true ->
%% in a 3.8/3.9 mixed cluster, ra will not cluster across versions,
%% so quorum plus one will not be achieved
{skip, "not mixed versions compatible"};
_ ->
Config1 = rabbit_ct_helpers:set_config(Config,
[
{rmq_nodes_count, 3},
{rmq_nodename_suffix, Group}
]),
rabbit_ct_helpers:run_steps(Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps())
end.
Config1 = rabbit_ct_helpers:set_config(Config,
[
{rmq_nodes_count, 3},
{rmq_nodename_suffix, Group}
]),
rabbit_ct_helpers:run_steps(Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).

end_per_group(_Group, Config) ->
rabbit_ct_helpers:run_steps(Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).


init_per_testcase(TestCase, Config) ->
rabbit_ct_helpers:testcase_started(Config, TestCase),
Config.
init_per_testcase(Testcase, Config) when Testcase == await_quorum_plus_one_rabbitmq_metadata ->
case rabbit_ct_helpers:is_mixed_versions() of
true ->
{skip, "not mixed versions compatible"};
_ ->
rabbit_ct_helpers:testcase_started(Config, Testcase)
end;
init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase).

end_per_testcase(TestCase, Config) ->
rabbit_ct_helpers:testcase_finished(Config, TestCase).
Expand Down Expand Up @@ -121,12 +116,24 @@ await_quorum_plus_one_stream_coordinator(Config) ->
%% no queues/streams beyond this point

ok = rabbit_ct_broker_helpers:stop_node(Config, B),
%% this should fail because the corrdinator has only 2 running nodes
%% this should fail because the coordinator has only 2 running nodes
?assertNot(await_quorum_plus_one(Config, 0)),

ok = rabbit_ct_broker_helpers:start_node(Config, B),
?assert(await_quorum_plus_one(Config, 0)).

await_quorum_plus_one_rabbitmq_metadata(Config) ->
Nodes = [A, B, _C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, Nodes, khepri_db),
?assert(await_quorum_plus_one(Config, A)),

ok = rabbit_ct_broker_helpers:stop_node(Config, B),
%% this should fail because rabbitmq_metadata has only 2 running nodes
?assertNot(await_quorum_plus_one(Config, A)),

ok = rabbit_ct_broker_helpers:start_node(Config, B),
?assert(await_quorum_plus_one(Config, A)).

%%
%% Implementation
%%
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ defmodule RabbitMQ.CLI.Streams.Commands.AddReplicaCommand do
to_atom(node)
]) do
{:error, :classic_queue_not_supported} ->
{:error, "Cannot add replicas to a classic queue"}
{:error, "Cannot add replicas to classic queues"}

{:error, :quorum_queue_not_supported} ->
{:error, "Cannot add replicas to a quorum queue"}
{:error, "Cannot add replicas to quorum queues"}

other ->
other
Expand All @@ -37,11 +37,11 @@ defmodule RabbitMQ.CLI.Streams.Commands.AddReplicaCommand do

use RabbitMQ.CLI.DefaultOutput

def usage, do: "add_replica [--vhost <vhost>] <queue> <node>"
def usage, do: "add_replica [--vhost <vhost>] <stream> <node>"

def usage_additional do
[
["<queue>", "stream queue name"],
["<queue>", "stream name"],
["<node>", "node to add a new replica on"]
]
end
Expand All @@ -54,11 +54,11 @@ defmodule RabbitMQ.CLI.Streams.Commands.AddReplicaCommand do

def help_section, do: :replication

def description, do: "Adds a stream queue replica on the given node."
def description, do: "Adds a stream replica on the given node"

def banner([name, node], _) do
[
"Adding a replica for queue #{name} on node #{node}..."
"Adding a replica for stream #{name} on node #{node}..."
]
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,11 @@ is_quorum_critical_test(Config) ->
Body = http_get_failed(Config, "/health/checks/node-is-quorum-critical"),
?assertEqual(<<"failed">>, maps:get(<<"status">>, Body)),
?assertEqual(true, maps:is_key(<<"reason">>, Body)),
[Queue] = maps:get(<<"queues">>, Body),
?assertEqual(QName, maps:get(<<"name">>, Queue)),
Queues = maps:get(<<"queues">>, Body),
?assert(lists:any(
fun(Item) ->
QName =:= maps:get(<<"name">>, Item)
end, Queues)),

passed.

Expand Down