77
-module(rabbit_queue_location).

-include("amqqueue.hrl").

-export([queue_leader_locators/0,
         select_leader_and_followers/3]).

%% These are invoked as ?MODULE:Fun(...) inside this module. Fully-qualified
%% calls only resolve to exported functions, so they must be exported
%% explicitly (presumably so that unit tests can mock them).
-export([node/0,
         queues_per_node/2]).

%% The remaining internals are exposed to test builds only; a blanket
%% export_all must not leak into production builds.
-ifdef(TEST).
-compile([export_all, nowarn_export_all]).
-endif.

%% Locator names that are still accepted but deprecated.
-define(QUEUE_LEADER_LOCATORS_DEPRECATED, [<<"random">>, <<"least-leaders">>, <<"min-masters">>]).
%% Every accepted locator name: the current ones followed by the deprecated ones.
-define(QUEUE_LEADER_LOCATORS, [<<"client-local">>, <<"balanced">>] ++ ?QUEUE_LEADER_LOCATORS_DEPRECATED).
%% Default for the `queue_count_start_random_selection' application env key.
-define(QUEUE_COUNT_START_RANDOM_SELECTION, 1_000).
1819
%% Returns the list held by the ?QUEUE_LEADER_LOCATORS macro: the current
%% locator names followed by the deprecated ones.
queue_leader_locators() ->
    ?QUEUE_LEADER_LOCATORS.
2526
%% Entry point: resolves the leader locator configured for queue `Q' and
%% delegates node selection to do_select_leader_and_followers/3.
%% Only quorum, stream and classic queue records with an integer `Size'
%% are accepted; anything else fails with function_clause.
-spec select_leader_and_followers(amqqueue:amqqueue(), pos_integer(), atom()) ->
    {Leader :: node(), Followers :: [node()]}.
select_leader_and_followers(Q, Size, QueueType)
  when is_integer(Size) andalso
       (?amqqueue_is_quorum(Q) orelse
        ?amqqueue_is_stream(Q) orelse
        ?amqqueue_is_classic(Q)) ->
    do_select_leader_and_followers(Size, QueueType, leader_locator(Q)).
33+
%% Picks the leader node and the follower nodes for a queue with `Size'
%% members of type `QueueType', using the already-resolved `LeaderLocator'.
%% Returns {Leader, Followers} where Followers is the member list without
%% the leader.
-spec do_select_leader_and_followers(pos_integer(), atom(), queue_leader_locator()) ->
    {Leader :: node(), Followers :: [node()]}.
do_select_leader_and_followers(1, _QueueType, <<"client-local">>) ->
    %% optimisation for classic queues: a single client-local member is
    %% always the local node, so no cluster or queue lookups are needed
    {?MODULE:node(), []};
do_select_leader_and_followers(Size, QueueType, LeaderLocator) ->
    ClusterNodes = rabbit_nodes:list_members(),
    UpNodes = rabbit_nodes:filter_running(ClusterNodes),
    %% The local node has to be a cluster member; crash (badmatch) otherwise.
    true = lists:member(?MODULE:node(), ClusterNodes),
    ListQueues0 = get_queues_for_type(QueueType),
    %% TODO do we always need the queue count? It can be expensive, check
    %% whether it can be skipped, for example for random selection.
    TotalQueues = rabbit_amqqueue:count(),
    RandomThreshold = application:get_env(rabbit, queue_count_start_random_selection,
                                          ?QUEUE_COUNT_START_RANDOM_SELECTION),
    %% select_members/7 may hand back a different queue-listing fun than the
    %% one passed in; leader_node/6 is given the returned one.
    {Members, ListQueues} = select_members(Size, QueueType, ClusterNodes, UpNodes,
                                           TotalQueues, RandomThreshold, ListQueues0),
    Leader = leader_node(LeaderLocator, Members, UpNodes,
                         TotalQueues, RandomThreshold, ListQueues),
    {Leader, lists:delete(Leader, Members)}.
