@@ -94,7 +94,8 @@ groups() ->
9494 single_active_consumer_priority_take_over ,
9595 single_active_consumer_priority ,
9696 force_shrink_member_to_current_member ,
97- force_all_queues_shrink_member_to_current_member
97+ force_all_queues_shrink_member_to_current_member ,
98+ force_vhost_queues_shrink_member_to_current_member
9899 ]
99100 ++ all_tests ()},
100101 {cluster_size_5 , [], [start_queue ,
@@ -1235,6 +1236,72 @@ force_all_queues_shrink_member_to_current_member(Config) ->
12351236 ? assertEqual (3 , length (Nodes0 ))
12361237 end || Q <- QQs ].
12371238
1239+ force_vhost_queues_shrink_member_to_current_member (Config ) ->
1240+ [Server0 , Server1 , Server2 ] =
1241+ rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1242+
1243+ Ch0 = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
1244+ QQ = ? config (queue_name , Config ),
1245+ AQ = ? config (alt_queue_name , Config ),
1246+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1247+ declare (Ch0 , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1248+ ? assertEqual ({'queue.declare_ok' , AQ , 0 , 0 },
1249+ declare (Ch0 , AQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1250+
1251+ QQs = [QQ , AQ ],
1252+
1253+ VHost1 = <<" /" >>,
1254+ VHost2 = <<" another-vhost" >>,
1255+ VHosts = [VHost1 , VHost2 ],
1256+
1257+ User = ? config (rmq_username , Config ),
1258+ ok = rabbit_ct_broker_helpers :add_vhost (Config , Server0 , VHost2 , User ),
1259+ ok = rabbit_ct_broker_helpers :set_full_permissions (Config , User , VHost2 ),
1260+ Conn1 = rabbit_ct_client_helpers :open_unmanaged_connection (Config , Server0 , VHost2 ),
1261+ {ok , Ch1 } = amqp_connection :open_channel (Conn1 ),
1262+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1263+ declare (Ch1 , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1264+ ? assertEqual ({'queue.declare_ok' , AQ , 0 , 0 },
1265+ declare (Ch1 , AQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1266+
1267+ [rabbit_ct_client_helpers :publish (Ch , Q , 3 ) || Q <- QQs , Ch <- [Ch0 , Ch1 ]],
1268+
1269+ [begin
1270+ QQRes = rabbit_misc :r (VHost , queue , Q ),
1271+ {ok , RaName } = rpc :call (Server0 , rabbit_queue_type_util , qname_to_internal_name , [QQRes ]),
1272+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1273+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , VHost ]),
1274+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1275+ ? assertEqual (3 , length (Nodes0 ))
1276+ end || Q <- QQs , VHost <- VHosts ],
1277+
1278+ rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue ,
1279+ force_vhost_queues_shrink_member_to_current_member , [VHost2 ]),
1280+
1281+ [begin
1282+ QQRes = rabbit_misc :r (VHost , queue , Q ),
1283+ {ok , RaName } = rpc :call (Server0 , rabbit_queue_type_util , qname_to_internal_name , [QQRes ]),
1284+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1285+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , VHost ]),
1286+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1287+ case VHost of
1288+ VHost1 -> ? assertEqual (3 , length (Nodes0 ));
1289+ VHost2 -> ? assertEqual (1 , length (Nodes0 ))
1290+ end
1291+ end || Q <- QQs , VHost <- VHosts ],
1292+
1293+ % % grow queues back to all nodes in VHost2 only
1294+ [rpc :call (Server0 , rabbit_quorum_queue , grow , [S , VHost2 , <<" .*" >>, all ]) || S <- [Server1 , Server2 ]],
1295+
1296+ [begin
1297+ QQRes = rabbit_misc :r (VHost , queue , Q ),
1298+ {ok , RaName } = rpc :call (Server0 , rabbit_queue_type_util , qname_to_internal_name , [QQRes ]),
1299+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1300+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , VHost ]),
1301+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1302+ ? assertEqual (3 , length (Nodes0 ))
1303+ end || Q <- QQs , VHost <- VHosts ].
1304+
12381305priority_queue_fifo (Config ) ->
12391306 % % testing: if hi priority messages are published before lo priority
12401307 % % messages they are always consumed first (fifo)
0 commit comments