@@ -92,7 +92,9 @@ groups() ->
9292 format ,
9393 add_member_2 ,
9494 single_active_consumer_priority_take_over ,
95- single_active_consumer_priority
95+ single_active_consumer_priority ,
96+ force_shrink_member_to_current_member ,
97+ force_all_queues_shrink_member_to_current_member
9698 ]
9799 ++ all_tests ()},
98100 {cluster_size_5 , [], [start_queue ,
@@ -1154,6 +1156,85 @@ single_active_consumer_priority(Config) ->
11541156 rpc :call (Server0 , ra , local_query , [RaNameQ3 , QueryFun ])),
11551157 ok .
11561158
1159+ force_shrink_member_to_current_member (Config ) ->
1160+ [Server0 , Server1 , Server2 ] =
1161+ rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1162+
1163+ Ch = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
1164+ QQ = ? config (queue_name , Config ),
1165+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1166+ declare (Ch , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1167+
1168+ RaName = ra_name (QQ ),
1169+ rabbit_ct_client_helpers :publish (Ch , QQ , 3 ),
1170+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1171+
1172+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [QQ , <<" /" >>]),
1173+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1174+ ? assertEqual (3 , length (Nodes0 )),
1175+
1176+ rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue ,
1177+ force_shrink_member_to_current_member , [<<" /" >>, QQ ]),
1178+
1179+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1180+
1181+ {ok , Q1 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [QQ , <<" /" >>]),
1182+ #{nodes := Nodes1 } = amqqueue :get_type_state (Q1 ),
1183+ ? assertEqual (1 , length (Nodes1 )),
1184+
1185+ % % grow queues back to all nodes
1186+ [rpc :call (Server0 , rabbit_quorum_queue , grow , [S , <<" /" >>, <<" .*" >>, all ]) || S <- [Server1 , Server2 ]],
1187+
1188+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1189+ {ok , Q2 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [QQ , <<" /" >>]),
1190+ #{nodes := Nodes2 } = amqqueue :get_type_state (Q2 ),
1191+ ? assertEqual (3 , length (Nodes2 )).
1192+
1193+ force_all_queues_shrink_member_to_current_member (Config ) ->
1194+ [Server0 , Server1 , Server2 ] =
1195+ rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1196+
1197+ Ch = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
1198+ QQ = ? config (queue_name , Config ),
1199+ AQ = ? config (alt_queue_name , Config ),
1200+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1201+ declare (Ch , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1202+ ? assertEqual ({'queue.declare_ok' , AQ , 0 , 0 },
1203+ declare (Ch , AQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1204+
1205+ QQs = [QQ , AQ ],
1206+
1207+ [begin
1208+ RaName = ra_name (Q ),
1209+ rabbit_ct_client_helpers :publish (Ch , Q , 3 ),
1210+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1211+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , <<" /" >>]),
1212+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1213+ ? assertEqual (3 , length (Nodes0 ))
1214+ end || Q <- QQs ],
1215+
1216+ rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue ,
1217+ force_all_queues_shrink_member_to_current_member , []),
1218+
1219+ [begin
1220+ RaName = ra_name (Q ),
1221+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1222+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , <<" /" >>]),
1223+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1224+ ? assertEqual (1 , length (Nodes0 ))
1225+ end || Q <- QQs ],
1226+
1227+ % % grow queues back to all nodes
1228+ [rpc :call (Server0 , rabbit_quorum_queue , grow , [S , <<" /" >>, <<" .*" >>, all ]) || S <- [Server1 , Server2 ]],
1229+
1230+ [begin
1231+ RaName = ra_name (Q ),
1232+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1233+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , <<" /" >>]),
1234+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1235+ ? assertEqual (3 , length (Nodes0 ))
1236+ end || Q <- QQs ].
1237+
11571238priority_queue_fifo (Config ) ->
11581239 % % testing: if hi priority messages are published before lo priority
11591240 % % messages they are always consumed first (fifo)
0 commit comments