Skip to content

Commit a5ecc86

Browse files
Merge pull request #2751 from rabbitmq/rabbitmq-server-2749
Start mirrors synchronously when transferring CMQ leadership (cherry picked from commit 50761cb)
1 parent 4e38dcb commit a5ecc86

File tree

3 files changed

+96
-50
lines changed

3 files changed

+96
-50
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -529,7 +529,7 @@ rebalance(Type, VhostSpec, QueueSpec) ->
529529
maybe_rebalance({true, Id}, Type, VhostSpec, QueueSpec) ->
530530
rabbit_log:info("Starting queue rebalance operation: '~s' for vhosts matching '~s' and queues matching '~s'",
531531
[Type, VhostSpec, QueueSpec]),
532-
Running = rabbit_nodes:all_running(),
532+
Running = rabbit_maintenance:filter_out_drained_nodes_consistent_read(rabbit_nodes:all_running()),
533533
NumRunning = length(Running),
534534
ToRebalance = [Q || Q <- rabbit_amqqueue:list(),
535535
filter_per_type(Type, Q),
@@ -596,12 +596,12 @@ maybe_migrate(ByNode, MaxQueuesDesired, [N | Nodes]) ->
596596
[{_, Q, false} = Queue | Queues] = All when length(All) > MaxQueuesDesired ->
597597
Name = amqqueue:get_name(Q),
598598
Module = rebalance_module(Q),
599-
OtherNodes = Module:get_replicas(Q) -- [N],
600-
case OtherNodes of
599+
Candidates = Module:get_replicas(Q) -- [N],
600+
case Candidates of
601601
[] ->
602602
{not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)};
603603
_ ->
604-
[{Length, Destination} | _] = sort_by_number_of_queues(OtherNodes, ByNode),
604+
[{Length, Destination} | _] = sort_by_number_of_queues(Candidates, ByNode),
605605
rabbit_log:warning("Migrating queue ~p from node ~p with ~p queues to node ~p with ~p queues",
606606
[Name, N, length(All), Destination, Length]),
607607
case Module:transfer_leadership(Q, Destination) of

