Skip to content

QQ grow to a target quorum cluster size #13873

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
97 changes: 82 additions & 15 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1528,25 +1528,14 @@ shrink_all(Node) ->
grow(Node, VhostSpec, QueueSpec, Strategy) ->
grow(Node, VhostSpec, QueueSpec, Strategy, promotable).

-spec grow(node(), binary(), binary(), all | even, membership()) ->
-spec grow(node() | integer(), binary(), binary(), all | even, membership()) ->
[{rabbit_amqqueue:name(),
{ok, pos_integer()} | {error, pos_integer(), term()}}].
grow(Node, VhostSpec, QueueSpec, Strategy, Membership) ->
grow(Node, VhostSpec, QueueSpec, Strategy, Membership) when is_atom(Node) ->
Running = rabbit_nodes:list_running(),
[begin
Size = length(get_nodes(Q)),
QName = amqqueue:get_name(Q),
rabbit_log:info("~ts: adding a new member (replica) on node ~w",
[rabbit_misc:rs(QName), Node]),
case add_member(Q, Node, Membership) of
ok ->
{QName, {ok, Size + 1}};
{error, Err} ->
rabbit_log:warning(
"~ts: failed to add member (replica) on node ~w, error: ~w",
[rabbit_misc:rs(QName), Node, Err]),
{QName, {error, Size, Err}}
end
maybe_grow(Q, Node, Membership, Size)
end
|| Q <- rabbit_amqqueue:list(),
amqqueue:get_type(Q) == ?MODULE,
Expand All @@ -1556,7 +1545,85 @@ grow(Node, VhostSpec, QueueSpec, Strategy, Membership) ->
lists:member(Node, Running),
matches_strategy(Strategy, get_nodes(Q)),
is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ].
is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ];

grow(QuorumClusterSize, VhostSpec, QueueSpec, Strategy, Membership)
when is_integer(QuorumClusterSize), QuorumClusterSize > 0 ->
Running = rabbit_nodes:list_running(),
TotalRunning = length(Running),

TargetQuorumClusterSize =
if QuorumClusterSize > TotalRunning ->
%% we cant grow beyond total running nodes
TotalRunning;
true ->
QuorumClusterSize
end,

lists:flatten(
[begin
QNodes = get_nodes(Q),
case length(QNodes) of
Size when Size < TargetQuorumClusterSize ->
TargetAvailableNodes = Running -- QNodes,
N = length(TargetAvailableNodes),
Node = lists:nth(rand:uniform(N), TargetAvailableNodes),
maybe_grow(Q, Node, Membership, Size);
_ ->
[]
end
end
|| _ <- lists:seq(1, TargetQuorumClusterSize),
Q <- rabbit_amqqueue:list(),
amqqueue:get_type(Q) == ?MODULE,
matches_strategy(Strategy, get_nodes(Q)),
is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)]);

grow(QuorumClusterSize, _VhostSpec, _QueueSpec, _Strategy, _Membership)
when is_integer(QuorumClusterSize) ->
rabbit_log:warning(
"cannot grow queues to a quorum cluster size less than zero (~tp)",
[QuorumClusterSize]),
{error, bad_quorum_cluster_size}.

maybe_grow(Q, Node, Membership, Size) ->
QNodes = get_nodes(Q),
maybe_grow(Q, Node, Membership, Size, QNodes).

maybe_grow(Q, Node, Membership, Size, QNodes) ->
QName = amqqueue:get_name(Q),
{ok, RaName} = qname_to_internal_name(QName),
case check_all_memberships(RaName, QNodes, voter) of
true ->
rabbit_log:info("~ts: adding a new member (replica) on node ~w",
[rabbit_misc:rs(QName), Node]),
case add_member(Q, Node, Membership) of
ok ->
{QName, {ok, Size + 1}};
{error, Err} ->
rabbit_log:warning(
"~ts: failed to add member (replica) on node ~w, error: ~w",
[rabbit_misc:rs(QName), Node, Err]),
{QName, {error, Size, Err}}
end;
false ->
Err = {error, non_voters_found},
rabbit_log:warning(
"~ts: failed to add member (replica) on node ~w, error: ~w",
[rabbit_misc:rs(QName), Node, Err]),
{QName, {error, Size, Err}}
end.

check_all_memberships(RaName, QNodes, CompareMembership) ->
case rpc:multicall(QNodes, ets, lookup, [ra_state, RaName]) of
{Result, []} ->
lists:all(
fun(M) -> M == CompareMembership end,
[Membership || [{_RaName, _RaState, Membership}] <- Result]);
_ ->
false
end.

