Skip to content

Commit 425a9d6

Browse files
Merge pull request #14401 from rabbitmq/qq-drain-chunks
QQ: when invoking drain only shut down small batches at a time
2 parents c7f6cad + 42b3caa commit 425a9d6

File tree

3 files changed

+77
-33
lines changed

3 files changed

+77
-33
lines changed

deps/rabbit/src/rabbit_maintenance.erl

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,12 +76,15 @@ drain() ->
7676
}),
7777

7878
TransferCandidates = primary_replica_transfer_candidate_nodes(),
79+
80+
%% Transfer metadata store before queues as each queue needs to perform
81+
%% a metadata update after an election
82+
transfer_leadership_of_metadata_store(TransferCandidates),
83+
7984
%% Note: only QQ leadership is transferred because it is a reasonably quick thing to do for a lot of queues
8085
%% in the cluster, unlike with CMQs.
8186
rabbit_queue_type:drain(TransferCandidates),
8287

83-
transfer_leadership_of_metadata_store(TransferCandidates),
84-
8588
%% allow plugins to react
8689
rabbit_event:notify(maintenance_draining, #{
8790
reason => <<"node is being put into maintenance">>

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 68 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@
169169
-define(MIN_CHECKPOINT_INTERVAL, 64).
170170
-define(LEADER_HEALTH_CHECK_TIMEOUT, 5_000).
171171
-define(GLOBAL_LEADER_HEALTH_CHECK_TIMEOUT, 60_000).
172+
-define(RA_MEMBERS_TIMEOUT, 30_000).
172173

173174
%%----------- QQ policies ---------------------------------------------------
174175

@@ -1229,7 +1230,6 @@ policy_changed(Q) ->
12291230
end.
12301231

12311232
-spec cluster_state(Name :: atom()) -> 'down' | 'recovering' | 'running'.
1232-
12331233
cluster_state(Name) ->
12341234
case whereis(Name) of
12351235
undefined -> down;
@@ -1577,17 +1577,18 @@ grow(Node, VhostSpec, QueueSpec, Strategy, Membership) ->
15771577
is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
15781578
is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ].
15791579