deps/rabbit/src/rabbit_maintenance.erl

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
resume_all_client_listeners/0,
2626
close_all_client_connections/0,
2727
primary_replica_transfer_candidate_nodes/0,
28-
random_primary_replica_transfer_candidate_node/1,
28+
random_primary_replica_transfer_candidate_node/2,
2929
transfer_leadership_of_quorum_queues/1,
3030
transfer_leadership_of_classic_mirrored_queues/1,
3131
status_table_name/0,
@@ -256,14 +256,16 @@ transfer_leadership_of_classic_mirrored_queues(TransferCandidates) ->
256256
ReadableCandidates = readable_candidate_list(TransferCandidates),
257257
rabbit_log:info("Will transfer leadership of ~b classic mirrored queues hosted on this node to these peer nodes: ~s",
258258
[length(Queues), ReadableCandidates]),
259-
260259
[begin
261260
Name = amqqueue:get_name(Q),
262-
case random_primary_replica_transfer_candidate_node(TransferCandidates) of
261+
ExistingReplicaNodes = [node(Pid) || Pid <- amqqueue:get_sync_slave_pids(Q)],
262+
rabbit_log:debug("Local ~s has replicas on nodes ~s",
263+
[rabbit_misc:rs(Name), readable_candidate_list(ExistingReplicaNodes)]),
264+
case random_primary_replica_transfer_candidate_node(TransferCandidates, ExistingReplicaNodes) of
263265
{ok, Pick} ->
264-
rabbit_log:debug("Will transfer leadership of local queue ~s to node ~s",
266+
rabbit_log:debug("Will transfer leadership of local ~s to node ~s",
265267
[rabbit_misc:rs(Name), Pick]),
266-
case rabbit_mirror_queue_misc:transfer_leadership(Q, Pick) of
268+
case rabbit_mirror_queue_misc:migrate_leadership_to_existing_replica(Q, Pick) of
267269
{migrated, _} ->
268270
rabbit_log:debug("Successfully transferred leadership of queue ~s to node ~s",
269271
[rabbit_misc:rs(Name), Pick]);
@@ -300,18 +302,30 @@ stop_local_quorum_queue_followers() ->
300302
end || Q <- Queues],
301303
rabbit_log:info("Stopped all local replicas of quorum queues hosted on this node").
302304

303-
-spec primary_replica_transfer_candidate_nodes() -> [node()].
305+
-spec primary_replica_transfer_candidate_nodes() -> [node()].
304306
primary_replica_transfer_candidate_nodes() ->
305307
filter_out_drained_nodes_consistent_read(rabbit_nodes:all_running() -- [node()]).
306308

307-
-spec random_primary_replica_transfer_candidate_node([node()]) -> {ok, node()} | undefined.
308-
random_primary_replica_transfer_candidate_node([]) ->
309+
-spec random_primary_replica_transfer_candidate_node([node()], [node()]) -> {ok, node()} | undefined.
310+
random_primary_replica_transfer_candidate_node([], _Preferred) ->
309311
undefined;
310-
random_primary_replica_transfer_candidate_node(Candidates) ->
311-
Nth = erlang:phash2(erlang:monotonic_time(), length(Candidates)),
312-
Candidate = lists:nth(Nth + 1, Candidates),
312+
random_primary_replica_transfer_candidate_node(Candidates, PreferredNodes) ->
313+
Overlap = sets:to_list(sets:intersection(sets:from_list(Candidates), sets:from_list(PreferredNodes))),
314+
Candidate = case Overlap of
315+
[] ->
316+
%% Since ownership transfer is meant to be run only when we are sure
317+
%% there are in-sync replicas to transfer to, this is an edge case.
318+
%% We skip the transfer.
319+
undefined;
320+
Nodes ->
321+
random_nth(Nodes)
322+
end,
313323
{ok, Candidate}.
314324

325+
random_nth(Nodes) ->
326+
Nth = erlang:phash2(erlang:monotonic_time(), length(Nodes)),
327+
lists:nth(Nth + 1, Nodes).
328+
315329
revive_local_quorum_queue_replicas() ->
316330
Queues = rabbit_amqqueue:list_local_followers(),
317331
[begin

deps/rabbit/src/rabbit_mirror_queue_misc.erl

Lines changed: 67 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717
sync_batch_size/1, log_info/3, log_warning/3]).
1818
-export([stop_all_slaves/5]).
1919

20-
-export([sync_queue/1, cancel_sync_queue/1]).
20+
-export([sync_queue/1, cancel_sync_queue/1, queue_length/1]).
2121

22-
-export([transfer_leadership/2, queue_length/1, get_replicas/1]).
22+
-export([get_replicas/1, transfer_leadership/2, migrate_leadership_to_existing_replica/2]).
2323

