|
169 | 169 | -define(MIN_CHECKPOINT_INTERVAL, 64). |
170 | 170 | -define(LEADER_HEALTH_CHECK_TIMEOUT, 5_000). |
171 | 171 | -define(GLOBAL_LEADER_HEALTH_CHECK_TIMEOUT, 60_000). |
| 172 | +-define(RA_MEMBERS_TIMEOUT, 30_000). |
172 | 173 |
|
173 | 174 | %%----------- QQ policies --------------------------------------------------- |
174 | 175 |
|
@@ -1229,7 +1230,6 @@ policy_changed(Q) -> |
1229 | 1230 | end. |
1230 | 1231 |
|
1231 | 1232 | -spec cluster_state(Name :: atom()) -> 'down' | 'recovering' | 'running'. |
1232 | | - |
1233 | 1233 | cluster_state(Name) -> |
1234 | 1234 | case whereis(Name) of |
1235 | 1235 | undefined -> down; |
@@ -1577,17 +1577,18 @@ grow(Node, VhostSpec, QueueSpec, Strategy, Membership) -> |
1577 | 1577 | is_match(amqqueue:get_vhost(Q), VhostSpec) andalso |
1578 | 1578 | is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ]. |
1579 | 1579 |
|
1580 | | --spec transfer_leadership(amqqueue:amqqueue(), node()) -> {migrated, node()} | {not_migrated, atom()}. |
| 1580 | +-spec transfer_leadership(amqqueue:amqqueue(), node()) -> |
| 1581 | + {migrated, node()} | {not_migrated, atom()}. |
1581 | 1582 | transfer_leadership(Q, Destination) -> |
1582 | | - {RaName, _} = Pid = amqqueue:get_pid(Q), |
1583 | | - case ra:transfer_leadership(Pid, {RaName, Destination}) of |
| 1583 | + {RaName, _} = Leader = amqqueue:get_pid(Q), |
| 1584 | + case ra:transfer_leadership(Leader, {RaName, Destination}) of |
1584 | 1585 | ok -> |
1585 | | - case ra:members(Pid) of |
1586 | | - {_, _, {_, NewNode}} -> |
1587 | | - {migrated, NewNode}; |
1588 | | - {timeout, _} -> |
1589 | | - {not_migrated, ra_members_timeout} |
1590 | | - end; |
| 1586 | + case ra:members(Leader, ?RA_MEMBERS_TIMEOUT) of |
| 1587 | + {_, _, {_, NewNode}} -> |
| 1588 | + {migrated, NewNode}; |
| 1589 | + {timeout, _} -> |
| 1590 | + {not_migrated, ra_members_timeout} |
| 1591 | + end; |
1591 | 1592 | already_leader -> |
1592 | 1593 | {not_migrated, already_leader}; |
1593 | 1594 | {error, Reason} -> |
@@ -1750,9 +1751,18 @@ i(memory, Q) when ?is_amqqueue(Q) -> |
1750 | 1751 | 0 |
1751 | 1752 | end; |
1752 | 1753 | i(state, Q) when ?is_amqqueue(Q) -> |
1753 | | - {Name, Node} = amqqueue:get_pid(Q), |
| 1754 | + {Name, Node} = case find_leader(Q) of |
| 1755 | + undefined -> |
| 1756 | + %% fall back to queue record |
| 1757 | + amqqueue:get_pid(Q); |
| 1758 | + Leader -> |
| 1759 | + Leader |
| 1760 | + end, |
1754 | 1761 | %% Check against the leader or last known leader |
1755 | 1762 | case erpc_call(Node, ?MODULE, cluster_state, [Name], ?RPC_TIMEOUT) of |
| 1763 | + {error, {erpc, timeout}} -> |
| 1764 | + %% ?? |
| 1765 | + timeout; |
1756 | 1766 | {error, _} -> |
1757 | 1767 | down; |
1758 | 1768 | State -> |
@@ -1912,7 +1922,12 @@ format(Q, Ctx) when ?is_amqqueue(Q) -> |
1912 | 1922 | rabbit_nodes:list_running() |
1913 | 1923 | end, |
1914 | 1924 | Online = [N || N <- Nodes, lists:member(N, Running)], |
1915 | | - {_, LeaderNode} = amqqueue:get_pid(Q), |
| 1925 | + {_, LeaderNode} = case find_leader(Q) of |
| 1926 | + undefined -> |
| 1927 | + amqqueue:get_pid(Q); |
| 1928 | + Leader -> |
| 1929 | + Leader |
| 1930 | + end, |
1916 | 1931 | State = case is_minority(Nodes, Online) of |
1917 | 1932 | true when length(Online) == 0 -> |
1918 | 1933 | down; |
@@ -2299,27 +2314,50 @@ drain(TransferCandidates) -> |
2299 | 2314 | transfer_leadership([]) -> |
2300 | 2315 | ?LOG_WARNING("Skipping leadership transfer of quorum queues: no candidate " |
2301 | 2316 | "(online, not under maintenance) nodes to transfer to!"); |
2302 | | -transfer_leadership(_TransferCandidates) -> |
| 2317 | +transfer_leadership(_CandidateNodes) -> |
2303 | 2318 | %% we only transfer leadership for QQs that have local leaders |
2304 | | - Queues = rabbit_amqqueue:list_local_leaders(), |
| 2319 | + LocalLeaderQueues = rabbit_amqqueue:list_local_leaders(), |
| 2320 | + QueuesChunked = ra_lib:lists_chunk(256, LocalLeaderQueues), |
2305 | 2321 | ?LOG_INFO("Will transfer leadership of ~b quorum queues with current leader on this node", |
2306 | | - [length(Queues)]), |
2307 | | - _ = [begin |
2308 | | - Name = amqqueue:get_name(Q), |
2309 | | - ?LOG_DEBUG("Will trigger a leader election for local quorum queue ~ts", |
2310 | | - [rabbit_misc:rs(Name)]), |
2311 | | - %% we trigger an election and exclude this node from the list of candidates |
2312 | | - %% by simply shutting its local QQ replica (Ra server) |
2313 | | - RaLeader = amqqueue:get_pid(Q), |
2314 | | - ?LOG_DEBUG("Will stop Ra server ~tp", [RaLeader]), |
2315 | | - case rabbit_quorum_queue:stop_server(RaLeader) of |
2316 | | - ok -> |
2317 | | - ?LOG_DEBUG("Successfully stopped Ra server ~tp", [RaLeader]); |
2318 | | - {error, nodedown} -> |
2319 | | - ?LOG_ERROR("Failed to stop Ra server ~tp: target node was reported as down") |
2320 | | - end |
2321 | | - end || Q <- Queues], |
2322 | | - ?LOG_INFO("Leadership transfer for quorum queues hosted on this node has been initiated"). |
| 2322 | + [length(LocalLeaderQueues)]), |
| 2323 | + [begin |
| 2324 | + [begin |
| 2325 | + %% we trigger an election and exclude this node from the list of candidates |
| 2326 | + %% by simply shutting its local QQ replica (Ra server) |
| 2327 | + RaLeader = amqqueue:get_pid(Q), |
| 2328 | + ?LOG_DEBUG("Will stop Ra leader ~tp", [RaLeader]), |
| 2329 | + case rabbit_quorum_queue:stop_server(RaLeader) of |
| 2330 | + ok -> |
| 2331 | + ?LOG_DEBUG("Successfully stopped Ra server ~tp", [RaLeader]); |
| 2332 | + {error, nodedown} -> |
| 2333 | + ?LOG_ERROR("Failed to stop Ra server ~tp: target node was reported as down") |
| 2334 | + end, |
| 2335 | + ok |
| 2336 | + end || Q <- Queues], |
| 2337 | + %% wait for leader elections before processing next chunk of queues |
| 2338 | + [begin |
| 2339 | + {RaName, LeaderNode} = amqqueue:get_pid(Q), |
| 2340 | + MemberNodes = lists:delete(LeaderNode, amqqueue:get_nodes(Q)), |
| 2341 | + %% we don't do any explicit error handling here as it is more |
| 2342 | + %% important to make progress |
| 2343 | + _ = lists:any(fun (N) -> |
| 2344 | + case ra:members({RaName, N}, ?RA_MEMBERS_TIMEOUT) of |
| 2345 | + {ok, _, _} -> |
| 2346 | + true; |
| 2347 | + Err -> |
| 2348 | + Name = amqqueue:get_name(Q), |
| 2349 | + ?LOG_DEBUG("Failed to wait for leader election for queue ~ts on ~tp Err ~ts", |
| 2350 | + [Name, N, Err]), |
| 2351 | + false |
| 2352 | + end |
| 2353 | + end, MemberNodes), |
| 2354 | + ok |
| 2355 | + |
| 2356 | + end || Q <- Queues], |
| 2357 | + ok |
| 2358 | + end || Queues <- QueuesChunked], |
| 2359 | + ?LOG_INFO("Leadership transfer for quorum queues hosted on this node has been initiated"), |
| 2360 | + ok. |
2323 | 2361 |
|
2324 | 2362 | %% TODO: I just copied it over, it looks like was always called inside maintenance so... |
2325 | 2363 | -spec stop_local_quorum_queue_followers() -> ok. |
|
0 commit comments