Skip to content

Commit b933117

Browse files
Merge pull request #335 from rabbitmq/rabbitmq-perf-test-330-some-classic-queues-do-not-have-consumers-after-node-restart
Schedule re-subscription if necessary after node restart
2 parents 00fde99 + 506e4b3 commit b933117

File tree

12 files changed

+425
-52
lines changed

12 files changed

+425
-52
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -79,7 +79,7 @@ You need a local running RabbitMQ instance.
7979

8080
Start a RabbitMQ container:
8181

82-
docker run -it --rm --name rabbitmq -p 5672:5672 rabbitmq:3.8
82+
docker run -it --rm --name rabbitmq -p 5672:5672 rabbitmq:3.10
8383

8484
Run the test suite:
8585

src/main/java/com/rabbitmq/perf/AgentBase.java

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,16 @@ public abstract class AgentBase {
3030

3131
private static final Logger LOGGER = LoggerFactory.getLogger(AgentBase.class);
3232

33+
private volatile TopologyRecording topologyRecording;
34+
35+
public void setTopologyRecording(TopologyRecording topologyRecording) {
36+
this.topologyRecording = topologyRecording;
37+
}
38+
39+
protected TopologyRecording topologyRecording() {
40+
return this.topologyRecording;
41+
}
42+
3343
protected void delay(long now, AgentState state) {
3444

3545
long elapsed = now - state.getLastStatsTime();

src/main/java/com/rabbitmq/perf/Consumer.java

Lines changed: 64 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2007-2022 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -19,7 +19,11 @@
1919
import com.rabbitmq.client.*;
2020
import com.rabbitmq.client.AMQP.Queue.DeclareOk;
2121
import com.rabbitmq.perf.PerfTest.EXIT_WHEN;
22+
import com.rabbitmq.perf.TopologyRecording.RecordedQueue;
2223
import java.time.Duration;
24+
import java.util.concurrent.Callable;
25+
import java.util.concurrent.ScheduledExecutorService;
26+
import java.util.concurrent.TimeUnit;
2327
import org.slf4j.Logger;
2428
import org.slf4j.LoggerFactory;
2529

@@ -85,6 +89,8 @@ public class Consumer extends AgentBase implements Runnable {
8589

8690
private volatile long lastDeliveryTag, lastAckedDeliveryTag;
8791

92+
private final ScheduledExecutorService topologyRecoveryScheduledExecutorService;
93+
8894
public Consumer(ConsumerParameters parameters) {
8995
this.channel = parameters.getChannel();
9096
this.id = parameters.getId();
@@ -101,6 +107,7 @@ public Consumer(ConsumerParameters parameters) {
101107
this.pollingInterval = parameters.getPollingInterval();
102108
this.consumerArguments = parameters.getConsumerArguments();
103109
this.exitWhen = parameters.getExitWhen();
110+
this.topologyRecoveryScheduledExecutorService = parameters.getTopologyRecoveryScheduledExecutorService();
104111

105112
this.queueNames.set(new ArrayList<>(parameters.getQueueNames()));
106113
this.initialQueueNames = new ArrayList<>(parameters.getQueueNames());
@@ -313,8 +320,9 @@ public void handleCancel(String consumerTag) throws IOException {
313320
System.out.printf("Consumer cancelled by broker for tag: %s", consumerTag);
314321
if (consumerTagBranchMap.containsKey(consumerTag)) {
315322
String qName = consumerTagBranchMap.get(consumerTag);
316-
System.out.printf("Re-consuming. Queue: %s for Tag: %s", qName, consumerTag);
317-
channel.basicConsume(qName, autoAck, consumerArguments, q);
323+
TopologyRecording topologyRecording = topologyRecording();
324+
RecordedQueue queueRecord = topologyRecording.queue(qName);
325+
consumeOrScheduleConsume(queueRecord, topologyRecording, consumerTag, qName);
318326
} else {
319327
System.out.printf("Could not find queue for consumer tag: %s", consumerTag);
320328
}
@@ -348,10 +356,12 @@ public void recover(TopologyRecording topologyRecording) {
348356
} else {
349357
for (Map.Entry<String, String> entry : consumerTagBranchMap.entrySet()) {
350358
String queueName = queueName(topologyRecording, entry.getValue());
359+
String consumerTag = entry.getKey();
351360
LOGGER.debug("Recovering consumer, starting consuming on {}", queueName);
352361
try {
353-
channel.basicConsume(queueName, autoAck, entry.getKey(), false, false, this.consumerArguments, q);
354-
} catch (IOException e) {
362+
TopologyRecording.RecordedQueue queueRecord = topologyRecording.queue(entry.getValue());
363+
consumeOrScheduleConsume(queueRecord, topologyRecording, consumerTag, queueName);
364+
} catch (Exception e) {
355365
LOGGER.warn(
356366
"Error while recovering consumer {} on queue {} on connection {}",
357367
entry.getKey(), queueName, channel.getConnection().getClientProvidedName(), e
@@ -361,6 +371,55 @@ public void recover(TopologyRecording topologyRecording) {
361371
}
362372
}
363373

374+
private void consumeOrScheduleConsume(RecordedQueue queueRecord,
375+
TopologyRecording topologyRecording,
376+
String consumerTag,
377+
String queueName) throws IOException {
378+
if (queueMayBeDown(queueRecord, topologyRecording)) {
379+
// If the queue is on a cluster node that is down, basic.consume will fail with a 404.
380+
// This will close the channel and we can't afford it, so we check if the queue exists with a different channel,
381+
// and postpone the subscription if the queue does not exist
382+
LOGGER.debug("Checking if queue {} exists before subscribing", queueName);
383+
if (Utils.exists(channel.getConnection(), ch -> ch.queueDeclarePassive(queueName))) {
384+
LOGGER.debug("Queue {} does exist, subscribing", queueName);
385+
channel.basicConsume(queueName, autoAck, consumerTag, false, false, this.consumerArguments, q);
386+
} else {
387+
LOGGER.debug("Queue {} does not exist, it is likely unavailable, scheduling subscription.", queueName);
388+
Duration schedulingPeriod = Duration.ofSeconds(5);
389+
int maxRetry = (int) (Duration.ofMinutes(10).getSeconds() / schedulingPeriod.getSeconds());
390+
AtomicInteger retryCount = new AtomicInteger(0);
391+
AtomicReference<Callable<Void>> resubscriptionReference = new AtomicReference<>();
392+
Callable<Void> resubscription = () -> {
393+
LOGGER.debug("Scheduled re-subscription for {}...", queueName);
394+
if (Utils.exists(channel.getConnection(), ch -> ch.queueDeclarePassive(queueName))) {
395+
LOGGER.debug("Queue {} exists, re-subscribing", queueName);
396+
channel.basicConsume(queueName, autoAck, consumerTag, false, false, consumerArguments, q);
397+
} else if (retryCount.incrementAndGet() <= maxRetry){
398+
LOGGER.debug("Queue {} does not exist, scheduling re-subscription", queueName);
399+
this.topologyRecoveryScheduledExecutorService.schedule(
400+
resubscriptionReference.get(), schedulingPeriod.getSeconds(), TimeUnit.SECONDS
401+
);
402+
} else {
403+
LOGGER.debug("Max subscription retry count reached {} for queue {}",
404+
retryCount.get(), queueName);
405+
}
406+
return null;
407+
};
408+
resubscriptionReference.set(resubscription);
409+
this.topologyRecoveryScheduledExecutorService.schedule(
410+
resubscription, schedulingPeriod.getSeconds(), TimeUnit.SECONDS
411+
);
412+
}
413+
} else {
414+
channel.basicConsume(queueName, autoAck, consumerTag, false, false, this.consumerArguments, q);
415+
}
416+
}
417+
418+
private static boolean queueMayBeDown(RecordedQueue queueRecord, TopologyRecording topologyRecording) {
419+
return queueRecord != null
420+
&& queueRecord.isClassic() && queueRecord.isDurable() && topologyRecording.isCluster();
421+
}
422+
364423
void maybeStopIfNoActivityOrQueueEmpty() {
365424
LOGGER.debug("Checking consumer activity");
366425
if (this.exitWhen == EXIT_WHEN.NEVER) {

src/main/java/com/rabbitmq/perf/ConsumerParameters.java

Lines changed: 14 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2019-2020 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2019-2022 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -21,6 +21,7 @@
2121
import java.util.List;
2222
import java.util.Map;
2323
import java.util.concurrent.ExecutorService;
24+
import java.util.concurrent.ScheduledExecutorService;
2425

2526
/**
2627
*
@@ -52,6 +53,8 @@ public class ConsumerParameters {
5253

5354
private Map<String, Object> consumerArguments = null;
5455

56+
private ScheduledExecutorService topologyRecoveryScheduledExecutorService;
57+
5558
public Channel getChannel() {
5659
return channel;
5760
}
@@ -231,4 +234,14 @@ public ConsumerParameters setExitWhen(EXIT_WHEN exitWhen) {
231234
public EXIT_WHEN getExitWhen() {
232235
return exitWhen;
233236
}
237+
238+
ConsumerParameters setTopologyRecoveryScheduledExecutorService(
239+
ScheduledExecutorService topologyRecoveryScheduledExecutorService) {
240+
this.topologyRecoveryScheduledExecutorService = topologyRecoveryScheduledExecutorService;
241+
return this;
242+
}
243+
244+
public ScheduledExecutorService getTopologyRecoveryScheduledExecutorService() {
245+
return topologyRecoveryScheduledExecutorService;
246+
}
234247
}

src/main/java/com/rabbitmq/perf/MulticastParams.java

Lines changed: 17 additions & 29 deletions
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,6 @@
1818
import com.rabbitmq.client.AMQP;
1919
import com.rabbitmq.client.Channel;
2020
import com.rabbitmq.client.Connection;
21-
import com.rabbitmq.client.ShutdownSignalException;
2221

2322
import com.rabbitmq.perf.PerfTest.EXIT_WHEN;
2423
import java.io.IOException;
@@ -31,11 +30,13 @@
3130
import java.util.concurrent.ConcurrentHashMap;
3231
import java.util.concurrent.ConcurrentMap;
3332
import java.util.concurrent.ExecutorService;
33+
import java.util.concurrent.ScheduledExecutorService;
3434
import java.util.concurrent.TimeoutException;
3535
import java.util.concurrent.atomic.AtomicReference;
3636
import java.util.stream.Collectors;
3737

3838
import static com.rabbitmq.perf.Recovery.setupRecoveryProcess;
39+
import static com.rabbitmq.perf.Utils.exists;
3940

4041
public class MulticastParams {
4142

@@ -127,6 +128,8 @@ public class MulticastParams {
127128
// for random JSON body generation
128129
private AtomicReference<MessageBodySource> messageBodySourceReference = new AtomicReference<>();
129130

131+
private boolean cluster = false;
132+
130133
public void setExchangeType(String exchangeType) {
131134
this.exchangeType = exchangeType;
132135
}
@@ -293,6 +296,10 @@ public void setExitWhen(EXIT_WHEN exitWhen) {
293296
this.exitWhen = exitWhen;
294297
}
295298

299+
void setCluster(boolean cluster) {
300+
this.cluster = cluster;
301+
}
302+
296303
public int getConsumerCount() {
297304
return consumerCount;
298305
}
@@ -475,7 +482,7 @@ public Producer createProducer(Connection connection, Stats stats, MulticastSet.
475482
Channel channel = connection.createChannel(); //NOSONAR
476483
if (producerTxSize > 0) channel.txSelect();
477484
if (confirm >= 0) channel.confirmSelect();
478-
TopologyRecording topologyRecording = new TopologyRecording(this.isPolling());
485+
TopologyRecording topologyRecording = new TopologyRecording(this.isPolling(), this.cluster);
479486
if (!predeclared || !exchangeExists(connection, exchangeName)) {
480487
Utils.exchangeDeclare(channel, exchangeName, exchangeType);
481488
topologyRecording.recordExchange(exchangeName, exchangeType);
@@ -519,7 +526,8 @@ public Consumer createConsumer(Connection connection,
519526
Stats stats,
520527
ValueIndicator<Long> consumerLatenciesIndicator,
521528
MulticastSet.CompletionHandler completionHandler,
522-
ExecutorService executorService) throws IOException {
529+
ExecutorService executorService,
530+
ScheduledExecutorService topologyRecordingScheduledExecutorService) throws IOException {
523531
TopologyHandlerResult topologyHandlerResult = this.topologyHandler.configureQueuesForClient(connection);
524532
connection = topologyHandlerResult.connection;
525533
Channel channel = connection.createChannel(); //NOSONAR
@@ -559,17 +567,20 @@ public Consumer createConsumer(Connection connection,
559567
.setRequeue(this.requeue)
560568
.setConsumerArguments(this.consumerArguments)
561569
.setExitWhen(this.exitWhen)
570+
.setTopologyRecoveryScheduledExecutorService(topologyRecordingScheduledExecutorService)
562571
);
563572
this.topologyHandler.next();
564573
return consumer;
565574
}
566575

567-
public List<TopologyHandlerResult> configureAllQueues(List<Connection> connection) throws IOException {
568-
return this.topologyHandler.configureAllQueues(connection);
576+
public List<TopologyHandlerResult> configureAllQueues(List<Connection> connections) throws IOException {
577+
return this.topologyHandler.configureAllQueues(connections);
569578
}
570579

571580
public void init() {
572-
this.topologyRecording = new TopologyRecording(this.isPolling());
581+
this.topologyRecording = new TopologyRecording(
582+
this.isPolling(), this.cluster
583+
);
573584
if (this.queuePattern == null && !this.queuesInSequence) {
574585
this.topologyHandler = new FixedQueuesTopologyHandler(this, this.routingKey, this.queueNames, topologyRecording);
575586
} else if (this.queuePattern == null && this.queuesInSequence) {
@@ -650,29 +661,6 @@ public void setProducerSchedulerThreadCount(int producerSchedulerThreadCount) {
650661
this.producerSchedulerThreadCount = producerSchedulerThreadCount;
651662
}
652663

653-
private interface Checker {
654-
void check(Channel ch) throws IOException;
655-
}
656-
657-
private static boolean exists(Connection connection, Checker checker) throws IOException {
658-
try {
659-
Channel ch = connection.createChannel();
660-
checker.check(ch);
661-
ch.abort();
662-
return true;
663-
}
664-
catch (IOException e) {
665-
ShutdownSignalException sse = (ShutdownSignalException) e.getCause();
666-
if (!sse.isHardError()) {
667-
AMQP.Channel.Close closeMethod = (AMQP.Channel.Close) sse.getReason();
668-
if (closeMethod.getReplyCode() == AMQP.NOT_FOUND) {
669-
return false;
670-
}
671-
}
672-
throw e;
673-
}
674-
}
675-
676664
/**
677665
* Contract to handle the creation and configuration of resources.
678666
* E.g. creation of queues, binding exchange to queues.

src/main/java/com/rabbitmq/perf/MulticastSet.java

Lines changed: 16 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -208,14 +208,22 @@ public void run(boolean announceStartup)
208208
List<MulticastParams.TopologyHandlerResult> topologyHandlerResults = params.configureAllQueues(configurationConnections);
209209
enableTopologyRecoveryIfNecessary(topologyHandlerResults);
210210

211+
ScheduledExecutorService topologyRecordingScheduledExecutorService = null;
212+
if (!configurationConnections.isEmpty() && Utils.isRecoverable(configurationConnections.get(0))) {
213+
topologyRecordingScheduledExecutorService = this.threadingHandler.scheduledExecutorService(
214+
"perf-test-topology-recovery-", 1
215+
);
216+
}
217+
211218
this.params.resetTopologyHandler();
212219

213220
Consumer[] consumerRunnables = new Consumer[params.getConsumerThreadCount()];
214221
Connection[] consumerConnections = new Connection[params.getConsumerCount()];
215222
Function<Integer, ExecutorService> consumersExecutorsFactory;
216223
consumersExecutorsFactory = createConsumersExecutorsFactory();
217224

218-
createConsumers(announceStartup, consumerRunnables, consumerConnections, consumersExecutorsFactory);
225+
createConsumers(announceStartup, consumerRunnables, consumerConnections,
226+
consumersExecutorsFactory, topologyRecordingScheduledExecutorService);
219227

220228
this.params.resetTopologyHandler();
221229

@@ -376,7 +384,8 @@ private Function<Integer, ExecutorService> createConsumersExecutorsFactory() {
376384
private void createConsumers(boolean announceStartup,
377385
Runnable[] consumerRunnables,
378386
Connection[] consumerConnections,
379-
Function<Integer, ExecutorService> consumersExecutorsFactory) throws IOException, TimeoutException {
387+
Function<Integer, ExecutorService> consumersExecutorsFactory,
388+
ScheduledExecutorService topologyRecordingScheduledExecutorService) throws IOException, TimeoutException {
380389
for (int i = 0; i < consumerConnections.length; i++) {
381390
if (announceStartup) {
382391
System.out.println("id: " + testID + ", starting consumer #" + i);
@@ -390,13 +399,16 @@ private void createConsumers(boolean announceStartup,
390399
if (announceStartup) {
391400
System.out.println("id: " + testID + ", starting consumer #" + i + ", channel #" + j);
392401
}
393-
Consumer consumer = params.createConsumer(consumerConnection, stats, this.consumerLatencyIndicator, this.completionHandler, executorService);
402+
Consumer consumer = params.createConsumer(consumerConnection, stats,
403+
this.consumerLatencyIndicator, this.completionHandler, executorService,
404+
topologyRecordingScheduledExecutorService);
394405
consumerRunnables[(i * params.getConsumerChannelCount()) + j] = consumer;
395406
}
396407
}
397408
}
398409

399-
private void createProducers(boolean announceStartup, AgentState[] producerStates, Connection[] producerConnections) throws IOException, TimeoutException {
410+
private void createProducers(boolean announceStartup, AgentState[] producerStates,
411+
Connection[] producerConnections) throws IOException, TimeoutException {
400412
for (int i = 0; i < producerConnections.length; i++) {
401413
if (announceStartup) {
402414
System.out.println("id: " + testID + ", starting producer #" + i);

src/main/java/com/rabbitmq/perf/PerfTest.java

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -406,6 +406,7 @@ public static void main(String [] args, PerfTestOptions perfTestOptions) {
406406
p.setConsumerArguments(convertKeyValuePairs(consumerArgs));
407407
p.setQueuesInSequence(queueFile != null);
408408
p.setExitWhen(exitWhen);
409+
p.setCluster(uris.size() > 0);
409410

410411
ConcurrentMap<String, Integer> completionReasons = new ConcurrentHashMap<>();
411412

src/main/java/com/rabbitmq/perf/Recovery.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2007-2022 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -57,6 +57,7 @@ static Recovery.RecoveryProcess setupRecoveryProcess(Connection connection, Topo
5757
@Override
5858
public void init(AgentBase agent) {
5959
agentReference.set(agent);
60+
agent.setTopologyRecording(topologyRecording);
6061
}
6162

6263
@Override
@@ -74,6 +75,7 @@ public boolean isEnabled() {
7475

7576
@Override
7677
public void handleRecoveryStarted(Recoverable recoverable) {
78+
// FIXME use a lock to avoid starting topology recovery twice
7779
recoveryInProgress.set(true);
7880
}
7981

0 commit comments

Comments
 (0)