1010-include (" amqqueue.hrl" ).
1111
1212-export ([queue_leader_locators /0 ,
13- select_leader_and_followers /2 ]).
13+ select_leader_and_followers /3 ]).
1414
15- -define (QUEUE_LEADER_LOCATORS_DEPRECATED , [<<" random" >>, <<" least-leaders" >>]).
15+ % % these are needed because of they are called with ?MODULE:
16+ % % to allow mecking them in tests
17+ -export ([node /0 ,
18+ queues_per_node /2 ]).
19+
20+ -ifdef (TEST ).
21+ -export ([select_members /7 , leader_node /6 , leader_locator /1 ]).
22+ -endif .
23+
24+ -define (QUEUE_LEADER_LOCATORS_DEPRECATED , [<<" random" >>, <<" least-leaders" >>, <<" min-masters" >>]).
1625-define (QUEUE_LEADER_LOCATORS , [<<" client-local" >>, <<" balanced" >>] ++ ? QUEUE_LEADER_LOCATORS_DEPRECATED ).
1726-define (QUEUE_COUNT_START_RANDOM_SELECTION , 1_000 ).
1827
2332queue_leader_locators () ->
2433 ? QUEUE_LEADER_LOCATORS .
2534
26- -spec select_leader_and_followers (amqqueue :amqqueue (), pos_integer ()) ->
35+ -spec select_leader_and_followers (amqqueue :amqqueue (), pos_integer (), atom () ) ->
2736 {Leader :: node (), Followers :: [node ()]}.
28- select_leader_and_followers (Q , Size )
29- when (? amqqueue_is_quorum (Q ) orelse ? amqqueue_is_stream (Q )) andalso is_integer (Size ) ->
37+ select_leader_and_followers (Q , Size , QueueType )
38+ when (? amqqueue_is_quorum (Q ) orelse ? amqqueue_is_stream (Q ) orelse ? amqqueue_is_classic (Q )) andalso is_integer (Size ) ->
39+ LeaderLocator = leader_locator (Q ),
40+ do_select_leader_and_followers (Size , QueueType , LeaderLocator ).
41+
42+ -spec do_select_leader_and_followers (pos_integer (), atom (), queue_leader_locator ()) ->
43+ {Leader :: node (), Followers :: [node ()]}.
44+ do_select_leader_and_followers (1 , _ , <<" client-local" >>) ->
45+ % % optimisation for classic queues
46+ {? MODULE :node (), []};
47+ do_select_leader_and_followers (Size , QueueType , LeaderLocator ) ->
3048 AllNodes = rabbit_nodes :list_members (),
3149 RunningNodes = rabbit_nodes :filter_running (AllNodes ),
32- true = lists :member (node (), AllNodes ),
33- QueueType = amqqueue :get_type (Q ),
50+ true = lists :member (? MODULE :node (), AllNodes ),
3451 GetQueues0 = get_queues_for_type (QueueType ),
3552 % % TODO do we always need the queue count? it can be expensive, check if it can be skipped!
3653 % % for example, for random
3754 QueueCount = rabbit_amqqueue :count (),
3855 QueueCountStartRandom = application :get_env (rabbit , queue_count_start_random_selection ,
3956 ? QUEUE_COUNT_START_RANDOM_SELECTION ),
40- {Replicas , GetQueues } = select_replicas (Size , AllNodes , RunningNodes ,
57+ {Members , GetQueues } = select_members (Size , QueueType , AllNodes , RunningNodes ,
4158 QueueCount , QueueCountStartRandom , GetQueues0 ),
42- LeaderLocator = leader_locator (Q ),
43- Leader = leader_node (LeaderLocator , Replicas , RunningNodes ,
59+ Leader = leader_node (LeaderLocator , Members , RunningNodes ,
4460 QueueCount , QueueCountStartRandom , GetQueues ),
45- Followers = lists :delete (Leader , Replicas ),
61+ Followers = lists :delete (Leader , Members ),
4662 {Leader , Followers }.
4763
4864-spec leader_locator (amqqueue :amqqueue ()) ->
@@ -53,7 +69,15 @@ leader_locator(Q) ->
5369 fun (PolVal , _ArgVal ) -> PolVal end ,
5470 Q ) of
5571 undefined ->
56- application :get_env (rabbit , queue_leader_locator , undefined );
72+ case rabbit_queue_type_util :args_policy_lookup (
73+ <<" queue-master-locator" >>,
74+ fun (PolVal , _ArgVal ) -> PolVal end ,
75+ Q ) of
76+ undefined ->
77+ application :get_env (rabbit , queue_leader_locator , undefined );
78+ Val ->
79+ Val
80+ end ;
5781 Val ->
5882 Val
5983 end ,
@@ -63,40 +87,57 @@ leader_locator0(<<"client-local">>) ->
6387 <<" client-local" >>;
6488leader_locator0 (<<" balanced" >>) ->
6589 <<" balanced" >>;
66- % % 'random' and 'least-leaders' are deprecated
90+ % % 'random', 'least-leaders' and 'min-masters ' are deprecated
6791leader_locator0 (<<" random" >>) ->
6892 <<" balanced" >>;
6993leader_locator0 (<<" least-leaders" >>) ->
7094 <<" balanced" >>;
95+ leader_locator0 (<<" min-masters" >>) ->
96+ <<" balanced" >>;
7197leader_locator0 (_ ) ->
7298 % % default
7399 <<" client-local" >>.
74100
75- -spec select_replicas (pos_integer (), [node (),...], [node (),...],
101+ -spec select_members (pos_integer (), term (), [node (),...], [node (),...],
76102 non_neg_integer (), non_neg_integer (), function ()) ->
77103 {[node (),...], function ()}.
78- select_replicas (Size , AllNodes , _ , _ , _ , Fun )
104+ select_members (Size , _ , AllNodes , _ , _ , _ , Fun )
79105 when length (AllNodes ) =< Size ->
80106 {AllNodes , Fun };
107+ % % Classic queues: above the threshold, pick a random node
108+ % % For classic queues, when there's a lot of queues, if we knew that the
109+ % % distribution of queues between nodes is relatively even, it'd be better
110+ % % to declare this queue locally rather than randomly. However, currently,
111+ % % counting queues on each node is relatively expensive. Users can use
112+ % % the client-local strategy if they know their connections are well balanced
113+ select_members (1 , rabbit_classic_queue , _AllNodes , RunningNodes , QueueCount , QueueCountStartRandom , GetQueues )
114+ when QueueCount >= QueueCountStartRandom ->
115+ {RunningNodes , GetQueues };
116+ select_members (1 , rabbit_classic_queue , _ , RunningNodes , _ , _ , GetQueues ) ->
117+ {RunningNodes , GetQueues };
118+ % % Quorum queues and streams
81119% % Select nodes in the following order:
82120% % 1. Local node to have data locality for declaring client.
83121% % 2. Running nodes.
84- % % 3.1. If there are many queues: Randomly to avoid expensive calculation of counting replicas
122+ % % 3.1. If there are many queues: Randomly to avoid expensive calculation of counting members
85123% % per node. Random replica selection is good enough for most use cases.
86- % % 3.2. If there are few queues: Nodes with least replicas to have a "balanced" RabbitMQ cluster.
87- select_replicas (Size , AllNodes , RunningNodes , QueueCount , QueueCountStartRandom , GetQueues )
124+ % % 3.2. If there are few queues: Nodes with least members to have a "balanced" RabbitMQ cluster.
125+ select_members (Size , _ , AllNodes , RunningNodes , QueueCount , QueueCountStartRandom , GetQueues )
88126 when QueueCount >= QueueCountStartRandom ->
89- L0 = shuffle (lists :delete (node (), AllNodes )),
127+ L0 = shuffle (lists :delete (? MODULE : node (), AllNodes )),
90128 L1 = lists :sort (fun (X , _Y ) ->
91129 lists :member (X , RunningNodes )
92130 end , L0 ),
93131 {L , _ } = lists :split (Size - 1 , L1 ),
94- {[node () | L ], GetQueues };
95- select_replicas (Size , AllNodes , RunningNodes , _ , _ , GetQueues ) ->
96- Counters0 = maps :from_list ([{N , 0 } || N <- lists :delete (node (), AllNodes )]),
132+ {[? MODULE : node () | L ], GetQueues };
133+ select_members (Size , _ , AllNodes , RunningNodes , _ , _ , GetQueues ) ->
134+ Counters0 = maps :from_list ([{N , 0 } || N <- lists :delete (? MODULE : node (), AllNodes )]),
97135 Queues = GetQueues (),
98136 Counters = lists :foldl (fun (Q , Acc ) ->
99- #{nodes := Nodes } = amqqueue :get_type_state (Q ),
137+ Nodes = case amqqueue :get_type (Q ) of
138+ rabbit_classic_queue -> [amqqueue :qnode (Q )];
139+ _ -> maps :get (nodes , amqqueue :get_type_state (Q ))
140+ end ,
100141 lists :foldl (fun (N , A )
101142 when is_map_key (N , A ) ->
102143 maps :update_with (N , fun (C ) -> C + 1 end , A );
@@ -118,46 +159,35 @@ select_replicas(Size, AllNodes, RunningNodes, _, _, GetQueues) ->
118159 end , L0 ),
119160 {L2 , _ } = lists :split (Size - 1 , L1 ),
120161 L = lists :map (fun ({N , _ }) -> N end , L2 ),
121- {[node () | L ], fun () -> Queues end }.
162+ {[? MODULE : node () | L ], fun () -> Queues end }.
122163
123164-spec leader_node (queue_leader_locator (), [node (),...], [node (),...],
124165 non_neg_integer (), non_neg_integer (), function ()) ->
125166 node ().
126167leader_node (<<" client-local" >>, _ , _ , _ , _ , _ ) ->
127- node ();
168+ ? MODULE : node ();
128169leader_node (<<" balanced" >>, Nodes0 , RunningNodes , QueueCount , QueueCountStartRandom , _ )
129170 when QueueCount >= QueueCountStartRandom ->
130171 Nodes = potential_leaders (Nodes0 , RunningNodes ),
131172 lists :nth (rand :uniform (length (Nodes )), Nodes );
132- leader_node (<<" balanced" >>, Nodes0 , RunningNodes , _ , _ , GetQueues )
173+ leader_node (<<" balanced" >>, Members0 , RunningNodes , _ , _ , GetQueues )
133174 when is_function (GetQueues , 0 ) ->
134- Nodes = potential_leaders (Nodes0 , RunningNodes ),
135- Counters0 = maps :from_list ([{N , 0 } || N <- Nodes ]),
136- Counters = lists :foldl (fun (Q , Acc ) ->
137- case amqqueue :get_pid (Q ) of
138- {RaName , LeaderNode }
139- when is_atom (RaName ), is_atom (LeaderNode ), is_map_key (LeaderNode , Acc ) ->
140- maps :update_with (LeaderNode , fun (C ) -> C + 1 end , Acc );
141- StreamLeaderPid
142- when is_pid (StreamLeaderPid ), is_map_key (node (StreamLeaderPid ), Acc ) ->
143- maps :update_with (node (StreamLeaderPid ), fun (C ) -> C + 1 end , Acc );
144- _ ->
145- Acc
146- end
147- end , Counters0 , GetQueues ()),
175+ Members = potential_leaders (Members0 , RunningNodes ),
176+ Counters = ? MODULE :queues_per_node (Members , GetQueues ),
177+ ct :pal (" Counters ~p " , [Counters ]),
148178 {Node , _ } = hd (lists :keysort (2 , maps :to_list (Counters ))),
149179 Node .
150180
151- potential_leaders (Replicas , RunningNodes ) ->
181+ potential_leaders (Members , RunningNodes ) ->
152182 case lists :filter (fun (R ) ->
153183 lists :member (R , RunningNodes )
154- end , Replicas ) of
184+ end , Members ) of
155185 [] ->
156- Replicas ;
157- RunningReplicas ->
158- case rabbit_maintenance :filter_out_drained_nodes_local_read (RunningReplicas ) of
186+ Members ;
187+ RunningMembers ->
188+ case rabbit_maintenance :filter_out_drained_nodes_local_read (RunningMembers ) of
159189 [] ->
160- RunningReplicas ;
190+ RunningMembers ;
161191 Filtered ->
162192 Filtered
163193 end
@@ -172,3 +202,22 @@ shuffle(L0) when is_list(L0) ->
172202 L1 = lists :map (fun (E ) -> {rand :uniform (), E } end , L0 ),
173203 L = lists :keysort (1 , L1 ),
174204 lists :map (fun ({_ , E }) -> E end , L ).
205+
206+ queues_per_node (Nodes , GetQueues ) ->
207+ Counters0 = maps :from_list ([{N , 0 } || N <- Nodes ]),
208+ lists :foldl (fun (Q , Acc ) ->
209+ case amqqueue :get_pid (Q ) of
210+ {RaName , LeaderNode } % % quorum queues
211+ when is_atom (RaName ), is_atom (LeaderNode ), is_map_key (LeaderNode , Acc ) ->
212+ maps :update_with (LeaderNode , fun (C ) -> C + 1 end , Acc );
213+ Pid % % classic queues and streams
214+ when is_pid (Pid ), is_map_key (node (Pid ), Acc ) ->
215+ maps :update_with (node (Pid ), fun (C ) -> C + 1 end , Acc );
216+ _ ->
217+ Acc
218+ end
219+ end , Counters0 , GetQueues ()).
220+
221+ % % for unit testing
222+ -spec node () -> node ().
223+ node () -> erlang :node ().
0 commit comments