-spec transfer_leadership(amqqueue:amqqueue(), node()) -> {migrated, node()} | {not_migrated, atom()}.
transfer_leadership(Q, Destination) ->
Expand Down
113 changes: 112 additions & 1 deletion deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ groups() ->
node_removal_is_not_quorum_critical,
select_nodes_with_least_replicas,
select_nodes_with_least_replicas_node_down,
subscribe_from_each
subscribe_from_each,
grow_queue


]},
Expand Down Expand Up @@ -1536,6 +1537,116 @@ subscribe_from_each(Config) ->

ok.

grow_queue(Config) ->
[Server0, Server1, Server2, _Server3, _Server4] =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
QQ = ?config(queue_name, Config),
AQ = ?config(alt_queue_name, Config),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-quorum-initial-group-size">>, long, 5}])),
?assertEqual({'queue.declare_ok', AQ, 0, 0},
declare(Ch, AQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-quorum-initial-group-size">>, long, 5}])),

QQs = [QQ, AQ],
MsgCount = 3,

[begin
RaName = ra_name(Q),
rabbit_ct_client_helpers:publish(Ch, Q, MsgCount),
wait_for_messages_ready([Server0], RaName, MsgCount),
{ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, <<"/">>]),
#{nodes := Nodes0} = amqqueue:get_type_state(Q0),
?assertEqual(5, length(Nodes0))
end || Q <- QQs],

rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
force_all_queues_shrink_member_to_current_member, []),

TargetClusterSize_1 = 1,
assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount),

%% grow queues to node 'Server1'
TargetClusterSize_2 = 2,
Result1 = rpc:call(Server0, rabbit_quorum_queue, grow, [Server1, <<"/">>, <<".*">>, all]),
%% [{{resource,<<"/">>,queue,<<"grow_queue">>},{ok,2}},
%% {{resource,<<"/">>,queue,<<"grow_queue_alt">>},{ok,2}},...]
?assert(lists:all(fun({_, {R, _}}) -> R =:= ok end, Result1)),
assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount),

%% grow queues to quorum cluster size '2' has no effect
Result2 = rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_2, <<"/">>, <<".*">>, all]),
?assertEqual([], Result2),
assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount),

%% grow queues to quorum cluster size '3'
TargetClusterSize_3 = 3,
Result3 = rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_3, <<"/">>, <<".*">>, all, voter]),
?assert(lists:all(fun({_, {R, _}}) -> R =:= ok end, Result3)),
assert_grown_queues(QQs, Server0, TargetClusterSize_3, MsgCount),

%% grow queues to quorum cluster size '5'
TargetClusterSize_5 = 5,
Result4 = rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_5, <<"/">>, <<".*">>, all, voter]),
?assert(lists:all(fun({_, {R, _}}) -> R =:= ok end, Result4)),
assert_grown_queues(QQs, Server0, TargetClusterSize_5, MsgCount),

%% shrink all queues again down to 1 member
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
force_all_queues_shrink_member_to_current_member, []),
assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount),

%% grow queues to quorum cluster size > '5' (limit = 5).
TargetClusterSize_10 = 10,
Result5 = rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_10, <<"/">>, <<".*">>, all]),
?assert(lists:all(fun({_, {R, _}}) -> R =:= ok end, Result5)),
assert_grown_queues(QQs, Server0, TargetClusterSize_5, MsgCount),

%% shrink all queues again down to 1 member
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
force_all_queues_shrink_member_to_current_member, []),
assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount),

%% attempt to grow queues to quorum cluster size < '0'.
BadTargetClusterSize = -5,
?assertEqual({error, bad_quorum_cluster_size},
rpc:call(Server0, rabbit_quorum_queue, grow, [BadTargetClusterSize, <<"/">>, <<".*">>, all])),

%% shrink all queues again down to 1 member
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
force_all_queues_shrink_member_to_current_member, []),
assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount),

%% grow queues to node 'Server1': non_voter
rpc:call(Server0, rabbit_quorum_queue, grow, [Server1, <<"/">>, <<".*">>, all, non_voter]),
assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount),

%% grow queues to node 'Server2': fail, non_voters found
Result6 = rpc:call(Server0, rabbit_quorum_queue, grow, [Server2, <<"/">>, <<".*">>, all, voter]),
%% [{{resource,<<"/">>,queue,<<"grow_queue">>},{error, 2, {error, non_voters_found}},
%% {{resource,<<"/">>,queue,<<"grow_queue_alt">>},{error, 2, {error, non_voters_found}},...]
?assert(lists:all(
fun({_, Err}) -> Err =:= {error, TargetClusterSize_2, {error, non_voters_found}} end, Result6)),
assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount),

%% grow queues to target quorum cluster size '5': fail, non_voters found
Result7 = rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_5, <<"/">>, <<".*">>, all]),
?assert(lists:all(
fun({_, Err}) -> Err =:= {error, TargetClusterSize_2, {error, non_voters_found}} end, Result7)),
assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount).

