Commit

Add test that should fail
Add code to block multiple queue rebalance operations, fix test

Allow acquiring the rebalance lock prior to calling rabbit_amqqueue:rebalance

Simplify queue rebalance code to always acquire the lock using the current process
lukebakken committed Mar 6, 2020
1 parent 4406d8e commit 93179ac
Showing 2 changed files with 54 additions and 6 deletions.
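The change makes a second, concurrent rebalance fail fast instead of interleaving with the one already running. A minimal sketch of how a caller might handle the new return value; rebalance_or_report/3 and its logging are illustrative only, not part of this commit:

rebalance_or_report(Type, VhostSpec, QueueSpec) ->
    case rabbit_amqqueue:rebalance(Type, VhostSpec, QueueSpec) of
        {ok, Summary} ->
            rabbit_log:info("Rebalanced queues: ~p", [Summary]),
            {ok, Summary};
        {error, rebalance_in_progress} = Err ->
            %% Another rebalance already holds the rebalance_queues lock;
            %% back off and retry later rather than racing it.
            Err
    end.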
35 changes: 30 additions & 5 deletions src/rabbit_amqqueue.erl
@@ -514,9 +514,29 @@ not_found_or_absent_dirty(Name) ->
        {ok, Q} -> {absent, Q, nodedown}
    end.

-spec get_rebalance_lock(pid()) ->
    {true, {rebalance_queues, pid()}} | false.
get_rebalance_lock(Pid) when is_pid(Pid) ->
    Id = {rebalance_queues, Pid},
    Nodes = [node()|nodes()],
    %% Note that we're not re-trying. We want to immediately know
    %% if a re-balance is taking place and stop accordingly.
    case global:set_lock(Id, Nodes, 0) of
        true ->
            {true, Id};
        false ->
            false
    end.

-spec rebalance('all' | 'quorum' | 'classic', binary(), binary()) ->
    {ok, [{node(), pos_integer()}]} | {error, term()}.
rebalance(Type, VhostSpec, QueueSpec) ->
    %% We have not yet acquired the rebalance_queues global lock.
    maybe_rebalance(get_rebalance_lock(self()), Type, VhostSpec, QueueSpec).

maybe_rebalance({true, Id}, Type, VhostSpec, QueueSpec) ->
    rabbit_log:info("Starting queue rebalance operation: '~s' for vhosts matching '~s' and queues matching '~s'",
                    [Type, VhostSpec, QueueSpec]),
    Running = rabbit_mnesia:cluster_nodes(running),
    NumRunning = length(Running),
    ToRebalance = [Q || Q <- rabbit_amqqueue:list(),
@@ -527,11 +547,16 @@ rebalance(Type, VhostSpec, QueueSpec) ->
    NumToRebalance = length(ToRebalance),
    ByNode = group_by_node(ToRebalance),
    Rem = case (NumToRebalance rem NumRunning) of
              0 -> 0;
              _ -> 1
          end,
    MaxQueuesDesired = (NumToRebalance div NumRunning) + Rem,
    Result = iterative_rebalance(ByNode, MaxQueuesDesired),
    global:del_lock(Id),
    Result;
maybe_rebalance(false, _Type, _VhostSpec, _QueueSpec) ->
    rabbit_log:warning("Queue rebalance operation is in progress, please wait."),
    {error, rebalance_in_progress}.

filter_per_type(all, _) ->
    true;
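The Retries = 0 argument in get_rebalance_lock/1 above is what makes a competing caller fail immediately: global:set_lock/3 makes a single attempt and returns false when another requester already holds the resource. A small standalone illustration of that behavior, assuming a plain Erlang shell; the module lock_demo is hypothetical and not part of the commit:

-module(lock_demo).
-export([run/0]).

run() ->
    Id = {rebalance_queues, self()},
    true = global:set_lock(Id, [node() | nodes()], 0),
    Parent = self(),
    %% A second process (a different lock requester) is rejected at once
    %% instead of waiting for the lock to become free.
    spawn(fun() ->
                  Parent ! global:set_lock({rebalance_queues, self()}, [node() | nodes()], 0)
          end),
    false = receive Res -> Res end,
    global:del_lock(Id),
    ok.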
25 changes: 24 additions & 1 deletion test/dynamic_ha_SUITE.erl
@@ -74,7 +74,8 @@ groups() ->
          queue_survive_adding_dead_vhost_mirror,
          rebalance_all,
          rebalance_exactly,
          rebalance_nodes,
          rebalance_multiple_blocked
          % FIXME: Re-enable those tests when the known issues are
          % fixed.
          % failing_random_policies,
@@ -691,6 +692,28 @@ rebalance_nodes(Config) ->

    ok.

rebalance_multiple_blocked(Config) ->
    [A, _, _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    ACh = rabbit_ct_client_helpers:open_channel(Config, A),
    Q1 = <<"q1">>,
    Q2 = <<"q2">>,
    Q3 = <<"q3">>,
    Q4 = <<"q4">>,
    Q5 = <<"q5">>,
    amqp_channel:call(ACh, #'queue.declare'{queue = Q1}),
    amqp_channel:call(ACh, #'queue.declare'{queue = Q2}),
    amqp_channel:call(ACh, #'queue.declare'{queue = Q3}),
    amqp_channel:call(ACh, #'queue.declare'{queue = Q4}),
    amqp_channel:call(ACh, #'queue.declare'{queue = Q5}),
    ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q1, A)))),
    ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q2, A)))),
    ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q3, A)))),
    ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q4, A)))),
    ?assertEqual(A, node(proplists:get_value(pid, find_queue(Q5, A)))),
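    %% rpc:cast/4 is asynchronous and returns true immediately, so the first
    %% rebalance runs in the background and is expected to still hold the
    %% rebalance_queues lock when the synchronous rpc:call/4 below arrives.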
    true = rpc:cast(A, rabbit_amqqueue, rebalance, [classic, ".*", ".*"]),
    {error, rebalance_in_progress} = rpc:call(A, rabbit_amqqueue, rebalance, [classic, ".*", ".*"]),
    ok.

%%----------------------------------------------------------------------------

assert_slaves(RPCNode, QName, Exp) ->
