Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1672,34 +1672,36 @@ online(Q) when ?is_amqqueue(Q) ->
is_pid(Pid)].

format(Q, Ctx) when ?is_amqqueue(Q) ->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this function is called for every queue when using the /queues HTTP API, depending on sort columns this could mean all queues in the system. Please compare HTTP API query times with and without this change with 5k quorum queues in a 3 node cluster.

I think we need to find another way of achieving this. Perhaps we need to persist the voting set in the queue record queue type state

%% TODO: this should really just be voters
Nodes = lists:sort(get_nodes(Q)),
Voters = lists:sort(get_voters(Q)),
Running = case Ctx of
#{running_nodes := Running0} ->
Running0;
_ ->
%% WARN: slow
rabbit_nodes:list_running()
end,
Online = [N || N <- Nodes, lists:member(N, Running)],
OnlineVoters = [N || N <- Voters, lists:member(N, Running)],
{_, LeaderNode} = amqqueue:get_pid(Q),
State = case is_minority(Nodes, Online) of
true when length(Online) == 0 ->
State = case is_minority(Voters, OnlineVoters) of
true when length(OnlineVoters) == 0 ->
down;
true ->
minority;
false ->
case lists:member(LeaderNode, Online) of
case lists:member(LeaderNode, OnlineVoters) of
true ->
running;
false ->
down
end
end,
Nodes = lists:sort(get_nodes(Q)),
Online = [N || N <- Nodes, lists:member(N, Running)],
[{type, quorum},
{state, State},
{node, LeaderNode},
{members, Nodes},
{voters, Voters},
{leader, LeaderNode},
{online, Online}].

Expand Down Expand Up @@ -1783,6 +1785,14 @@ get_nodes(Q) when ?is_amqqueue(Q) ->
#{nodes := Nodes} = amqqueue:get_type_state(Q),
Nodes.

get_voters(Q) when ?amqqueue_is_quorum(Q) ->
Leader = amqqueue:get_pid(Q),
case ra_server_proc:local_state_query(Leader, voters, 5000) of
{ok, Vs, _} -> [Node || {_RaName, Node} <- Vs];
_ -> []
end.


get_connected_nodes(Q) when ?is_amqqueue(Q) ->
ErlangNodes = [node() | nodes()],
[N || N <- get_nodes(Q), lists:member(N, ErlangNodes)].
Expand Down Expand Up @@ -1907,6 +1917,9 @@ force_all_queues_shrink_member_to_current_member() ->
rabbit_log:warning("Disaster recovery procedure: shrinking finished"),
ok.

is_minority([], _) ->
%% Leader couldn't list voters.
true;
is_minority(All, Up) ->
MinQuorum = length(All) div 2 + 1,
length(Up) < MinQuorum.
Expand Down