assert_grown_queues(Qs, Node, TargetClusterSize, MsgCount) ->
[begin
RaName = ra_name(Q),
wait_for_messages_ready([Node], RaName, MsgCount),
{ok, Q0} = rpc:call(Node, rabbit_amqqueue, lookup, [Q, <<"/">>]),
#{nodes := Nodes0} = amqqueue:get_type_state(Q0),
?assertEqual(TargetClusterSize, length(Nodes0))
end || Q <- Qs].

gh_12635(Config) ->
% https://github.com/rabbitmq/rabbitmq-server/issues/12635
[Server0, _Server1, Server2] =
Expand Down
64 changes: 54 additions & 10 deletions deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,41 +33,79 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do
{:validation_failure, :too_many_args}
end

def validate([_, s], _)
def validate(args = [n, s], opts) do
case Integer.parse(n) do
{cluster_size, _} when is_integer(cluster_size) ->
do_validate([cluster_size, s], opts)

:error ->
do_validate(args, opts)
end
end

def do_validate([_, s], _)
when not (s == "all" or
s == "even") do
{:validation_failure, "strategy '#{s}' is not recognised."}
end

def validate(_, %{membership: m})
def do_validate([n, _], _)
when (is_integer(n) and n <= 0) do
{:validation_failure, "target quorum cluster size '#{n}' must be greater than 0."}
end

def do_validate([n, _], %{membership: m})
when (is_integer(n) and not (m == "voter" or m == "promotable")) do
{:validation_failure, "voter status '#{m}' must be 'voter' or 'promotable' to grow to target quorum cluster size '#{n}'."}
end

def do_validate(_, %{membership: m})
when not (m == "promotable" or
m == "non_voter" or
m == "voter") do
{:validation_failure, "voter status '#{m}' is not recognised."}
end

def validate(_, _) do
def do_validate(_, _) do
:ok
end

def validate_execution_environment(args, opts) do
Validators.chain(
[
&Validators.rabbit_is_running/2,
&Validators.existing_cluster_member/2
fn args = [n, _], opts ->
case Integer.parse(n) do
{cluster_size, _} when is_integer(cluster_size) ->
:ok

:error ->
Validators.existing_cluster_member(args, opts)
end
end
],
[args, opts]
)
end

def run([node, strategy], %{
def run([node_or_quorum_cluster_size, strategy], %{
node: node_name,
vhost_pattern: vhost_pat,
queue_pattern: queue_pat,
membership: membership,
errors_only: errors_only
}) do
args = [to_atom(node), vhost_pat, queue_pat, to_atom(strategy)]

node_or_quorum_cluster_size =
case Integer.parse(node_or_quorum_cluster_size) do
{cluster_size, _} when is_integer(cluster_size) ->
cluster_size

:error ->
to_atom(node_or_quorum_cluster_size)
end

args = [node_or_quorum_cluster_size, vhost_pat, queue_pat, to_atom(strategy)]

args =
case to_atom(membership) do
Expand Down Expand Up @@ -108,11 +146,11 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do

def usage,
do:
"grow <node> <all | even> [--vhost-pattern <pattern>] [--queue-pattern <pattern>] [--membership <promotable|voter>]"
"grow <node | quorum_cluster_size> <all | even> [--vhost-pattern <pattern>] [--queue-pattern <pattern>] [--membership <promotable|voter>]"

def usage_additional do
[
["<node>", "node name to place replicas on"],
["<node | quorum_cluster_size>", "node name to place replicas on or desired quorum cluster size"],
[
"<all | even>",
"add a member for all matching queues or just those whose membership count is an even number"
Expand All @@ -136,8 +174,14 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do
do:
"Grows quorum queue clusters by adding a member (replica) on the specified node for all matching queues"

def banner([node, strategy], _) do
"Growing #{strategy} quorum queues on #{node}..."
def banner([node_or_quorum_cluster_size, strategy], %{queue_pattern: queue_pattern}) do
case Integer.parse(node_or_quorum_cluster_size) do
{cluster_size, _} when is_integer(cluster_size) ->
"Growing #{strategy} quorum queues matching '#{queue_pattern}' to a target cluster size of '#{cluster_size}'..."

:error ->
"Growing #{strategy} quorum queues matching '#{queue_pattern}' to #{node_or_quorum_cluster_size}..."
end
end

#
Expand Down
Loading
Loading