@@ -51,6 +51,9 @@ groups() ->
51
51
roundtrip_with_drain_classic_queue ,
52
52
roundtrip_with_drain_quorum_queue ,
53
53
roundtrip_with_drain_stream ,
54
+ drain_many_classic_queue ,
55
+ drain_many_quorum_queue ,
56
+ drain_many_stream ,
54
57
amqp_stream_amqpl ,
55
58
amqp_quorum_queue_amqpl ,
56
59
message_headers_conversion ,
@@ -171,6 +174,7 @@ end_per_group(_, Config) ->
171
174
init_per_testcase (T , Config )
172
175
when T =:= message_headers_conversion orelse
173
176
T =:= roundtrip_with_drain_quorum_queue orelse
177
+ T =:= drain_many_quorum_queue orelse
174
178
T =:= timed_get_quorum_queue orelse
175
179
T =:= available_messages_quorum_queue ->
176
180
case rpc (Config , rabbit_feature_flags , is_enabled , [credit_api_v2 ]) of
@@ -646,6 +650,69 @@ roundtrip_with_drain(Config, QueueType, QName)
646
650
ok = rabbitmq_amqp_client :detach_management_link_pair_sync (LinkPair ),
647
651
ok = amqp10_client :close_connection (Connection ).
648
652
653
+ drain_many_classic_queue (Config ) ->
654
+ QName = atom_to_binary (? FUNCTION_NAME ),
655
+ drain_many (Config , <<" classic" >>, QName ).
656
+
657
+ drain_many_quorum_queue (Config ) ->
658
+ QName = atom_to_binary (? FUNCTION_NAME ),
659
+ drain_many (Config , <<" quorum" >>, QName ).
660
+
661
+ drain_many_stream (Config ) ->
662
+ QName = atom_to_binary (? FUNCTION_NAME ),
663
+ drain_many (Config , <<" stream" >>, QName ).
664
+
665
+ drain_many (Config , QueueType , QName )
666
+ when is_binary (QueueType ) ->
667
+ Address = <<" /queue/" , QName /binary >>,
668
+ {Connection , Session , LinkPair } = init (Config ),
669
+ QProps = #{arguments => #{<<" x-queue-type" >> => {utf8 , QueueType }}},
670
+ {ok , #{type := QueueType }} = rabbitmq_amqp_client :declare_queue (LinkPair , QName , QProps ),
671
+ {ok , Sender } = amqp10_client :attach_sender_link (Session , <<" test-sender" >>, Address ),
672
+ wait_for_credit (Sender ),
673
+
674
+ Num = 5000 ,
675
+ ok = send_messages (Sender , Num , false ),
676
+ ok = wait_for_accepts (Num ),
677
+
678
+ TerminusDurability = none ,
679
+ Filter = consume_from_first (QueueType ),
680
+ {ok , Receiver } = amqp10_client :attach_receiver_link (
681
+ Session , <<" test-receiver" >>, Address , settled ,
682
+ TerminusDurability , Filter ),
683
+
684
+ ok = amqp10_client :flow_link_credit (Receiver , Num - 1 , never , true ),
685
+ ? assertEqual (Num - 1 , count_received_messages (Receiver )),
686
+ flush (" drained 1" ),
687
+
688
+ ok = amqp10_client :flow_link_credit (Receiver , Num , never , true ),
689
+ ? assertEqual (1 , count_received_messages (Receiver )),
690
+ flush (" drained 2" ),
691
+
692
+ ok = send_messages (Sender , Num , false ),
693
+ ok = wait_for_accepts (Num ),
694
+ receive Unexpected -> ct :fail ({unexpected , Unexpected })
695
+ after 10 -> ok
696
+ end ,
697
+ % % Let's send 2 FLOW frames in sequence.
698
+ ok = amqp10_client :flow_link_credit (Receiver , Num , never , true ),
699
+ ok = amqp10_client :flow_link_credit (Receiver , Num , never , true ),
700
+ ? assertEqual (Num , count_received_messages (Receiver )),
701
+ flush (" drained 3" ),
702
+
703
+ ok = send_messages (Sender , 1 , false ),
704
+ ok = wait_for_accepts (1 ),
705
+ % % Our receiver shouldn't have any credit left to consume the last message.
706
+ receive {amqp10_msg , _ , _ } -> ct :fail (unexpected_delivery )
707
+ after 30 -> ok
708
+ end ,
709
+
710
+ ok = amqp10_client :detach_link (Sender ),
711
+ ok = amqp10_client :detach_link (Receiver ),
712
+ {ok , _ } = rabbitmq_amqp_client :delete_queue (LinkPair , QName ),
713
+ ok = rabbitmq_amqp_client :detach_management_link_pair_sync (LinkPair ),
714
+ ok = amqp10_client :close_connection (Connection ).
715
+
649
716
amqp_stream_amqpl (Config ) ->
650
717
amqp_amqpl (<<" stream" >>, Config ).
651
718
0 commit comments