Skip to content

Commit

Permalink
Merge pull request #12740 from rabbitmq/mergify/bp/v4.0.x/pr-12727
Browse files Browse the repository at this point in the history
By @Ayanda-D: Ensure only alive leaders and followers when fetching QQ replica states (backport #12727)
  • Loading branch information
michaelklishin authored Nov 15, 2024
2 parents 7afb420 + 072ccc7 commit 0caa066
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 21 deletions.
31 changes: 20 additions & 11 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -441,19 +441,28 @@ become_leader0(QName, Name) ->
-spec all_replica_states() -> {node(), #{atom() => atom()}}.
all_replica_states() ->
Rows0 = ets:tab2list(ra_state),
Rows = lists:map(fun
({K, follower, promotable}) ->
{K, promotable};
({K, follower, non_voter}) ->
{K, non_voter};
({K, S, _}) ->
%% voter or unknown
{K, S};
(T) ->
T
end, Rows0),
Rows = lists:filtermap(
fun
(T = {K, _, _}) ->
case whereis(K) of
undefined ->
false;
P when is_pid(P) ->
{true, to_replica_state(T)}
end;
(_T) ->
false
end, Rows0),
{node(), maps:from_list(Rows)}.

to_replica_state({K, follower, promotable}) ->
{K, promotable};
to_replica_state({K, follower, non_voter}) ->
{K, non_voter};
to_replica_state({K, S, _}) ->
%% voter or unknown
{K, S}.

-spec list_with_minimum_quorum() -> [amqqueue:amqqueue()].
list_with_minimum_quorum() ->
Queues = rabbit_amqqueue:list_local_quorum_queues(),
Expand Down
51 changes: 50 additions & 1 deletion deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ groups() ->
force_all_queues_shrink_member_to_current_member,
force_vhost_queues_shrink_member_to_current_member,
policy_repair,
gh_12635
gh_12635,
replica_states
]
++ all_tests()},
{cluster_size_5, [], [start_queue,
Expand Down Expand Up @@ -4355,6 +4356,54 @@ requeue_multiple_false(Config) ->
?assertEqual(#'queue.delete_ok'{message_count = 0},
amqp_channel:call(Ch, #'queue.delete'{queue = QQ})).

replica_states(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),

[?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"quorum">>}]))
|| Q <- [<<"Q1">>, <<"Q2">>, <<"Q3">>]],

Qs = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, list, []),

[Q1_ClusterName, Q2_ClusterName, Q3_ClusterName] =
[begin
{ClusterName, _} = amqqueue:get_pid(Q),
ClusterName
end
|| Q <- Qs, amqqueue:get_type(Q) == rabbit_quorum_queue],

Result1 = rabbit_misc:append_rpc_all_nodes(Servers, rabbit_quorum_queue, all_replica_states, []),
ct:pal("all replica states: ~tp", [Result1]),

lists:map(fun({_Node, ReplicaStates}) ->
?assert(maps:is_key(Q1_ClusterName, ReplicaStates)),
?assert(maps:is_key(Q2_ClusterName, ReplicaStates)),
?assert(maps:is_key(Q3_ClusterName, ReplicaStates))
end, Result1),

%% Unregister a few queues (same outcome of 'noproc')
rabbit_ct_broker_helpers:rpc(Config, Server, erlang, unregister, [Q2_ClusterName]),
rabbit_ct_broker_helpers:rpc(Config, Server, erlang, unregister, [Q3_ClusterName]),

?assert(undefined == rabbit_ct_broker_helpers:rpc(Config, Server, erlang, whereis, [Q2_ClusterName])),
?assert(undefined == rabbit_ct_broker_helpers:rpc(Config, Server, erlang, whereis, [Q3_ClusterName])),

Result2 = rabbit_misc:append_rpc_all_nodes(Servers, rabbit_quorum_queue, all_replica_states, []),
ct:pal("replica states with a node missing Q1 and Q2: ~tp", [Result2]),

lists:map(fun({Node, ReplicaStates}) ->
if Node == Server ->
?assert(maps:is_key(Q1_ClusterName, ReplicaStates)),
?assertNot(maps:is_key(Q2_ClusterName, ReplicaStates)),
?assertNot(maps:is_key(Q3_ClusterName, ReplicaStates));
true ->
?assert(maps:is_key(Q1_ClusterName, ReplicaStates)),
?assert(maps:is_key(Q2_ClusterName, ReplicaStates)),
?assert(maps:is_key(Q3_ClusterName, ReplicaStates))
end
end, Result2).

%%----------------------------------------------------------------------------

same_elements(L1, L2)
Expand Down
30 changes: 21 additions & 9 deletions deps/rabbit/test/unit_quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

all() ->
[
all_replica_states_includes_nonvoters,
all_replica_states_includes_alive_nonvoters,
filter_nonvoters,
filter_quorum_critical_accounts_nonvoters,
ra_machine_conf_delivery_limit
Expand Down Expand Up @@ -97,27 +97,29 @@ filter_nonvoters(_Config) ->
[Q4] = rabbit_quorum_queue:filter_promotable(Qs, Ss),
ok.

all_replica_states_includes_nonvoters(_Config) ->
all_replica_states_includes_alive_nonvoters(_Config) ->
ets:new(ra_state, [named_table, public, {write_concurrency, true}]),
QPids = start_qprocs(_AliveQs = [q1, q2, q3, q4]),
ets:insert(ra_state, [
{q1, leader, voter},
{q2, follower, voter},
{q3, follower, promotable},
{q4, init, unknown},
%% pre ra-2.7.0
{q5, leader},
{q6, follower}
%% queues in ra_state but not alive
{q5, leader, voter},
{q6, follower, noproc}
]),
{_, #{
q1 := leader,
q2 := follower,
q3 := promotable,
q4 := init,
q5 := leader,
q6 := follower
}} = rabbit_quorum_queue:all_replica_states(),
q4 := init
} = ReplicaStates} = rabbit_quorum_queue:all_replica_states(),
?assertNot(maps:is_key(q5, ReplicaStates)),
?assertNot(maps:is_key(q6, ReplicaStates)),

true = ets:delete(ra_state),
_ = stop_qprocs(QPids),
ok.

make_ra_machine_conf(Q0, Arg, Pol, OpPol) ->
Expand All @@ -128,3 +130,13 @@ make_ra_machine_conf(Q0, Arg, Pol, OpPol) ->
{definition, [{<<"delivery-limit">>,OpPol}]}]),
rabbit_quorum_queue:ra_machine_config(Q).

start_qprocs(Qs) ->
[begin
QPid = spawn(fun() -> receive done -> ok end end),
erlang:register(Q, QPid),
QPid
end || Q <- Qs].

stop_qprocs(Pids) ->
[erlang:send(P, done)|| P <- Pids].

0 comments on commit 0caa066

Please sign in to comment.