1313-include_lib (" amqp_client/include/amqp_client.hrl" ).
1414
1515-import (rabbit_ct_broker_helpers ,
16- [rpc /4 ]).
16+ [rpc /4 , rpc / 5 ]).
1717
1818all () ->
1919 [
20- {group , tests }
20+ {group , cluster_size_1 },
21+ {group , cluster_size_3 }
2122 ].
2223
2324groups () ->
2425 [
25- {tests , [shuffle ],
26+ {cluster_size_1 , [],
2627 [message_size ,
27- over_max_message_size ]}
28+ over_max_message_size ]},
29+ {cluster_size_3 , [],
30+ [summary ]
31+ }
2832 ].
2933
3034% % -------------------------------------------------------------------
@@ -34,16 +38,31 @@ groups() ->
3438init_per_suite (Config ) ->
3539 {ok , _ } = application :ensure_all_started (amqp10_client ),
3640 rabbit_ct_helpers :log_environment (),
37- rabbit_ct_helpers : run_setup_steps ( Config ) .
41+ Config .
3842
3943end_per_suite (Config ) ->
4044 rabbit_ct_helpers :run_teardown_steps (Config ).
4145
42- init_per_group (_Group , Config ) ->
43- rabbit_ct_helpers :run_steps (
44- Config ,
45- rabbit_ct_broker_helpers :setup_steps () ++
46- rabbit_ct_client_helpers :setup_steps ()).
46+ init_per_group (Group , Config ) ->
47+ Nodes = case Group of
48+ cluster_size_1 -> 1 ;
49+ cluster_size_3 -> 3
50+ end ,
51+ Suffix = rabbit_ct_helpers :testcase_absname (Config , " " , " -" ),
52+ Config1 = rabbit_ct_helpers :set_config (
53+ Config , [{rmq_nodes_count , Nodes },
54+ {rmq_nodename_suffix , Suffix }]),
55+
56+ Config2 = rabbit_ct_helpers :run_setup_steps (
57+ Config1 ,
58+ rabbit_ct_broker_helpers :setup_steps () ++
59+ rabbit_ct_client_helpers :setup_steps ()),
60+ case rabbit_ct_broker_helpers :enable_feature_flag (Config2 , 'rabbitmq_4.2.0' ) of
61+ ok ->
62+ Config2 ;
63+ {skip , _ } = Skip ->
64+ Skip
65+ end .
4766
4867end_per_group (_Group , Config ) ->
4968 rabbit_ct_helpers :run_steps (
@@ -65,32 +84,7 @@ message_size(Config) ->
6584 AmqplBefore = get_msg_size_metrics (amqp091 , Config ),
6685 AmqpBefore = get_msg_size_metrics (amqp10 , Config ),
6786
68- Binary2B = <<" 12" >>,
69- Binary200K = binary :copy (<<" x" >>, 200_000 ),
70- Payloads = [Binary2B , Binary200K , Binary2B ],
71-
72- {AmqplConn , Ch } = rabbit_ct_client_helpers :open_connection_and_channel (Config ),
73- [amqp_channel :call (Ch ,
74- # 'basic.publish' {routing_key = <<" nowhere" >>},
75- # amqp_msg {payload = Payload })
76- || Payload <- Payloads ],
77-
78- OpnConf = connection_config (Config ),
79- {ok , Connection } = amqp10_client :open_connection (OpnConf ),
80- {ok , Session } = amqp10_client :begin_session_sync (Connection ),
81- Address = rabbitmq_amqp_address :exchange (<<" amq.fanout" >>),
82- {ok , Sender } = amqp10_client :attach_sender_link_sync (Session , <<" sender" >>, Address ),
83- receive {amqp10_event , {link , Sender , credited }} -> ok
84- after 30_000 -> ct :fail (credited_timeout )
85- end ,
86-
87- ok = amqp10_client :send_msg (Sender , amqp10_msg :new (<<" tag1" >>, Binary2B )),
88- ok = amqp10_client :send_msg (Sender , amqp10_msg :new (<<" tag2" >>, Binary200K )),
89- ok = amqp10_client :send_msg (Sender , amqp10_msg :new (<<" tag3" >>, Binary2B )),
90-
91- ok = wait_for_settlement (released , <<" tag1" >>),
92- ok = wait_for_settlement (released , <<" tag2" >>),
93- ok = wait_for_settlement (released , <<" tag3" >>),
87+ publish_messages (Config ),
9488
9589 AmqplAfter = get_msg_size_metrics (amqp091 , Config ),
9690 AmqpAfter = get_msg_size_metrics (amqp10 , Config ),
@@ -100,10 +94,7 @@ message_size(Config) ->
10094 ? assertEqual (ExpectedDiff ,
10195 rabbit_msg_size_metrics :diff_raw_buckets (AmqplAfter , AmqplBefore )),
10296 ? assertEqual (ExpectedDiff ,
103- rabbit_msg_size_metrics :diff_raw_buckets (AmqpAfter , AmqpBefore )),
104-
105- ok = amqp10_client :close_connection (Connection ),
106- ok = rabbit_ct_client_helpers :close_connection_and_channel (AmqplConn , Ch ).
97+ rabbit_msg_size_metrics :diff_raw_buckets (AmqpAfter , AmqpBefore )).
10798
10899over_max_message_size (Config ) ->
109100 DefaultMaxMessageSize = rpc (Config , persistent_term , get , [max_message_size ]),
@@ -134,6 +125,39 @@ over_max_message_size(Config) ->
134125 ok = rabbit_ct_client_helpers :close_connection (Conn ),
135126 ok = rpc (Config , persistent_term , put , [max_message_size , DefaultMaxMessageSize ]).
136127
128+ summary (Config ) ->
129+ ZeroSummary = [{{0 , 100 }, {0 , 0.0 }},
130+ {{101 , 1000 }, {0 , 0.0 }},
131+ {{1001 , 10000 }, {0 , 0.0 }},
132+ {{10001 , 100000 }, {0 , 0.0 }},
133+ {{100001 , 1000000 }, {0 , 0.0 }},
134+ {{1000001 , 10000000 }, {0 , 0.0 }},
135+ {{10000001 , 50000000 }, {0 , 0.0 }},
136+ {{50000001 , 100000000 }, {0 , 0.0 }},
137+ {{100000001 , infinity }, {0 , 0.0 }}],
138+
139+ ? assertEqual (ZeroSummary , rpc (Config , 0 , rabbit_msg_size_metrics , local_summary , [])),
140+ ? assertEqual (ZeroSummary , rpc (Config , 1 , rabbit_msg_size_metrics , cluster_summary , [])),
141+ ? assertEqual (ZeroSummary , rpc (Config , 0 , rabbit_msg_size_metrics , local_summary , [])),
142+ ? assertEqual (ZeroSummary , rpc (Config , 1 , rabbit_msg_size_metrics , cluster_summary , [])),
143+
144+ publish_messages (Config ),
145+
146+ ExpectedSummary = [{{0 , 100 }, {4 , 66.66666666666666 }},
147+ {{101 , 1000 }, {0 , 0.0 }},
148+ {{1001 , 10000 }, {0 , 0.0 }},
149+ {{10001 , 100000 }, {0 , 0.0 }},
150+ {{100001 , 1000000 }, {2 , 33.33333333333333 }},
151+ {{1000001 , 10000000 }, {0 , 0.0 }},
152+ {{10000001 , 50000000 }, {0 , 0.0 }},
153+ {{50000001 , 100000000 }, {0 , 0.0 }},
154+ {{100000001 , infinity }, {0 , 0.0 }}],
155+
156+ ? assertEqual (ExpectedSummary , rpc (Config , 0 , rabbit_msg_size_metrics , local_summary , [])),
157+ ? assertEqual (ExpectedSummary , rpc (Config , 0 , rabbit_msg_size_metrics , cluster_summary , [])),
158+ ? assertEqual (ExpectedSummary , rpc (Config , 1 , rabbit_msg_size_metrics , cluster_summary , [])),
159+ ? assertEqual (ZeroSummary , rpc (Config , 1 , rabbit_msg_size_metrics , local_summary , [])).
160+
137161get_msg_size_metrics (Protocol , Config ) ->
138162 rpc (Config , rabbit_msg_size_metrics , raw_buckets , [Protocol ]).
139163
@@ -145,6 +169,36 @@ connection_config(Config) ->
145169 container_id => <<" my container" >>,
146170 sasl => anon }.
147171
172+ publish_messages (Config ) ->
173+ Binary2B = <<" 12" >>,
174+ Binary200K = binary :copy (<<" x" >>, 200_000 ),
175+ Payloads = [Binary2B , Binary200K , Binary2B ],
176+
177+ {AmqplConn , Ch } = rabbit_ct_client_helpers :open_connection_and_channel (Config ),
178+ [amqp_channel :call (Ch ,
179+ # 'basic.publish' {routing_key = <<" nowhere" >>},
180+ # amqp_msg {payload = Payload })
181+ || Payload <- Payloads ],
182+
183+ OpnConf = connection_config (Config ),
184+ {ok , Connection } = amqp10_client :open_connection (OpnConf ),
185+ {ok , Session } = amqp10_client :begin_session_sync (Connection ),
186+ Address = rabbitmq_amqp_address :exchange (<<" amq.fanout" >>),
187+ {ok , Sender } = amqp10_client :attach_sender_link_sync (Session , <<" sender" >>, Address ),
188+ receive {amqp10_event , {link , Sender , credited }} -> ok
189+ after 30_000 -> ct :fail (credited_timeout )
190+ end ,
191+
192+ ok = amqp10_client :send_msg (Sender , amqp10_msg :new (<<" tag1" >>, Binary2B )),
193+ ok = amqp10_client :send_msg (Sender , amqp10_msg :new (<<" tag2" >>, Binary200K )),
194+ ok = amqp10_client :send_msg (Sender , amqp10_msg :new (<<" tag3" >>, Binary2B )),
195+
196+ ok = wait_for_settlement (released , <<" tag1" >>),
197+ ok = wait_for_settlement (released , <<" tag2" >>),
198+ ok = wait_for_settlement (released , <<" tag3" >>),
199+ ok = amqp10_client :close_connection (Connection ),
200+ ok = rabbit_ct_client_helpers :close_connection_and_channel (AmqplConn , Ch ).
201+
148202wait_for_settlement (State , Tag ) ->
149203 receive
150204 {amqp10_disposition , {State , Tag }} ->
0 commit comments