Start mirrors synchronously when transferring CMQ leadership #2751


Merged · 4 commits · Jan 27, 2021
deps/rabbit/src/rabbit_amqqueue.erl (8 changes: 4 additions & 4 deletions)

@@ -436,7 +436,7 @@ rebalance(Type, VhostSpec, QueueSpec) ->
 maybe_rebalance({true, Id}, Type, VhostSpec, QueueSpec) ->
     rabbit_log:info("Starting queue rebalance operation: '~s' for vhosts matching '~s' and queues matching '~s'",
                     [Type, VhostSpec, QueueSpec]),
-    Running = rabbit_nodes:all_running(),
+    Running = rabbit_maintenance:filter_out_drained_nodes_consistent_read(rabbit_nodes:all_running()),
     NumRunning = length(Running),
     ToRebalance = [Q || Q <- rabbit_amqqueue:list(),
                         filter_per_type(Type, Q),

@@ -505,12 +505,12 @@ maybe_migrate(ByNode, MaxQueuesDesired, [N | Nodes]) ->
         [{_, Q, false} = Queue | Queues] = All when length(All) > MaxQueuesDesired ->
             Name = amqqueue:get_name(Q),
             Module = rebalance_module(Q),
-            OtherNodes = Module:get_replicas(Q) -- [N],
-            case OtherNodes of
+            Candidates = Module:get_replicas(Q) -- [N],
+            case Candidates of
                 [] ->
                     {not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)};
                 _ ->
-                    [{Length, Destination} | _] = sort_by_number_of_queues(OtherNodes, ByNode),
+                    [{Length, Destination} | _] = sort_by_number_of_queues(Candidates, ByNode),
                     rabbit_log:warning("Migrating queue ~p from node ~p with ~p queues to node ~p with ~p queues",
                                        [Name, N, length(All), Destination, Length]),
                     case Module:transfer_leadership(Q, Destination) of
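
For context, this maybe_rebalance/4 path is the entry point behind the queue-rebalance CLI command. A minimal remote-shell sketch, assuming the atom all and the catch-all regex strings are accepted here as the surrounding code suggests:

    %% Rebalance every queue type across all vhosts and queue names.
    %% With this patch, nodes drained for maintenance are excluded from
    %% Running, so no leader is moved onto a node that is about to go down.
    rabbit_amqqueue:rebalance(all, ".*", ".*").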

deps/rabbit/src/rabbit_maintenance.erl (36 changes: 25 additions & 11 deletions)

@@ -25,7 +25,7 @@
      resume_all_client_listeners/0,
      close_all_client_connections/0,
      primary_replica_transfer_candidate_nodes/0,
-     random_primary_replica_transfer_candidate_node/1,
+     random_primary_replica_transfer_candidate_node/2,
      transfer_leadership_of_quorum_queues/1,
      transfer_leadership_of_classic_mirrored_queues/1,
      status_table_name/0,

@@ -256,14 +256,16 @@ transfer_leadership_of_classic_mirrored_queues(TransferCandidates) ->
     ReadableCandidates = readable_candidate_list(TransferCandidates),
     rabbit_log:info("Will transfer leadership of ~b classic mirrored queues hosted on this node to these peer nodes: ~s",
                     [length(Queues), ReadableCandidates]),

     [begin
          Name = amqqueue:get_name(Q),
-         case random_primary_replica_transfer_candidate_node(TransferCandidates) of
+         ExistingReplicaNodes = [node(Pid) || Pid <- amqqueue:get_sync_slave_pids(Q)],
+         rabbit_log:debug("Local ~s has replicas on nodes ~s",
+                          [rabbit_misc:rs(Name), readable_candidate_list(ExistingReplicaNodes)]),
+         case random_primary_replica_transfer_candidate_node(TransferCandidates, ExistingReplicaNodes) of
              {ok, Pick} ->
-                 rabbit_log:debug("Will transfer leadership of local queue ~s to node ~s",
+                 rabbit_log:debug("Will transfer leadership of local ~s to node ~s",
                                   [rabbit_misc:rs(Name), Pick]),
-                 case rabbit_mirror_queue_misc:transfer_leadership(Q, Pick) of
+                 case rabbit_mirror_queue_misc:migrate_leadership_to_existing_replica(Q, Pick) of
                      {migrated, _} ->
                          rabbit_log:debug("Successfully transferred leadership of queue ~s to node ~s",
                                           [rabbit_misc:rs(Name), Pick]);
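
For orientation: this function runs when a node enters maintenance mode (a drain), moving classic mirrored queue leaders off the node. A minimal sketch of the calling sequence, assuming it is executed on the node being drained:

    %% Candidate targets: running peers that are not drained themselves.
    TransferCandidates = rabbit_maintenance:primary_replica_transfer_candidate_nodes(),
    %% Per queue, a target is now picked only among nodes holding in-sync mirrors.
    rabbit_maintenance:transfer_leadership_of_classic_mirrored_queues(TransferCandidates).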

@@ -300,18 +302,30 @@ stop_local_quorum_queue_followers() ->
     end || Q <- Queues],
     rabbit_log:info("Stopped all local replicas of quorum queues hosted on this node").

 -spec primary_replica_transfer_candidate_nodes() -> [node()].
 primary_replica_transfer_candidate_nodes() ->
     filter_out_drained_nodes_consistent_read(rabbit_nodes:all_running() -- [node()]).

--spec random_primary_replica_transfer_candidate_node([node()]) -> {ok, node()} | undefined.
-random_primary_replica_transfer_candidate_node([]) ->
+-spec random_primary_replica_transfer_candidate_node([node()], [node()]) -> {ok, node()} | undefined.
+random_primary_replica_transfer_candidate_node([], _Preferred) ->
     undefined;
-random_primary_replica_transfer_candidate_node(Candidates) ->
-    Nth = erlang:phash2(erlang:monotonic_time(), length(Candidates)),
-    Candidate = lists:nth(Nth + 1, Candidates),
-    {ok, Candidate}.
+random_primary_replica_transfer_candidate_node(Candidates, PreferredNodes) ->
+    Overlap = sets:to_list(sets:intersection(sets:from_list(Candidates), sets:from_list(PreferredNodes))),
+    case Overlap of
+        [] ->
+            %% Since ownership transfer is meant to be run only when we are sure
+            %% there are in-sync replicas to transfer to, this is an edge case.
+            %% We skip the transfer.
+            undefined;
+        Nodes ->
+            {ok, random_nth(Nodes)}
+    end.
+
+random_nth(Nodes) ->
+    Nth = erlang:phash2(erlang:monotonic_time(), length(Nodes)),
+    lists:nth(Nth + 1, Nodes).

 revive_local_quorum_queue_replicas() ->
     Queues = rabbit_amqqueue:list_local_followers(),
     [begin
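
To illustrate the new picking logic (node names below are made up): the pick comes from the intersection of the not-drained candidate nodes with the nodes holding in-sync replicas, and the transfer is skipped when that intersection is empty.

    %% The only node present in both lists is chosen.
    {ok, 'rabbit@c'} = rabbit_maintenance:random_primary_replica_transfer_candidate_node(
                           ['rabbit@b', 'rabbit@c'], ['rabbit@c', 'rabbit@d']).
    %% No overlap between candidates and in-sync replica hosts: skip the transfer.
    undefined = rabbit_maintenance:random_primary_replica_transfer_candidate_node(
                    ['rabbit@b'], ['rabbit@d']).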

deps/rabbit/src/rabbit_mirror_queue_misc.erl (102 changes: 67 additions & 35 deletions)

@@ -17,9 +17,9 @@
          sync_batch_size/1, log_info/3, log_warning/3]).
 -export([stop_all_slaves/5]).

--export([sync_queue/1, cancel_sync_queue/1]).
+-export([sync_queue/1, cancel_sync_queue/1, queue_length/1]).

--export([transfer_leadership/2, queue_length/1, get_replicas/1]).
+-export([get_replicas/1, transfer_leadership/2, migrate_leadership_to_existing_replica/2]).

 %% for testing only
 -export([module/1]).

@@ -201,12 +201,13 @@ drop_mirror(QName, MirrorNode) ->
     case rabbit_amqqueue:lookup(QName) of
         {ok, Q} when ?is_amqqueue(Q) ->
             Name = amqqueue:get_name(Q),
-            QPid = amqqueue:get_pid(Q),
-            SPids = amqqueue:get_slave_pids(Q),
-            case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
+            PrimaryPid = amqqueue:get_pid(Q),
+            MirrorPids = amqqueue:get_slave_pids(Q),
+            AllReplicaPids = [PrimaryPid | MirrorPids],
+            case [Pid || Pid <- AllReplicaPids, node(Pid) =:= MirrorNode] of
                 [] ->
                     {error, {queue_not_mirrored_on_node, MirrorNode}};
-                [QPid] when SPids =:= [] ->
+                [PrimaryPid] when MirrorPids =:= [] ->
                     {error, cannot_drop_only_mirror};
                 [Pid] ->
                     log_info(Name, "Dropping queue mirror on node ~p~n",

Review comment on the PrimaryPid line:

    John O'Hara - inventor of AMQP - will be sad to see the demise of QPid - AMQP's first implementation - from the RabbitMQ codebase

Reply (collaborator, author):

    Hahaha. Definitely a good one 🤓

@@ -235,11 +236,9 @@ add_mirror(QName, MirrorNode, SyncMode) ->
     case rabbit_vhost_sup_sup:get_vhost_sup(VHost, MirrorNode) of
         {ok, _} ->
             try
-                SPid = rabbit_amqqueue_sup_sup:start_queue_process(
-                         MirrorNode, Q, slave),
-                log_info(QName, "Adding mirror on node ~p: ~p~n",
-                         [MirrorNode, SPid]),
-                rabbit_mirror_queue_slave:go(SPid, SyncMode)
+                MirrorPid = rabbit_amqqueue_sup_sup:start_queue_process(MirrorNode, Q, slave),
+                log_info(QName, "Adding mirror on node ~p: ~p~n", [MirrorNode, MirrorPid]),
+                rabbit_mirror_queue_slave:go(MirrorPid, SyncMode)
             of
                 _ -> ok
             catch

@@ -264,10 +263,10 @@
 report_deaths(_MirrorPid, _IsMaster, _QueueName, []) ->
     ok;
 report_deaths(MirrorPid, IsMaster, QueueName, DeadPids) ->
-    log_info(QueueName, "~s ~s saw deaths of mirrors~s~n",
+    log_info(QueueName, "~s replica of queue ~s detected replica ~s to be down~n",
              [case IsMaster of
-                  true  -> "Master";
-                  false -> "Slave"
+                  true  -> "Primary";
+                  false -> "Secondary"
               end,
              rabbit_misc:pid_to_string(MirrorPid),
              [[$ , rabbit_misc:pid_to_string(P)] || P <- DeadPids]]).

@@ -447,14 +446,15 @@ is_mirrored_ha_nodes(Q) ->
     end.

 actual_queue_nodes(Q) when ?is_amqqueue(Q) ->
-    MPid = amqqueue:get_pid(Q),
-    SPids = amqqueue:get_slave_pids(Q),
-    SSPids = amqqueue:get_sync_slave_pids(Q),
-    Nodes = fun (L) -> [node(Pid) || Pid <- L] end,
-    {case MPid of
-         none -> none;
-         _    -> node(MPid)
-     end, Nodes(SPids), Nodes(SSPids)}.
+    PrimaryPid = amqqueue:get_pid(Q),
+    MirrorPids = amqqueue:get_slave_pids(Q),
+    InSyncMirrorPids = amqqueue:get_sync_slave_pids(Q),
+    CollectNodes = fun (L) -> [node(Pid) || Pid <- L] end,
+    NodeHostingPrimary = case PrimaryPid of
+                             none -> none;
+                             _    -> node(PrimaryPid)
+                         end,
+    {NodeHostingPrimary, CollectNodes(MirrorPids), CollectNodes(InSyncMirrorPids)}.

 -spec maybe_auto_sync(amqqueue:amqqueue()) -> 'ok'.

@@ -520,19 +520,19 @@ update_mirrors(OldQ, NewQ) when ?amqqueue_pids_are_equal(OldQ, NewQ) ->

 update_mirrors(Q) when ?is_amqqueue(Q) ->
     QName = amqqueue:get_name(Q),
-    {OldMNode, OldSNodes, _} = actual_queue_nodes(Q),
-    {NewMNode, NewSNodes} = suggested_queue_nodes(Q),
-    OldNodes = [OldMNode | OldSNodes],
-    NewNodes = [NewMNode | NewSNodes],
+    {PreTransferPrimaryNode, PreTransferMirrorNodes, _PreTransferInSyncMirrorNodes} = actual_queue_nodes(Q),
+    {NewlySelectedPrimaryNode, NewlySelectedMirrorNodes} = suggested_queue_nodes(Q),
+    PreTransferNodesWithReplicas = [PreTransferPrimaryNode | PreTransferMirrorNodes],
+    NewlySelectedNodesWithReplicas = [NewlySelectedPrimaryNode | NewlySelectedMirrorNodes],
     %% When a mirror dies, remove_from_queue/2 might have to add new
-    %% mirrors (in "exactly" mode). It will check mnesia to see which
+    %% mirrors (in "exactly" mode). It will check the queue record to see which
     %% mirrors there currently are. If drop_mirror/2 is invoked first
     %% then when we end up in remove_from_queue/2 it will not see the
     %% mirrors that add_mirror/2 will add, and also want to add them
     %% (even though we are not responding to the death of a
     %% mirror). Breakage ensues.
-    add_mirrors (QName, NewNodes -- OldNodes, async),
-    drop_mirrors(QName, OldNodes -- NewNodes),
+    add_mirrors(QName, NewlySelectedNodesWithReplicas -- PreTransferNodesWithReplicas, async),
+    drop_mirrors(QName, PreTransferNodesWithReplicas -- NewlySelectedNodesWithReplicas),
     %% This is for the case where no extra nodes were added but we changed to
     %% a policy requiring auto-sync.
     maybe_auto_sync(Q),
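
The add-before-drop ordering above leans on Erlang's list subtraction to compute placement deltas; a tiny worked example with hypothetical node names:

    PreTransfer   = ['rabbit@a', 'rabbit@b'],     %% old primary + mirrors
    NewlySelected = ['rabbit@b', 'rabbit@c'],     %% suggested placement
    ['rabbit@c'] = NewlySelected -- PreTransfer,  %% mirrors to add first
    ['rabbit@a'] = PreTransfer -- NewlySelected.  %% mirrors to drop afterwards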
Expand All @@ -543,15 +543,47 @@ queue_length(Q) ->
     M.

 get_replicas(Q) ->
-    {MNode, SNodes} = suggested_queue_nodes(Q),
-    [MNode] ++ SNodes.
+    {PrimaryNode, MirrorNodes} = suggested_queue_nodes(Q),
+    [PrimaryNode] ++ MirrorNodes.

+%% Moves the primary replica (leader) of a classic mirrored queue to another node.
+%% The target node can be any node in the cluster; it does not have to host a
+%% replica of this queue.
 transfer_leadership(Q, Destination) ->
     QName = amqqueue:get_name(Q),
-    {OldMNode, OldSNodes, _} = actual_queue_nodes(Q),
-    OldNodes = [OldMNode | OldSNodes],
-    add_mirrors(QName, [Destination] -- OldNodes, async),
-    drop_mirrors(QName, OldNodes -- [Destination]),
+    {PreTransferPrimaryNode, PreTransferMirrorNodes, _PreTransferInSyncMirrorNodes} = actual_queue_nodes(Q),
+    PreTransferNodesWithReplicas = [PreTransferPrimaryNode | PreTransferMirrorNodes],
+
+    NodesToAddMirrorsOn = [Destination] -- PreTransferNodesWithReplicas,
+    %% This waits for the transfer/eager sync to finish before we begin dropping
+    %% mirrors in the next step. Here mirrors cannot be added asynchronously, as
+    %% that would race with the dropping step.
+    add_mirrors(QName, NodesToAddMirrorsOn, sync),
+
+    NodesToDropMirrorsOn = PreTransferNodesWithReplicas -- [Destination],
+    drop_mirrors(QName, NodesToDropMirrorsOn),
+
     {Result, NewQ} = wait_for_new_master(QName, Destination),
     update_mirrors(NewQ),
     Result.

+%% Moves the primary replica (leader) of a classic mirrored queue to another node
+%% which already hosts a replica of this queue. In this case we can stop fewer
+%% replicas and reduce the load the operation puts on the cluster.
+migrate_leadership_to_existing_replica(Q, Destination) ->
+    QName = amqqueue:get_name(Q),
+    {PreTransferPrimaryNode, PreTransferMirrorNodes, _PreTransferInSyncMirrorNodes} = actual_queue_nodes(Q),
+    PreTransferNodesWithReplicas = [PreTransferPrimaryNode | PreTransferMirrorNodes],
+
+    NodesToAddMirrorsOn = [Destination] -- PreTransferNodesWithReplicas,
+    %% This waits for the transfer/eager sync to finish before we begin dropping
+    %% mirrors in the next step. Here mirrors cannot be added asynchronously, as
+    %% that would race with the dropping step.
+    add_mirrors(QName, NodesToAddMirrorsOn, sync),
+
+    NodesToDropMirrorsOn = [PreTransferPrimaryNode],
+    drop_mirrors(QName, NodesToDropMirrorsOn),
+
+    {Result, NewQ} = wait_for_new_master(QName, Destination),
+    update_mirrors(NewQ),
+    Result.
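
Putting the two entry points side by side: transfer_leadership/2 accepts any cluster node and must therefore create and eagerly sync a mirror on the destination before dropping anything, while migrate_leadership_to_existing_replica/2 assumes the destination already holds an in-sync mirror, so only the old primary has to be stopped. A minimal sketch; the queue name, vhost and target node are made up, and a successful transfer is assumed to return {migrated, NewPrimaryPid} as matched in rabbit_maintenance above:

    {ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, <<"orders">>)),

    %% Either: Destination may be any node in the cluster.
    {migrated, _} = rabbit_mirror_queue_misc:transfer_leadership(Q, 'rabbit@c').

    %% Or, when 'rabbit@c' already hosts an in-sync mirror (cheaper; this is
    %% what maintenance mode now calls):
    %% {migrated, _} = rabbit_mirror_queue_misc:migrate_leadership_to_existing_replica(Q, 'rabbit@c').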