1313import java .util .HashMap ;
1414import java .util .List ;
1515import java .util .Map ;
16+ import java .util .UUID ;
1617import java .util .concurrent .CountDownLatch ;
1718import java .util .concurrent .TimeUnit ;
1819
@@ -56,22 +57,18 @@ public void testBoundedXDeathHeaderGrowth() throws IOException, InterruptedExcep
5657 declareTransientFanoutExchange (x3 );
5758
5859 final String q1 = "issues.rabbitmq-server-78.queue1" ;
59- Map <String , Object > args1 = argumentsForDeadLetteringTo (x1 );
60- declareTransientQueue (q1 , args1 );
60+ declareTransientQueue (q1 , argumentsForDeadLetteringTo (x1 ));
6161
6262 final String q2 = "issues.rabbitmq-server-78.queue2" ;
63- Map <String , Object > args2 = argumentsForDeadLetteringTo (x2 );
64- declareTransientQueue (q2 , args2 );
63+ declareTransientQueue (q2 , argumentsForDeadLetteringTo (x2 ));
6564 this .channel .queueBind (q2 , x1 , "" );
6665
6766 final String q3 = "issues.rabbitmq-server-78.queue3" ;
68- Map <String , Object > args3 = argumentsForDeadLetteringTo (x3 );
69- declareTransientQueue (q3 , args3 );
67+ declareTransientQueue (q3 , argumentsForDeadLetteringTo (x3 ));
7068 this .channel .queueBind (q3 , x2 , "" );
7169
7270 final String qz = "issues.rabbitmq-server-78.destination" ;
73- Map <String , Object > args4 = argumentsForDeadLetteringTo (x3 );
74- declareTransientQueue (qz , args4 );
71+ declareTransientQueue (qz , argumentsForDeadLetteringTo (x3 ));
7572 this .channel .queueBind (qz , x3 , "" );
7673
7774 CountDownLatch latch = new CountDownLatch (10 );
@@ -95,6 +92,103 @@ public void testBoundedXDeathHeaderGrowth() throws IOException, InterruptedExcep
9592 }
9693 Collections .sort (cs );
9794 assertEquals (Arrays .asList (1L , 1L , 1L , 9L ), cs );
95+
96+ cleanUpExchanges (x1 , x2 , x3 );
97+ cleanUpQueues (q1 , q2 , q3 , qz );
98+ }
99+
100+ private void cleanUpExchanges (String ... xs ) throws IOException {
101+ for (String x : xs ) {
102+ this .channel .exchangeDelete (x );
103+ }
104+ }
105+ private void cleanUpQueues (String ... qs ) throws IOException {
106+ for (String q : qs ) {
107+ this .channel .queueDelete (q );
108+ }
109+ }
110+
111+ @ SuppressWarnings ("unchecked" )
112+ public void testHandlingOfXDeathHeadersFromEarlierVersions () throws IOException , InterruptedException {
113+ final String x1 = "issues.rabbitmq-server-152.fanout1" ;
114+ declareTransientFanoutExchange (x1 );
115+ final String x2 = "issues.rabbitmq-server-152.fanout2" ;
116+ declareTransientFanoutExchange (x2 );
117+
118+ final String q1 = "issues.rabbitmq-server-152.queue1" ;
119+ declareTransientQueue (q1 , argumentsForDeadLetteringTo (x1 ));
120+
121+ final String q2 = "issues.rabbitmq-server-152.queue2" ;
122+ declareTransientQueue (q2 , argumentsForDeadLetteringTo (x2 ));
123+ this .channel .queueBind (q2 , x1 , "" );
124+
125+ final String qz = "issues.rabbitmq-server-152.destination" ;
126+ declareTransientQueue (qz , argumentsForDeadLetteringTo (x2 ));
127+ this .channel .queueBind (qz , x2 , "" );
128+
129+ CountDownLatch latch = new CountDownLatch (10 );
130+ RejectingConsumer cons = new RejectingConsumer (this .channel , latch );
131+ this .channel .basicConsume (qz , cons );
132+
133+ final AMQP .BasicProperties .Builder bldr = new AMQP .BasicProperties .Builder ();
134+ AMQP .BasicProperties props = bldr .headers (
135+ propsWithLegacyXDeathsInHeaders ("issues.rabbitmq-server-152.queue97" ,
136+ "issues.rabbitmq-server-152.queue97" ,
137+ "issues.rabbitmq-server-152.queue97" ,
138+ "issues.rabbitmq-server-152.queue98" ,
139+ "issues.rabbitmq-server-152.queue99" )).build ();
140+ this .channel .basicPublish ("" , q1 , props , "msg" .getBytes ());
141+
142+ assertTrue (latch .await (5 , TimeUnit .SECONDS ));
143+ List <Map <String , Object >> events = (List <Map <String , Object >>)cons .getHeaders ().get ("x-death" );
144+ assertEquals (6 , events .size ());
145+
146+ List <String > qs = new ArrayList <String >();
147+ for (Map <String , Object > evt : events ) {
148+ qs .add (evt .get ("queue" ).toString ());
149+ }
150+ Collections .sort (qs );
151+ assertEquals (Arrays .asList (qz , q1 , q2 ,
152+ "issues.rabbitmq-server-152.queue97" ,
153+ "issues.rabbitmq-server-152.queue98" ,
154+ "issues.rabbitmq-server-152.queue99" ), qs );
155+ List <Long > cs = new ArrayList <Long >();
156+ for (Map <String , Object > evt : events ) {
157+ cs .add ((Long )evt .get ("count" ));
158+ }
159+ Collections .sort (cs );
160+ assertEquals (Arrays .asList (1L , 1L , 4L , 4L , 9L , 12L ), cs );
161+
162+ cleanUpExchanges (x1 , x2 );
163+ cleanUpQueues (q1 , q2 , qz ,
164+ "issues.rabbitmq-server-152.queue97" ,
165+ "issues.rabbitmq-server-152.queue98" ,
166+ "issues.rabbitmq-server-152.queue99" );
167+ }
168+
169+ private Map <String , Object > propsWithLegacyXDeathsInHeaders (String ... qs ) {
170+ Map <String , Object > m = new HashMap <String , Object >();
171+ List <Map <String , Object >> xDeaths = new ArrayList <Map <String , Object >>();
172+ for (String q : qs ) {
173+ xDeaths .add (newXDeath (q ));
174+ xDeaths .add (newXDeath (q ));
175+ xDeaths .add (newXDeath (q ));
176+ xDeaths .add (newXDeath (q ));
177+ }
178+
179+ m .put ("x-death" , xDeaths );
180+ return m ;
181+ }
182+
183+ private Map <String , Object > newXDeath (String q ) {
184+ Map <String , Object > m = new HashMap <String , Object >();
185+ m .put ("reason" , "expired" );
186+ m .put ("queue" , q );
187+ m .put ("exchange" , "issues.rabbitmq-server-152.fanout0" );
188+ m .put ("routing-keys" , Arrays .asList ("routing-key-1" , "routing-key-2" ));
189+ m .put ("random" , UUID .randomUUID ().toString ());
190+
191+ return m ;
98192 }
99193
100194 private Map <String , Object > argumentsForDeadLetteringTo (String dlx ) {
0 commit comments