1580-
-spec transfer_leadership(amqqueue:amqqueue(), node()) -> {migrated, node()} | {not_migrated, atom()}.
1580+
-spec transfer_leadership(amqqueue:amqqueue(), node()) ->
1581+
{migrated, node()} | {not_migrated, atom()}.
15811582
transfer_leadership(Q, Destination) ->
1582-
{RaName, _} = Pid = amqqueue:get_pid(Q),
1583-
case ra:transfer_leadership(Pid, {RaName, Destination}) of
1583+
{RaName, _} = Leader = amqqueue:get_pid(Q),
1584+
case ra:transfer_leadership(Leader, {RaName, Destination}) of
15841585
ok ->
1585-
case ra:members(Pid) of
1586-
{_, _, {_, NewNode}} ->
1587-
{migrated, NewNode};
1588-
{timeout, _} ->
1589-
{not_migrated, ra_members_timeout}
1590-
end;
1586+
case ra:members(Leader, ?RA_MEMBERS_TIMEOUT) of
1587+
{_, _, {_, NewNode}} ->
1588+
{migrated, NewNode};
1589+
{timeout, _} ->
1590+
{not_migrated, ra_members_timeout}
1591+
end;
15911592
already_leader ->
15921593
{not_migrated, already_leader};
15931594
{error, Reason} ->
@@ -1750,9 +1751,17 @@ i(memory, Q) when ?is_amqqueue(Q) ->
17501751
0
17511752
end;
17521753
i(state, Q) when ?is_amqqueue(Q) ->
1753-
{Name, Node} = amqqueue:get_pid(Q),
1754+
{Name, Node} = case find_leader(Q) of
1755+
undefined ->
1756+
%% fall back to queue record
1757+
amqqueue:get_pid(Q);
1758+
Leader ->
1759+
Leader
1760+
end,
17541761
%% Check against the leader or last known leader
17551762
case erpc_call(Node, ?MODULE, cluster_state, [Name], ?RPC_TIMEOUT) of
1763+
{error, {erpc, timeout}} ->
1764+
timeout;
17561765
{error, _} ->
17571766
down;
17581767
State ->
@@ -1912,7 +1921,12 @@ format(Q, Ctx) when ?is_amqqueue(Q) ->
19121921
rabbit_nodes:list_running()
19131922
end,
19141923
Online = [N || N <- Nodes, lists:member(N, Running)],
1915-
{_, LeaderNode} = amqqueue:get_pid(Q),
1924+
{_, LeaderNode} = case find_leader(Q) of
1925+
undefined ->
1926+
amqqueue:get_pid(Q);
1927+
Leader ->
1928+
Leader
1929+
end,
19161930
State = case is_minority(Nodes, Online) of
19171931
true when length(Online) == 0 ->
19181932
down;
@@ -2299,27 +2313,50 @@ drain(TransferCandidates) ->
22992313
transfer_leadership([]) ->
23002314
?LOG_WARNING("Skipping leadership transfer of quorum queues: no candidate "
23012315
"(online, not under maintenance) nodes to transfer to!");
2302-
transfer_leadership(_TransferCandidates) ->
2316+
transfer_leadership(_CandidateNodes) ->
23032317
%% we only transfer leadership for QQs that have local leaders
2304-
Queues = rabbit_amqqueue:list_local_leaders(),
2318+
LocalLeaderQueues = rabbit_amqqueue:list_local_leaders(),
2319+
QueuesChunked = ra_lib:lists_chunk(256, LocalLeaderQueues),
23052320
?LOG_INFO("Will transfer leadership of ~b quorum queues with current leader on this node",
2306-
[length(Queues)]),
2307-
_ = [begin
2308-
Name = amqqueue:get_name(Q),
2309-
?LOG_DEBUG("Will trigger a leader election for local quorum queue ~ts",
2310-
[rabbit_misc:rs(Name)]),
2311-
%% we trigger an election and exclude this node from the list of candidates
2312-
%% by simply shutting its local QQ replica (Ra server)
2313-
RaLeader = amqqueue:get_pid(Q),
2314-
?LOG_DEBUG("Will stop Ra server ~tp", [RaLeader]),
2315-
case rabbit_quorum_queue:stop_server(RaLeader) of
2316-
ok ->
2317-
?LOG_DEBUG("Successfully stopped Ra server ~tp", [RaLeader]);
2318-
{error, nodedown} ->
2319-
?LOG_ERROR("Failed to stop Ra server ~tp: target node was reported as down")
2320-
end
2321-
end || Q <- Queues],
2322-
?LOG_INFO("Leadership transfer for quorum queues hosted on this node has been initiated").
2321+
[length(LocalLeaderQueues)]),
2322+
[begin
2323+
[begin
2324+
%% we trigger an election and exclude this node from the list of candidates
2325+
%% by simply shutting its local QQ replica (Ra server)
2326+
RaLeader = amqqueue:get_pid(Q),
2327+
?LOG_DEBUG("Will stop Ra leader ~tp", [RaLeader]),
2328+
case rabbit_quorum_queue:stop_server(RaLeader) of
2329+
ok ->
2330+
?LOG_DEBUG("Successfully stopped Ra server ~tp", [RaLeader]);
2331+
{error, nodedown} ->
2332+
?LOG_ERROR("Failed to stop Ra server ~tp: target node was reported as down")
2333+
end,
2334+
ok
2335+
end || Q <- Queues],
2336+
%% wait for leader elections before processing next chunk of queues
2337+
[begin
2338+
{RaName, LeaderNode} = amqqueue:get_pid(Q),
2339+
MemberNodes = lists:delete(LeaderNode, amqqueue:get_nodes(Q)),
2340+
%% we don't do any explicit error handling here as it is more
2341+
%% important to make progress
2342+
_ = lists:any(fun (N) ->
2343+
case ra:members({RaName, N}, ?RA_MEMBERS_TIMEOUT) of
2344+
{ok, _, _} ->
2345+
true;
2346+
Err ->
2347+
Name = amqqueue:get_name(Q),
2348+
?LOG_DEBUG("Failed to wait for leader election for queue ~ts on ~tp Err ~ts",
2349+
[Name, N, Err]),
2350+
false
2351+
end
2352+
end, MemberNodes),
2353+
ok
2354+
2355+
end || Q <- Queues],
2356+
ok
2357+
end || Queues <- QueuesChunked],
2358+
?LOG_INFO("Leadership transfer for quorum queues hosted on this node has been initiated"),
2359+
ok.
23232360

23242361
%% TODO: I just copied it over, it looks like it was always called inside maintenance so...
23252362
-spec stop_local_quorum_queue_followers() -> ok.

deps/rabbitmq_management/priv/www/js/formatters.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -601,6 +601,10 @@ function fmt_object_state(obj) {
601601
explanation = 'The queue does not have sufficient online members to ' +
602602
'make progress'
603603
}
604+
else if (obj.state == 'timeout') {
605+
colour = 'yellow';
606+
explanation = 'The queue leader did not respond to its status request ';
607+
}
604608

605609
return fmt_state(colour, text, explanation);
606610
}

0 commit comments

Comments
 (0)