Skip to content

Commit 8a520b3

Browse files
artembilan and garyrussell
authored and committed
AMQP-621: Fix Consumer.basicCancel Race Condition
JIRA: https://jira.spring.io/browse/AMQP-621

The `SimpleMessageListenerContainer` has logic to initiate `BlockingQueueConsumer.basicCancel()` and mark the `BlockingQueueConsumer` as inactive immediately. However, the `handleCancelOk` reply can come back from the Broker a bit late, asynchronously. During this time window we can still receive messages via `handleDelivery`. With `prefetchCount = 1` we may end up with a deadlock on the `handleDelivery` thread during that window: the main consumer loop identifies the consumer as inactive and exits to run the `stop` process, but cannot complete it because the `BlockingQueueConsumer.this.queue.put(new Delivery(consumerTag, envelope, properties, body))` operation blocks the consumer from any other operations.

* Fix the race condition via an additional `BlockingQueueConsumer.cancelled()` state that lets the main loop spin until `handleCancelOk`, honoring any in-flight messages
* The `cancelled` state is based on the `cancelReceived` flag, which is set from `handleCancelOk` (the Broker's response to `basicCancel`) and from the `handleCancel` event (sent by the Broker because of some unexpected state on the Broker, e.g. the consumed queue has been removed)
* Protect against message loss via `basicReject` in case of a late arrival in `handleDelivery`, when the consumer has already been cancelled

Address PR comments

* Rename `cancelReceived` -> `cancelled`, with JavaDocs
* Initiate `basicCancel()` from `handleCancel(String consumerTag)` to cancel the `Consumer` for all other tags. A Broker-initiated cancel is handled the same way as `queueChanged` in the listener container
* Do not mark `cancelled` in `handleCancelOk` until all the tags are cancelled

Polishing

1. Fix the case when a `basicCancel` is received with only one queue (set `cancelled`).
2. Add protection to `cancelled()` in case not all cancelOks are received within `shutdownTimeout` after aborting.
3. If we are aborting, use `offer` instead of `put` to insert into the queue, to avoid indefinitely blocking the client thread.
4. Purge the queue at the end of `stop()` just in case #2 happens and we still have a hung client thread.

Add Test - Abort with Multiple Queues
1 parent 0e042b0 commit 8a520b3

File tree

4 files changed

+89
-21
lines changed

4 files changed

+89
-21
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java

Lines changed: 43 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,12 @@ public class BlockingQueueConsumer {
9797

9898
private InternalConsumer consumer;
9999

100+
/**
101+
* The flag indicating that consumer has been cancelled from all queues
102+
* via {@code handleCancelOk} callback replies.
103+
*/
100104
private final AtomicBoolean cancelled = new AtomicBoolean(false);
101105

102-
private final AtomicBoolean cancelReceived = new AtomicBoolean(false);
103-
104106
private final AcknowledgeMode acknowledgeMode;
105107

106108
private final ConnectionFactory connectionFactory;
@@ -133,6 +135,10 @@ public class BlockingQueueConsumer {
133135

134136
private BackOffExecution backOffExecution;
135137

138+
private long shutdownTimeout;
139+
140+
private volatile long abortStarted;
141+
136142
/**
137143
* Create a consumer. The consumer must not attempt to use
138144
* the connection factory or communicate with the broker
@@ -249,6 +255,10 @@ public String getConsumerTag() {
249255
public final void setQuiesce(long shutdownTimeout) {
250256
}
251257

258+
public void setShutdownTimeout(long shutdownTimeout) {
259+
this.shutdownTimeout = shutdownTimeout;
260+
}
261+
252262
/**
253263
* Set the number of retries after passive queue declaration fails.
254264
* @param declarationRetries The number of retries, default 3.
@@ -318,14 +328,18 @@ protected void basicCancel() {
318328
break;
319329
}
320330
}
321-
this.consumerTags.clear();
322-
this.cancelled.set(true);
331+
this.abortStarted = System.currentTimeMillis();
323332
}
324333

325334
protected boolean hasDelivery() {
326335
return !this.queue.isEmpty();
327336
}
328337

338+
protected boolean cancelled() {
339+
return this.cancelled.get() || (this.abortStarted > 0 &&
340+
this.abortStarted + this.shutdownTimeout > System.currentTimeMillis());
341+
}
342+
329343
/**
330344
* Check if we are in shutdown mode and if so throw an exception.
331345
*/
@@ -339,7 +353,6 @@ private void checkShutdown() {
339353
* If this is a non-POISON non-null delivery simply return it.
340354
* If this is POISON we are in shutdown mode, throw
341355
* shutdown. If delivery is null, we may be in shutdown mode. Check and see.
342-
* @throws InterruptedException
343356
*/
344357
private Message handle(Delivery delivery) throws InterruptedException {
345358
if ((delivery == null && this.shutdown != null)) {
@@ -391,7 +404,7 @@ public Message nextMessage(long timeout) throws InterruptedException, ShutdownSi
391404
checkMissingQueues();
392405
}
393406
Message message = handle(this.queue.poll(timeout, TimeUnit.MILLISECONDS));
394-
if (message == null && this.cancelReceived.get()) {
407+
if (message == null && this.cancelled.get()) {
395408
throw new ConsumerCancelledException();
396409
}
397410
return message;
@@ -582,9 +595,8 @@ private void attemptPassiveDeclarations() {
582595
}
583596

584597
public void stop() {
585-
this.cancelled.set(true);
586598
if (this.consumer != null && this.consumer.getChannel() != null && this.consumerTags.size() > 0
587-
&& !this.cancelReceived.get()) {
599+
&& !this.cancelled.get()) {
588600
try {
589601
RabbitUtils.closeMessageConsumer(this.consumer.getChannel(), this.consumerTags.keySet(),
590602
this.transactional);
@@ -602,6 +614,7 @@ public void stop() {
602614
ConnectionFactoryUtils.releaseResources(this.resourceHolder);
603615
this.deliveryTags.clear();
604616
this.consumer = null;
617+
this.queue.clear(); // in case we still have a client thread blocked
605618
}
606619

607620
/**
@@ -738,19 +751,29 @@ public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig
738751
@Override
739752
public void handleCancel(String consumerTag) throws IOException {
740753
if (logger.isWarnEnabled()) {
741-
logger.warn("Cancel received for " + consumerTag + "; " + BlockingQueueConsumer.this);
754+
logger.warn("Cancel received for " + consumerTag + " ("
755+
+ BlockingQueueConsumer.this.consumerTags.get(consumerTag)
756+
+ "); " + BlockingQueueConsumer.this);
742757
}
743758
BlockingQueueConsumer.this.consumerTags.remove(consumerTag);
744-
BlockingQueueConsumer.this.cancelReceived.set(true);
759+
if (BlockingQueueConsumer.this.consumerTags.isEmpty()) {
760+
BlockingQueueConsumer.this.cancelled.set(true);
761+
}
762+
else {
763+
basicCancel();
764+
}
745765
}
746766

747767
@Override
748768
public void handleCancelOk(String consumerTag) {
749769
if (logger.isDebugEnabled()) {
750-
logger.debug("Received cancellation notice for tag " + consumerTag + "; " + BlockingQueueConsumer.this);
770+
logger.debug("Received cancelOk for tag " + consumerTag + " ("
771+
+ BlockingQueueConsumer.this.consumerTags.get(consumerTag)
772+
+ "); " + BlockingQueueConsumer.this);
751773
}
752-
synchronized (BlockingQueueConsumer.this.consumerTags) {
753-
BlockingQueueConsumer.this.consumerTags.remove(consumerTag);
774+
BlockingQueueConsumer.this.consumerTags.remove(consumerTag);
775+
if (BlockingQueueConsumer.this.consumerTags.isEmpty()) {
776+
BlockingQueueConsumer.this.cancelled.set(true);
754777
}
755778
}
756779

@@ -761,7 +784,13 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
761784
logger.debug("Storing delivery for " + BlockingQueueConsumer.this);
762785
}
763786
try {
764-
BlockingQueueConsumer.this.queue.put(new Delivery(consumerTag, envelope, properties, body));
787+
if (BlockingQueueConsumer.this.abortStarted > 0) {
788+
BlockingQueueConsumer.this.queue.offer(new Delivery(consumerTag, envelope, properties, body),
789+
BlockingQueueConsumer.this.shutdownTimeout, TimeUnit.MILLISECONDS);
790+
}
791+
else {
792+
BlockingQueueConsumer.this.queue.put(new Delivery(consumerTag, envelope, properties, body));
793+
}
765794
}
766795
catch (InterruptedException e) {
767796
Thread.currentThread().interrupt();

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1040,6 +1040,7 @@ protected BlockingQueueConsumer createBlockingQueueConsumer() {
10401040
consumer.setTagStrategy(this.consumerTagStrategy);
10411041
}
10421042
consumer.setBackOffExecution(this.recoveryBackOff.start());
1043+
consumer.setShutdownTimeout(this.shutdownTimeout);
10431044
return consumer;
10441045
}
10451046

@@ -1306,7 +1307,7 @@ public void run() {
13061307
ConsumerChannelRegistry.registerConsumerChannel(this.consumer.getChannel(), getConnectionFactory());
13071308
}
13081309

1309-
while (isActive(this.consumer) || this.consumer.hasDelivery()) {
1310+
while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
13101311
try {
13111312
boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
13121313
if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/ContainerInitializationTests.java

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,12 @@
4646
*/
4747
public class ContainerInitializationTests {
4848

49+
private static final String TEST_MISMATCH = "test.mismatch";
50+
51+
private static final String TEST_MISMATCH2 = "test.mismatch2";
52+
4953
@Rule
50-
public BrokerRunning brokerRunning = BrokerRunning.isRunningWithEmptyQueues("test.mismatch");
54+
public BrokerRunning brokerRunning = BrokerRunning.isRunningWithEmptyQueues(TEST_MISMATCH, TEST_MISMATCH2);
5155

5256
@After
5357
public void tearDown() {
@@ -84,8 +88,23 @@ public void testMismatchedQueue() {
8488
public void testMismatchedQueueDuringRestart() throws Exception {
8589
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Config2.class);
8690
RabbitAdmin admin = context.getBean(RabbitAdmin.class);
87-
admin.deleteQueue("test.mismatch");
88-
admin.declareQueue(new Queue("test.mismatch", false, false, true));
91+
admin.deleteQueue(TEST_MISMATCH);
92+
admin.declareQueue(new Queue(TEST_MISMATCH, false, false, true));
93+
SimpleMessageListenerContainer container = context.getBean(SimpleMessageListenerContainer.class);
94+
int n = 0;
95+
while (n++ < 100 && container.isRunning()) {
96+
Thread.sleep(100);
97+
}
98+
assertFalse(container.isRunning());
99+
context.close();
100+
}
101+
102+
@Test
103+
public void testMismatchedQueueDuringRestartMultiQueue() throws Exception {
104+
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Config3.class);
105+
RabbitAdmin admin = context.getBean(RabbitAdmin.class);
106+
admin.deleteQueue(TEST_MISMATCH);
107+
admin.declareQueue(new Queue(TEST_MISMATCH, false, false, true));
89108
SimpleMessageListenerContainer container = context.getBean(SimpleMessageListenerContainer.class);
90109
int n = 0;
91110
while (n++ < 100 && container.isRunning()) {
@@ -106,7 +125,7 @@ public ConnectionFactory connectionFactory() {
106125
@Bean
107126
public SimpleMessageListenerContainer container() {
108127
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
109-
container.setQueueNames("test.mismatch");
128+
container.setQueueNames(TEST_MISMATCH);
110129
container.setMessageListener(new MessageListenerAdapter(new Object() {
111130

112131
@SuppressWarnings("unused")
@@ -120,7 +139,7 @@ public void handleMessage(Message m) {
120139

121140
@Bean
122141
public Queue queue() {
123-
return new Queue("test.mismatch", false, false, true); // mismatched
142+
return new Queue(TEST_MISMATCH, false, false, true); // mismatched
124143
}
125144

126145
}
@@ -141,7 +160,25 @@ static class Config2 extends Config1 {
141160
@Override
142161
@Bean
143162
public Queue queue() {
144-
return new Queue("test.mismatch", true, false, false);
163+
return new Queue(TEST_MISMATCH, true, false, false);
164+
}
165+
166+
}
167+
168+
@Configuration
169+
static class Config3 extends Config2 {
170+
171+
172+
@Override
173+
public SimpleMessageListenerContainer container() {
174+
SimpleMessageListenerContainer container = super.container();
175+
container.setQueueNames(TEST_MISMATCH, TEST_MISMATCH2);
176+
return container;
177+
}
178+
179+
@Bean
180+
public Queue queue2() {
181+
return new Queue(TEST_MISMATCH2, true, false, false);
145182
}
146183

147184
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerIntegration2Tests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ public void testChangeQueues() throws Exception {
154154
public void testDeleteOneQueue() throws Exception {
155155
CountDownLatch latch = new CountDownLatch(20);
156156
container = createContainer(new MessageListenerAdapter(new PojoListener(latch)), queue.getName(), queue1.getName());
157+
container.setFailedDeclarationRetryInterval(100);
157158
ApplicationEventPublisher publisher = mock(ApplicationEventPublisher.class);
158159
container.setApplicationEventPublisher(publisher);
159160
for (int i = 0; i < 10; i++) {

0 commit comments

Comments
 (0)