Skip to content

Commit 4d42a9c

Browse files
committed
GH-2550: Fix CachingCF leak after reconnection
Fixes: #2550 When a `CachingConnectionFactory` is used in a `SimpleMessageContainer` and then there is a connection reset (network glitch, Rabbit restart etc.,), the `SimpleMessageContainer` tries to restart the consumer. However` the checkout permits assigned to the lost channels in `CachingConnectionFactory` is not reclaimed. So the consumer is unable to create channels and recover. * Fix `BlockingQueueConsumer.forceCloseAndClearQueue()` to not check `channel.isOpen()` and always perform respective cleanups, including releasing permits for channels in the `CachingConnectionFactory`. If channel is closed for the network reset reason, it is going to be recreated in the cache. * Add `ConsumerConnectionRecoveryTests` to verify that consumer is really consuming after reconnection. Use Testcontainers to restart `RabbitMQContainer` in the middle of the test **Cherry-pick to `3.0.x`** (cherry picked from commit 0c4efb6)
1 parent 8ca72ed commit 4d42a9c

File tree

2 files changed

+128
-1
lines changed

2 files changed

+128
-1
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -788,7 +788,7 @@ public synchronized void stop() {
788788
}
789789

790790
public void forceCloseAndClearQueue() {
791-
if (this.channel != null && this.channel.isOpen()) {
791+
if (this.channel != null) {
792792
RabbitUtils.setPhysicalCloseRequired(this.channel, true);
793793
ConnectionFactoryUtils.releaseResources(this.resourceHolder);
794794
this.deliveryTags.clear();
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.rabbit.connection;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import java.util.concurrent.BlockingQueue;
22+
import java.util.concurrent.LinkedBlockingQueue;
23+
import java.util.concurrent.TimeUnit;
24+
25+
import org.junit.jupiter.api.Test;
26+
import org.testcontainers.containers.RabbitMQContainer;
27+
import org.testcontainers.junit.jupiter.Container;
28+
import org.testcontainers.junit.jupiter.Testcontainers;
29+
import org.testcontainers.utility.DockerImageName;
30+
31+
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
32+
import org.springframework.amqp.rabbit.annotation.Queue;
33+
import org.springframework.amqp.rabbit.annotation.RabbitListener;
34+
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
35+
import org.springframework.amqp.rabbit.core.RabbitAdmin;
36+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
37+
import org.springframework.beans.factory.annotation.Autowired;
38+
import org.springframework.context.annotation.Bean;
39+
import org.springframework.context.annotation.Configuration;
40+
import org.springframework.test.annotation.DirtiesContext;
41+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
42+
43+
44+
/**
45+
* @author Artem Bilan
46+
*
47+
* @since 3.0.11
48+
*
49+
*/
50+
@SpringJUnitConfig
51+
@Testcontainers(disabledWithoutDocker = true)
52+
@DirtiesContext
53+
public class ConsumerConnectionRecoveryTests {
54+
55+
@Container
56+
static final RabbitMQContainer RABBIT_MQ_CONTAINER =
57+
new RabbitMQContainer(DockerImageName.parse("rabbitmq"));
58+
59+
@Test
60+
void verifyThatChannelPermitsAreReleaseOnReconnect(@Autowired TestConfiguration application)
61+
throws InterruptedException {
62+
63+
application.rabbitTemplate().convertAndSend("testQueue", "test data #1");
64+
65+
assertThat(application.received.poll(20, TimeUnit.SECONDS)).isEqualTo("test data #1");
66+
67+
RABBIT_MQ_CONTAINER.stop();
68+
RABBIT_MQ_CONTAINER.start();
69+
70+
application.connectionFactory().setPort(RABBIT_MQ_CONTAINER.getAmqpPort());
71+
application.publisherConnectionFactory().setPort(RABBIT_MQ_CONTAINER.getAmqpPort());
72+
73+
application.rabbitTemplate().convertAndSend("testQueue", "test data #2");
74+
75+
assertThat(application.received.poll(30, TimeUnit.SECONDS)).isEqualTo("test data #2");
76+
}
77+
78+
@Configuration
79+
@EnableRabbit
80+
public static class TestConfiguration {
81+
82+
@Bean
83+
CachingConnectionFactory connectionFactory() {
84+
CachingConnectionFactory connectionFactory =
85+
new CachingConnectionFactory("localhost", RABBIT_MQ_CONTAINER.getAmqpPort());
86+
connectionFactory.setChannelCacheSize(1);
87+
connectionFactory.setChannelCheckoutTimeout(2000);
88+
return connectionFactory;
89+
}
90+
91+
@Bean
92+
CachingConnectionFactory publisherConnectionFactory() {
93+
CachingConnectionFactory connectionFactory =
94+
new CachingConnectionFactory("localhost", RABBIT_MQ_CONTAINER.getAmqpPort());
95+
connectionFactory.setChannelCacheSize(1);
96+
connectionFactory.setChannelCheckoutTimeout(2000);
97+
return connectionFactory;
98+
}
99+
100+
@Bean
101+
RabbitTemplate rabbitTemplate() {
102+
return new RabbitTemplate(publisherConnectionFactory());
103+
}
104+
105+
@Bean
106+
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
107+
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
108+
factory.setConnectionFactory(connectionFactory());
109+
return factory;
110+
}
111+
112+
@Bean
113+
RabbitAdmin rabbitAdmin() {
114+
return new RabbitAdmin(publisherConnectionFactory());
115+
}
116+
117+
BlockingQueue<String> received = new LinkedBlockingQueue<>();
118+
119+
@RabbitListener(queuesToDeclare = @Queue("testQueue"))
120+
void consume(String payload) {
121+
this.received.add(payload);
122+
}
123+
124+
}
125+
126+
}
127+

0 commit comments

Comments
 (0)