2424
%% for testing only
2525
-export([module/1]).
@@ -201,12 +201,13 @@ drop_mirror(QName, MirrorNode) ->
201201
case rabbit_amqqueue:lookup(QName) of
202202
{ok, Q} when ?is_amqqueue(Q) ->
203203
Name = amqqueue:get_name(Q),
204-
QPid = amqqueue:get_pid(Q),
205-
SPids = amqqueue:get_slave_pids(Q),
206-
case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
204+
PrimaryPid = amqqueue:get_pid(Q),
205+
MirrorPids = amqqueue:get_slave_pids(Q),
206+
AllReplicaPids = [PrimaryPid | MirrorPids],
207+
case [Pid || Pid <- AllReplicaPids, node(Pid) =:= MirrorNode] of
207208
[] ->
208209
{error, {queue_not_mirrored_on_node, MirrorNode}};
209-
[QPid] when SPids =:= [] ->
210+
[PrimaryPid] when MirrorPids =:= [] ->
210211
{error, cannot_drop_only_mirror};
211212
[Pid] ->
212213
log_info(Name, "Dropping queue mirror on node ~p~n",
@@ -235,11 +236,9 @@ add_mirror(QName, MirrorNode, SyncMode) ->
235236
case rabbit_vhost_sup_sup:get_vhost_sup(VHost, MirrorNode) of
236237
{ok, _} ->
237238
try
238-
SPid = rabbit_amqqueue_sup_sup:start_queue_process(
239-
MirrorNode, Q, slave),
240-
log_info(QName, "Adding mirror on node ~p: ~p~n",
241-
[MirrorNode, SPid]),
242-
rabbit_mirror_queue_slave:go(SPid, SyncMode)
239+
MirrorPid = rabbit_amqqueue_sup_sup:start_queue_process(MirrorNode, Q, slave),
240+
log_info(QName, "Adding mirror on node ~p: ~p~n", [MirrorNode, MirrorPid]),
241+
rabbit_mirror_queue_slave:go(MirrorPid, SyncMode)
243242
of
244243
_ -> ok
245244
catch
@@ -264,10 +263,10 @@ add_mirror(QName, MirrorNode, SyncMode) ->
264263
report_deaths(_MirrorPid, _IsMaster, _QueueName, []) ->
265264
ok;
266265
report_deaths(MirrorPid, IsMaster, QueueName, DeadPids) ->
267-
log_info(QueueName, "~s ~s saw deaths of mirrors~s~n",
266+
log_info(QueueName, "~s replica of queue ~s detected replica ~s to be down~n",
268267
[case IsMaster of
269-
true -> "Master";
270-
false -> "Slave"
268+
true -> "Primary";
269+
false -> "Secondary"
271270
end,
272271
rabbit_misc:pid_to_string(MirrorPid),
273272
[[$ , rabbit_misc:pid_to_string(P)] || P <- DeadPids]]).
@@ -447,14 +446,15 @@ is_mirrored_ha_nodes(Q) ->
447446
end.
448447

449448
actual_queue_nodes(Q) when ?is_amqqueue(Q) ->
450-
MPid = amqqueue:get_pid(Q),
451-
SPids = amqqueue:get_slave_pids(Q),
452-
SSPids = amqqueue:get_sync_slave_pids(Q),
453-
Nodes = fun (L) -> [node(Pid) || Pid <- L] end,
454-
{case MPid of
455-
none -> none;
456-
_ -> node(MPid)
457-
end, Nodes(SPids), Nodes(SSPids)}.
449+
PrimaryPid = amqqueue:get_pid(Q),
450+
MirrorPids = amqqueue:get_slave_pids(Q),
451+
InSyncMirrorPids = amqqueue:get_sync_slave_pids(Q),
452+
CollectNodes = fun (L) -> [node(Pid) || Pid <- L] end,
453+
NodeHostingPrimary = case PrimaryPid of
454+
none -> none;
455+
_ -> node(PrimaryPid)
456+
end,
457+
{NodeHostingPrimary, CollectNodes(MirrorPids), CollectNodes(InSyncMirrorPids)}.
458458

459459
-spec maybe_auto_sync(amqqueue:amqqueue()) -> 'ok'.
460460

@@ -520,19 +520,19 @@ update_mirrors(OldQ, NewQ) when ?amqqueue_pids_are_equal(OldQ, NewQ) ->
520520

521521
update_mirrors(Q) when ?is_amqqueue(Q) ->
522522
QName = amqqueue:get_name(Q),
523-
{OldMNode, OldSNodes, _} = actual_queue_nodes(Q),
524-
{NewMNode, NewSNodes} = suggested_queue_nodes(Q),
525-
OldNodes = [OldMNode | OldSNodes],
526-
NewNodes = [NewMNode | NewSNodes],
523+
{PreTransferPrimaryNode, PreTransferMirrorNodes, __PreTransferInSyncMirrorNodes} = actual_queue_nodes(Q),
524+
{NewlySelectedPrimaryNode, NewlySelectedMirrorNodes} = suggested_queue_nodes(Q),
525+
PreTransferNodesWithReplicas = [PreTransferPrimaryNode | PreTransferMirrorNodes],
526+
NewlySelectedNodesWithReplicas = [NewlySelectedPrimaryNode | NewlySelectedMirrorNodes],
527527
%% When a mirror dies, remove_from_queue/2 might have to add new
528-
%% mirrors (in "exactly" mode). It will check mnesia to see which
528+
%% mirrors (in "exactly" mode). It will check the queue record to see which
529529
%% mirrors there currently are. If drop_mirror/2 is invoked first
530530
%% then when we end up in remove_from_queue/2 it will not see the
531531
%% mirrors that add_mirror/2 will add, and also want to add them
532532
%% (even though we are not responding to the death of a
533533
%% mirror). Breakage ensues.
534-
add_mirrors (QName, NewNodes -- OldNodes, async),
535-
drop_mirrors(QName, OldNodes -- NewNodes),
534+
add_mirrors(QName, NewlySelectedNodesWithReplicas -- PreTransferNodesWithReplicas, async),
535+
drop_mirrors(QName, PreTransferNodesWithReplicas -- NewlySelectedNodesWithReplicas),
536536
%% This is for the case where no extra nodes were added but we changed to
537537
%% a policy requiring auto-sync.
538538
maybe_auto_sync(Q),
@@ -543,15 +543,47 @@ queue_length(Q) ->
543543
M.
544544

545545
get_replicas(Q) ->
546-
{MNode, SNodes} = suggested_queue_nodes(Q),
547-
[MNode] ++ SNodes.
546+
{PrimaryNode, MirrorNodes} = suggested_queue_nodes(Q),
547+
[PrimaryNode] ++ MirrorNodes.
548548

549+
%% Moves the primary replica (leader) of a classic mirrored queue to another node.
550+
%% Target node can be any node in the cluster, and does not have to host a replica
551+
%% of this queue.
549552
transfer_leadership(Q, Destination) ->
550553
QName = amqqueue:get_name(Q),
551-
{OldMNode, OldSNodes, _} = actual_queue_nodes(Q),
552-
OldNodes = [OldMNode | OldSNodes],
553-
add_mirrors(QName, [Destination] -- OldNodes, async),
554-
drop_mirrors(QName, OldNodes -- [Destination]),
554+
{PreTransferPrimaryNode, PreTransferMirrorNodes, _PreTransferInSyncMirrorNodes} = actual_queue_nodes(Q),
555+
PreTransferNodesWithReplicas = [PreTransferPrimaryNode | PreTransferMirrorNodes],
556+
557+
NodesToAddMirrorsOn = [Destination] -- PreTransferNodesWithReplicas,
558+
%% This will wait for the transfer/eager sync to finish before we begin dropping
559+
%% mirrors on the next step. In this case we cannot add mirrors asynchronously
560+
%% as that will race with the dropping step.
561+
add_mirrors(QName, NodesToAddMirrorsOn, sync),
562+
563+
NodesToDropMirrorsOn = PreTransferNodesWithReplicas -- [Destination],
564+
drop_mirrors(QName, NodesToDropMirrorsOn),
565+
566+
{Result, NewQ} = wait_for_new_master(QName, Destination),
567+
update_mirrors(NewQ),
568+
Result.
569+
570+
%% Moves the primary replica (leader) of a classic mirrored queue to another node
571+
%% which already hosts a replica of this queue. In this case we can stop
572+
%% fewer replicas and reduce the load the operation has on the cluster.
573+
migrate_leadership_to_existing_replica(Q, Destination) ->
574+
QName = amqqueue:get_name(Q),
575+
{PreTransferPrimaryNode, PreTransferMirrorNodes, _PreTransferInSyncMirrorNodes} = actual_queue_nodes(Q),
576+
PreTransferNodesWithReplicas = [PreTransferPrimaryNode | PreTransferMirrorNodes],
577+
578+
NodesToAddMirrorsOn = [Destination] -- PreTransferNodesWithReplicas,
579+
%% This will wait for the transfer/eager sync to finish before we begin dropping
580+
%% mirrors on the next step. In this case we cannot add mirrors asynchronously
581+
%% as that will race with the dropping step.
582+
add_mirrors(QName, NodesToAddMirrorsOn, sync),
583+
584+
NodesToDropMirrorsOn = [PreTransferPrimaryNode],
585+
drop_mirrors(QName, NodesToDropMirrorsOn),
586+
555587
{Result, NewQ} = wait_for_new_master(QName, Destination),
556588
update_mirrors(NewQ),
557589
Result.

0 commit comments

Comments (0)