4755
4856-spec leader_locator (amqqueue :amqqueue ()) ->
@@ -53,7 +61,15 @@ leader_locator(Q) ->
5361 fun (PolVal , _ArgVal ) -> PolVal end ,
5462 Q ) of
5563 undefined ->
56- application :get_env (rabbit , queue_leader_locator , undefined );
64+ case rabbit_queue_type_util :args_policy_lookup (
65+ <<" queue-master-locator" >>,
66+ fun (PolVal , _ArgVal ) -> PolVal end ,
67+ Q ) of
68+ undefined ->
69+ application :get_env (rabbit , queue_leader_locator , undefined );
70+ Val ->
71+ Val
72+ end ;
5773 Val ->
5874 Val
5975 end ,
@@ -63,40 +79,57 @@ leader_locator0(<<"client-local">>) ->
6379 <<" client-local" >>;
6480leader_locator0 (<<" balanced" >>) ->
6581 <<" balanced" >>;
66- % % 'random' and 'least-leaders' are deprecated
82+ % % 'random', 'least-leaders' and 'min-masters ' are deprecated
6783leader_locator0 (<<" random" >>) ->
6884 <<" balanced" >>;
6985leader_locator0 (<<" least-leaders" >>) ->
7086 <<" balanced" >>;
87+ leader_locator0 (<<" min-masters" >>) ->
88+ <<" balanced" >>;
7189leader_locator0 (_ ) ->
7290 % % default
7391 <<" client-local" >>.
7492
75- -spec select_replicas (pos_integer (), [node (),...], [node (),...],
93+ -spec select_members (pos_integer (), term (), [node (),...], [node (),...],
7694 non_neg_integer (), non_neg_integer (), function ()) ->
7795 {[node (),...], function ()}.
78- select_replicas (Size , AllNodes , _ , _ , _ , Fun )
96+ select_members (Size , _ , AllNodes , _ , _ , _ , Fun )
7997 when length (AllNodes ) =< Size ->
8098 {AllNodes , Fun };
%% Classic queues: above the threshold, pick a random node.
%% For classic queues, when there are a lot of queues, if we knew that the
%% distribution of queues between nodes was relatively even, it would be better
%% to declare this queue locally rather than randomly. However, currently,
%% counting queues on each node is relatively expensive. Users can use
%% the client-local strategy if they know their connections are well balanced.
105+ select_members (1 , rabbit_classic_queue , _AllNodes , RunningNodes , QueueCount , QueueCountStartRandom , GetQueues )
106+ when QueueCount >= QueueCountStartRandom ->
107+ {RunningNodes , GetQueues };
108+ select_members (1 , rabbit_classic_queue , _ , RunningNodes , _ , _ , GetQueues ) ->
109+ {RunningNodes , GetQueues };
110+ % % Quorum queues and streams
81111% % Select nodes in the following order:
82112% % 1. Local node to have data locality for declaring client.
83113% % 2. Running nodes.
%% 3.1. If there are many queues: Randomly to avoid expensive calculation of counting members
%%      per node. Random member selection is good enough for most use cases.
86- % % 3.2. If there are few queues: Nodes with least replicas to have a "balanced" RabbitMQ cluster.
87- select_replicas (Size , AllNodes , RunningNodes , QueueCount , QueueCountStartRandom , GetQueues )
116+ % % 3.2. If there are few queues: Nodes with least members to have a "balanced" RabbitMQ cluster.
117+ select_members (Size , _ , AllNodes , RunningNodes , QueueCount , QueueCountStartRandom , GetQueues )
88118 when QueueCount >= QueueCountStartRandom ->
89- L0 = shuffle (lists :delete (node (), AllNodes )),
119+ L0 = shuffle (lists :delete (? MODULE : node (), AllNodes )),
90120 L1 = lists :sort (fun (X , _Y ) ->
91121 lists :member (X , RunningNodes )
92122 end , L0 ),
93123 {L , _ } = lists :split (Size - 1 , L1 ),
94- {[node () | L ], GetQueues };
95- select_replicas (Size , AllNodes , RunningNodes , _ , _ , GetQueues ) ->
96- Counters0 = maps :from_list ([{N , 0 } || N <- lists :delete (node (), AllNodes )]),
124+ {[? MODULE : node () | L ], GetQueues };
125+ select_members (Size , _ , AllNodes , RunningNodes , _ , _ , GetQueues ) ->
126+ Counters0 = maps :from_list ([{N , 0 } || N <- lists :delete (? MODULE : node (), AllNodes )]),
97127 Queues = GetQueues (),
98128 Counters = lists :foldl (fun (Q , Acc ) ->
99- #{nodes := Nodes } = amqqueue :get_type_state (Q ),
129+ Nodes = case amqqueue :get_type (Q ) of
130+ rabbit_classic_queue -> [amqqueue :qnode (Q )];
131+ _ -> maps :get (nodes , amqqueue :get_type_state (Q ))
132+ end ,
100133 lists :foldl (fun (N , A )
101134 when is_map_key (N , A ) ->
102135 maps :update_with (N , fun (C ) -> C + 1 end , A );
@@ -118,46 +151,35 @@ select_replicas(Size, AllNodes, RunningNodes, _, _, GetQueues) ->
118151 end , L0 ),
119152 {L2 , _ } = lists :split (Size - 1 , L1 ),
120153 L = lists :map (fun ({N , _ }) -> N end , L2 ),
121- {[node () | L ], fun () -> Queues end }.
154+ {[? MODULE : node () | L ], fun () -> Queues end }.
122155
%% Chooses the leader node among `Members0' according to the leader locator.
%%
%% <<"client-local">>: always the local node.
%% <<"balanced">>, queue count at or above the threshold: a uniformly random
%%   node among the potential leaders.
%% <<"balanced">>, otherwise: the potential leader with the lowest count as
%%   reported by queues_per_node/2.
-spec leader_node(queue_leader_locator(), [node(),...], [node(),...],
                  non_neg_integer(), non_neg_integer(), function()) ->
    node().
leader_node(<<"client-local">>, _, _, _, _, _) ->
    ?MODULE:node();
leader_node(<<"balanced">>, Members0, RunningNodes, QueueCount, QueueCountStartRandom, _)
  when QueueCount >= QueueCountStartRandom ->
    Members = potential_leaders(Members0, RunningNodes),
    lists:nth(rand:uniform(length(Members)), Members);
leader_node(<<"balanced">>, Members0, RunningNodes, _, _, GetQueues)
  when is_function(GetQueues, 0) ->
    Members = potential_leaders(Members0, RunningNodes),
    Counters = ?MODULE:queues_per_node(Members, GetQueues),
    %% Sort by count (tuple element 2) and take the node with the lowest one.
    {Node, _} = hd(lists:keysort(2, maps:to_list(Counters))),
    Node.
150172
151- potential_leaders (Replicas , RunningNodes ) ->
173+ potential_leaders (Members , RunningNodes ) ->
152174 case lists :filter (fun (R ) ->
153175 lists :member (R , RunningNodes )
154- end , Replicas ) of
176+ end , Members ) of
155177 [] ->
156- Replicas ;
157- RunningReplicas ->
158- case rabbit_maintenance :filter_out_drained_nodes_local_read (RunningReplicas ) of
178+ Members ;
179+ RunningMembers ->
180+ case rabbit_maintenance :filter_out_drained_nodes_local_read (RunningMembers ) of
159181 [] ->
160- RunningReplicas ;
182+ RunningMembers ;
161183 Filtered ->
162184 Filtered
163185 end
@@ -172,3 +194,22 @@ shuffle(L0) when is_list(L0) ->
172194 L1 = lists :map (fun (E ) -> {rand :uniform (), E } end , L0 ),
173195 L = lists :keysort (1 , L1 ),
174196 lists :map (fun ({_ , E }) -> E end , L ).
197+
%% Counts, for each node in `Nodes', how many of the queues returned by
%% `GetQueues()' have their pid located on that node.
%%
%% Every node in `Nodes' starts at 0. Queues whose pid is on a node outside
%% `Nodes', or whose pid has an unrecognised shape, are ignored.
-spec queues_per_node([node()], fun(() -> [amqqueue:amqqueue()])) ->
    #{node() => non_neg_integer()}.
queues_per_node(Nodes, GetQueues) ->
    Counters0 = maps:from_list([{N, 0} || N <- Nodes]),
    CountQueue =
        fun(Q, Acc) ->
                case amqqueue:get_pid(Q) of
                    %% quorum queues: {RaName, LeaderNode}
                    {RaName, LeaderNode}
                      when is_atom(RaName), is_atom(LeaderNode),
                           is_map_key(LeaderNode, Acc) ->
                        maps:update_with(LeaderNode, fun(C) -> C + 1 end, Acc);
                    %% classic queues and streams: a plain pid
                    Pid
                      when is_pid(Pid), is_map_key(node(Pid), Acc) ->
                        maps:update_with(node(Pid), fun(C) -> C + 1 end, Acc);
                    _ ->
                        Acc
                end
        end,
    lists:foldl(CountQueue, Counters0, GetQueues()).
212+
%% Thin wrapper around erlang:node/0. It exists for unit testing: callers in
%% this module go through ?MODULE:node() rather than calling the BIF directly.
-spec node() -> node().
node() ->
    erlang:node().
0 commit comments