@@ -92,7 +92,10 @@ groups() ->
9292 format ,
9393 add_member_2 ,
9494 single_active_consumer_priority_take_over ,
95- single_active_consumer_priority
95+ single_active_consumer_priority ,
96+ force_shrink_member_to_current_member ,
97+ force_all_queues_shrink_member_to_current_member ,
98+ force_vhost_queues_shrink_member_to_current_member
9699 ]
97100 ++ all_tests ()},
98101 {cluster_size_5 , [], [start_queue ,
@@ -1154,6 +1157,151 @@ single_active_consumer_priority(Config) ->
11541157 rpc :call (Server0 , ra , local_query , [RaNameQ3 , QueryFun ])),
11551158 ok .
11561159
1160+ force_shrink_member_to_current_member (Config ) ->
1161+ [Server0 , Server1 , Server2 ] =
1162+ rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1163+
1164+ Ch = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
1165+ QQ = ? config (queue_name , Config ),
1166+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1167+ declare (Ch , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1168+
1169+ RaName = ra_name (QQ ),
1170+ rabbit_ct_client_helpers :publish (Ch , QQ , 3 ),
1171+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1172+
1173+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [QQ , <<" /" >>]),
1174+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1175+ ? assertEqual (3 , length (Nodes0 )),
1176+
1177+ rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue ,
1178+ force_shrink_member_to_current_member , [<<" /" >>, QQ ]),
1179+
1180+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1181+
1182+ {ok , Q1 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [QQ , <<" /" >>]),
1183+ #{nodes := Nodes1 } = amqqueue :get_type_state (Q1 ),
1184+ ? assertEqual (1 , length (Nodes1 )),
1185+
1186+ % % grow queues back to all nodes
1187+ [rpc :call (Server0 , rabbit_quorum_queue , grow , [S , <<" /" >>, <<" .*" >>, all ]) || S <- [Server1 , Server2 ]],
1188+
1189+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1190+ {ok , Q2 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [QQ , <<" /" >>]),
1191+ #{nodes := Nodes2 } = amqqueue :get_type_state (Q2 ),
1192+ ? assertEqual (3 , length (Nodes2 )).
1193+
1194+ force_all_queues_shrink_member_to_current_member (Config ) ->
1195+ [Server0 , Server1 , Server2 ] =
1196+ rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1197+
1198+ Ch = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
1199+ QQ = ? config (queue_name , Config ),
1200+ AQ = ? config (alt_queue_name , Config ),
1201+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1202+ declare (Ch , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1203+ ? assertEqual ({'queue.declare_ok' , AQ , 0 , 0 },
1204+ declare (Ch , AQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1205+
1206+ QQs = [QQ , AQ ],
1207+
1208+ [begin
1209+ RaName = ra_name (Q ),
1210+ rabbit_ct_client_helpers :publish (Ch , Q , 3 ),
1211+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1212+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , <<" /" >>]),
1213+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1214+ ? assertEqual (3 , length (Nodes0 ))
1215+ end || Q <- QQs ],
1216+
1217+ rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue ,
1218+ force_all_queues_shrink_member_to_current_member , []),
1219+
1220+ [begin
1221+ RaName = ra_name (Q ),
1222+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1223+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , <<" /" >>]),
1224+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1225+ ? assertEqual (1 , length (Nodes0 ))
1226+ end || Q <- QQs ],
1227+
1228+ % % grow queues back to all nodes
1229+ [rpc :call (Server0 , rabbit_quorum_queue , grow , [S , <<" /" >>, <<" .*" >>, all ]) || S <- [Server1 , Server2 ]],
1230+
1231+ [begin
1232+ RaName = ra_name (Q ),
1233+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1234+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , <<" /" >>]),
1235+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1236+ ? assertEqual (3 , length (Nodes0 ))
1237+ end || Q <- QQs ].
1238+
1239+ force_vhost_queues_shrink_member_to_current_member (Config ) ->
1240+ [Server0 , Server1 , Server2 ] =
1241+ rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1242+
1243+ Ch0 = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
1244+ QQ = ? config (queue_name , Config ),
1245+ AQ = ? config (alt_queue_name , Config ),
1246+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1247+ declare (Ch0 , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1248+ ? assertEqual ({'queue.declare_ok' , AQ , 0 , 0 },
1249+ declare (Ch0 , AQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1250+
1251+ QQs = [QQ , AQ ],
1252+
1253+ VHost1 = <<" /" >>,
1254+ VHost2 = <<" another-vhost" >>,
1255+ VHosts = [VHost1 , VHost2 ],
1256+
1257+ User = ? config (rmq_username , Config ),
1258+ ok = rabbit_ct_broker_helpers :add_vhost (Config , Server0 , VHost2 , User ),
1259+ ok = rabbit_ct_broker_helpers :set_full_permissions (Config , User , VHost2 ),
1260+ Conn1 = rabbit_ct_client_helpers :open_unmanaged_connection (Config , Server0 , VHost2 ),
1261+ {ok , Ch1 } = amqp_connection :open_channel (Conn1 ),
1262+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1263+ declare (Ch1 , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1264+ ? assertEqual ({'queue.declare_ok' , AQ , 0 , 0 },
1265+ declare (Ch1 , AQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1266+
1267+ [rabbit_ct_client_helpers :publish (Ch , Q , 3 ) || Q <- QQs , Ch <- [Ch0 , Ch1 ]],
1268+
1269+ [begin
1270+ QQRes = rabbit_misc :r (VHost , queue , Q ),
1271+ {ok , RaName } = rpc :call (Server0 , rabbit_queue_type_util , qname_to_internal_name , [QQRes ]),
1272+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1273+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , VHost ]),
1274+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1275+ ? assertEqual (3 , length (Nodes0 ))
1276+ end || Q <- QQs , VHost <- VHosts ],
1277+
1278+ rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue ,
1279+ force_vhost_queues_shrink_member_to_current_member , [VHost2 ]),
1280+
1281+ [begin
1282+ QQRes = rabbit_misc :r (VHost , queue , Q ),
1283+ {ok , RaName } = rpc :call (Server0 , rabbit_queue_type_util , qname_to_internal_name , [QQRes ]),
1284+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1285+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , VHost ]),
1286+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1287+ case VHost of
1288+ VHost1 -> ? assertEqual (3 , length (Nodes0 ));
1289+ VHost2 -> ? assertEqual (1 , length (Nodes0 ))
1290+ end
1291+ end || Q <- QQs , VHost <- VHosts ],
1292+
1293+ % % grow queues back to all nodes in VHost2 only
1294+ [rpc :call (Server0 , rabbit_quorum_queue , grow , [S , VHost2 , <<" .*" >>, all ]) || S <- [Server1 , Server2 ]],
1295+
1296+ [begin
1297+ QQRes = rabbit_misc :r (VHost , queue , Q ),
1298+ {ok , RaName } = rpc :call (Server0 , rabbit_queue_type_util , qname_to_internal_name , [QQRes ]),
1299+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1300+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , VHost ]),
1301+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1302+ ? assertEqual (3 , length (Nodes0 ))
1303+ end || Q <- QQs , VHost <- VHosts ].
1304+
11571305priority_queue_fifo (Config ) ->
11581306 % % testing: if hi priority messages are published before lo priority
11591307 % % messages they are always consumed first (fifo)
0 commit comments