Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ You need a local running RabbitMQ instance.

Start a RabbitMQ container:

docker run -it --rm --name rabbitmq -p 5672:5672 rabbitmq:3.8
docker run -it --rm --name rabbitmq -p 5672:5672 rabbitmq:3.10

Run the test suite:

Expand Down
10 changes: 10 additions & 0 deletions src/main/java/com/rabbitmq/perf/AgentBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,16 @@ public abstract class AgentBase {

private static final Logger LOGGER = LoggerFactory.getLogger(AgentBase.class);

private volatile TopologyRecording topologyRecording;

public void setTopologyRecording(TopologyRecording topologyRecording) {
this.topologyRecording = topologyRecording;
}

protected TopologyRecording topologyRecording() {
return this.topologyRecording;
}

protected void delay(long now, AgentState state) {

long elapsed = now - state.getLastStatsTime();
Expand Down
69 changes: 64 additions & 5 deletions src/main/java/com/rabbitmq/perf/Consumer.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2007-2022 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
Expand All @@ -19,7 +19,11 @@
import com.rabbitmq.client.*;
import com.rabbitmq.client.AMQP.Queue.DeclareOk;
import com.rabbitmq.perf.PerfTest.EXIT_WHEN;
import com.rabbitmq.perf.TopologyRecording.RecordedQueue;
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -85,6 +89,8 @@ public class Consumer extends AgentBase implements Runnable {

private volatile long lastDeliveryTag, lastAckedDeliveryTag;

private final ScheduledExecutorService topologyRecoveryScheduledExecutorService;

public Consumer(ConsumerParameters parameters) {
this.channel = parameters.getChannel();
this.id = parameters.getId();
Expand All @@ -101,6 +107,7 @@ public Consumer(ConsumerParameters parameters) {
this.pollingInterval = parameters.getPollingInterval();
this.consumerArguments = parameters.getConsumerArguments();
this.exitWhen = parameters.getExitWhen();
this.topologyRecoveryScheduledExecutorService = parameters.getTopologyRecoveryScheduledExecutorService();

this.queueNames.set(new ArrayList<>(parameters.getQueueNames()));
this.initialQueueNames = new ArrayList<>(parameters.getQueueNames());
Expand Down Expand Up @@ -313,8 +320,9 @@ public void handleCancel(String consumerTag) throws IOException {
System.out.printf("Consumer cancelled by broker for tag: %s", consumerTag);
if (consumerTagBranchMap.containsKey(consumerTag)) {
String qName = consumerTagBranchMap.get(consumerTag);
System.out.printf("Re-consuming. Queue: %s for Tag: %s", qName, consumerTag);
channel.basicConsume(qName, autoAck, consumerArguments, q);
TopologyRecording topologyRecording = topologyRecording();
RecordedQueue queueRecord = topologyRecording.queue(qName);
consumeOrScheduleConsume(queueRecord, topologyRecording, consumerTag, qName);
} else {
System.out.printf("Could not find queue for consumer tag: %s", consumerTag);
}
Expand Down Expand Up @@ -348,10 +356,12 @@ public void recover(TopologyRecording topologyRecording) {
} else {
for (Map.Entry<String, String> entry : consumerTagBranchMap.entrySet()) {
String queueName = queueName(topologyRecording, entry.getValue());
String consumerTag = entry.getKey();
LOGGER.debug("Recovering consumer, starting consuming on {}", queueName);
try {
channel.basicConsume(queueName, autoAck, entry.getKey(), false, false, this.consumerArguments, q);
} catch (IOException e) {
TopologyRecording.RecordedQueue queueRecord = topologyRecording.queue(entry.getValue());
consumeOrScheduleConsume(queueRecord, topologyRecording, consumerTag, queueName);
} catch (Exception e) {
LOGGER.warn(
"Error while recovering consumer {} on queue {} on connection {}",
entry.getKey(), queueName, channel.getConnection().getClientProvidedName(), e
Expand All @@ -361,6 +371,55 @@ public void recover(TopologyRecording topologyRecording) {
}
}

private void consumeOrScheduleConsume(RecordedQueue queueRecord,
TopologyRecording topologyRecording,
String consumerTag,
String queueName) throws IOException {
if (queueMayBeDown(queueRecord, topologyRecording)) {
// If the queue is on a cluster node that is down, basic.consume will fail with a 404.
// This will close the channel and we can't afford it, so we check if the queue exists with a different channel,
// and postpone the subscription if the queue does not exist
LOGGER.debug("Checking if queue {} exists before subscribing", queueName);
if (Utils.exists(channel.getConnection(), ch -> ch.queueDeclarePassive(queueName))) {
LOGGER.debug("Queue {} does exist, subscribing", queueName);
channel.basicConsume(queueName, autoAck, consumerTag, false, false, this.consumerArguments, q);
} else {
LOGGER.debug("Queue {} does not exist, it is likely unavailable, scheduling subscription.", queueName);
Duration schedulingPeriod = Duration.ofSeconds(5);
int maxRetry = (int) (Duration.ofMinutes(10).getSeconds() / schedulingPeriod.getSeconds());
AtomicInteger retryCount = new AtomicInteger(0);
AtomicReference<Callable<Void>> resubscriptionReference = new AtomicReference<>();
Callable<Void> resubscription = () -> {
LOGGER.debug("Scheduled re-subscription for {}...", queueName);
if (Utils.exists(channel.getConnection(), ch -> ch.queueDeclarePassive(queueName))) {
LOGGER.debug("Queue {} exists, re-subscribing", queueName);
channel.basicConsume(queueName, autoAck, consumerTag, false, false, consumerArguments, q);
} else if (retryCount.incrementAndGet() <= maxRetry){
LOGGER.debug("Queue {} does not exist, scheduling re-subscription", queueName);
this.topologyRecoveryScheduledExecutorService.schedule(
resubscriptionReference.get(), schedulingPeriod.getSeconds(), TimeUnit.SECONDS
);
} else {
LOGGER.debug("Max subscription retry count reached {} for queue {}",
retryCount.get(), queueName);
}
return null;
};
resubscriptionReference.set(resubscription);
this.topologyRecoveryScheduledExecutorService.schedule(
resubscription, schedulingPeriod.getSeconds(), TimeUnit.SECONDS
);
}
} else {
channel.basicConsume(queueName, autoAck, consumerTag, false, false, this.consumerArguments, q);
}
}

private static boolean queueMayBeDown(RecordedQueue queueRecord, TopologyRecording topologyRecording) {
return queueRecord != null
&& queueRecord.isClassic() && queueRecord.isDurable() && topologyRecording.isCluster();
}

void maybeStopIfNoActivityOrQueueEmpty() {
LOGGER.debug("Checking consumer activity");
if (this.exitWhen == EXIT_WHEN.NEVER) {
Expand Down
15 changes: 14 additions & 1 deletion src/main/java/com/rabbitmq/perf/ConsumerParameters.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2019-2020 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2019-2022 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
Expand All @@ -21,6 +21,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

/**
*
Expand Down Expand Up @@ -52,6 +53,8 @@ public class ConsumerParameters {

private Map<String, Object> consumerArguments = null;

private ScheduledExecutorService topologyRecoveryScheduledExecutorService;

public Channel getChannel() {
return channel;
}
Expand Down Expand Up @@ -231,4 +234,14 @@ public ConsumerParameters setExitWhen(EXIT_WHEN exitWhen) {
public EXIT_WHEN getExitWhen() {
return exitWhen;
}

ConsumerParameters setTopologyRecoveryScheduledExecutorService(
ScheduledExecutorService topologyRecoveryScheduledExecutorService) {
this.topologyRecoveryScheduledExecutorService = topologyRecoveryScheduledExecutorService;
return this;
}

public ScheduledExecutorService getTopologyRecoveryScheduledExecutorService() {
return topologyRecoveryScheduledExecutorService;
}
}
46 changes: 17 additions & 29 deletions src/main/java/com/rabbitmq/perf/MulticastParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ShutdownSignalException;

import com.rabbitmq.perf.PerfTest.EXIT_WHEN;
import java.io.IOException;
Expand All @@ -31,11 +30,13 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static com.rabbitmq.perf.Recovery.setupRecoveryProcess;
import static com.rabbitmq.perf.Utils.exists;

public class MulticastParams {

Expand Down Expand Up @@ -127,6 +128,8 @@ public class MulticastParams {
// for random JSON body generation
private AtomicReference<MessageBodySource> messageBodySourceReference = new AtomicReference<>();

private boolean cluster = false;

public void setExchangeType(String exchangeType) {
this.exchangeType = exchangeType;
}
Expand Down Expand Up @@ -293,6 +296,10 @@ public void setExitWhen(EXIT_WHEN exitWhen) {
this.exitWhen = exitWhen;
}

void setCluster(boolean cluster) {
this.cluster = cluster;
}

public int getConsumerCount() {
return consumerCount;
}
Expand Down Expand Up @@ -475,7 +482,7 @@ public Producer createProducer(Connection connection, Stats stats, MulticastSet.
Channel channel = connection.createChannel(); //NOSONAR
if (producerTxSize > 0) channel.txSelect();
if (confirm >= 0) channel.confirmSelect();
TopologyRecording topologyRecording = new TopologyRecording(this.isPolling());
TopologyRecording topologyRecording = new TopologyRecording(this.isPolling(), this.cluster);
if (!predeclared || !exchangeExists(connection, exchangeName)) {
Utils.exchangeDeclare(channel, exchangeName, exchangeType);
topologyRecording.recordExchange(exchangeName, exchangeType);
Expand Down Expand Up @@ -519,7 +526,8 @@ public Consumer createConsumer(Connection connection,
Stats stats,
ValueIndicator<Long> consumerLatenciesIndicator,
MulticastSet.CompletionHandler completionHandler,
ExecutorService executorService) throws IOException {
ExecutorService executorService,
ScheduledExecutorService topologyRecordingScheduledExecutorService) throws IOException {
TopologyHandlerResult topologyHandlerResult = this.topologyHandler.configureQueuesForClient(connection);
connection = topologyHandlerResult.connection;
Channel channel = connection.createChannel(); //NOSONAR
Expand Down Expand Up @@ -559,17 +567,20 @@ public Consumer createConsumer(Connection connection,
.setRequeue(this.requeue)
.setConsumerArguments(this.consumerArguments)
.setExitWhen(this.exitWhen)
.setTopologyRecoveryScheduledExecutorService(topologyRecordingScheduledExecutorService)
);
this.topologyHandler.next();
return consumer;
}

public List<TopologyHandlerResult> configureAllQueues(List<Connection> connection) throws IOException {
return this.topologyHandler.configureAllQueues(connection);
public List<TopologyHandlerResult> configureAllQueues(List<Connection> connections) throws IOException {
return this.topologyHandler.configureAllQueues(connections);
}

public void init() {
this.topologyRecording = new TopologyRecording(this.isPolling());
this.topologyRecording = new TopologyRecording(
this.isPolling(), this.cluster
);
if (this.queuePattern == null && !this.queuesInSequence) {
this.topologyHandler = new FixedQueuesTopologyHandler(this, this.routingKey, this.queueNames, topologyRecording);
} else if (this.queuePattern == null && this.queuesInSequence) {
Expand Down Expand Up @@ -650,29 +661,6 @@ public void setProducerSchedulerThreadCount(int producerSchedulerThreadCount) {
this.producerSchedulerThreadCount = producerSchedulerThreadCount;
}

private interface Checker {
void check(Channel ch) throws IOException;
}

private static boolean exists(Connection connection, Checker checker) throws IOException {
try {
Channel ch = connection.createChannel();
checker.check(ch);
ch.abort();
return true;
}
catch (IOException e) {
ShutdownSignalException sse = (ShutdownSignalException) e.getCause();
if (!sse.isHardError()) {
AMQP.Channel.Close closeMethod = (AMQP.Channel.Close) sse.getReason();
if (closeMethod.getReplyCode() == AMQP.NOT_FOUND) {
return false;
}
}
throw e;
}
}

/**
* Contract to handle the creation and configuration of resources.
* E.g. creation of queues, binding exchange to queues.
Expand Down
20 changes: 16 additions & 4 deletions src/main/java/com/rabbitmq/perf/MulticastSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -208,14 +208,22 @@ public void run(boolean announceStartup)
List<MulticastParams.TopologyHandlerResult> topologyHandlerResults = params.configureAllQueues(configurationConnections);
enableTopologyRecoveryIfNecessary(topologyHandlerResults);

ScheduledExecutorService topologyRecordingScheduledExecutorService = null;
if (!configurationConnections.isEmpty() && Utils.isRecoverable(configurationConnections.get(0))) {
topologyRecordingScheduledExecutorService = this.threadingHandler.scheduledExecutorService(
"perf-test-topology-recovery-", 1
);
}

this.params.resetTopologyHandler();

Consumer[] consumerRunnables = new Consumer[params.getConsumerThreadCount()];
Connection[] consumerConnections = new Connection[params.getConsumerCount()];
Function<Integer, ExecutorService> consumersExecutorsFactory;
consumersExecutorsFactory = createConsumersExecutorsFactory();

createConsumers(announceStartup, consumerRunnables, consumerConnections, consumersExecutorsFactory);
createConsumers(announceStartup, consumerRunnables, consumerConnections,
consumersExecutorsFactory, topologyRecordingScheduledExecutorService);

this.params.resetTopologyHandler();

Expand Down Expand Up @@ -376,7 +384,8 @@ private Function<Integer, ExecutorService> createConsumersExecutorsFactory() {
private void createConsumers(boolean announceStartup,
Runnable[] consumerRunnables,
Connection[] consumerConnections,
Function<Integer, ExecutorService> consumersExecutorsFactory) throws IOException, TimeoutException {
Function<Integer, ExecutorService> consumersExecutorsFactory,
ScheduledExecutorService topologyRecordingScheduledExecutorService) throws IOException, TimeoutException {
for (int i = 0; i < consumerConnections.length; i++) {
if (announceStartup) {
System.out.println("id: " + testID + ", starting consumer #" + i);
Expand All @@ -390,13 +399,16 @@ private void createConsumers(boolean announceStartup,
if (announceStartup) {
System.out.println("id: " + testID + ", starting consumer #" + i + ", channel #" + j);
}
Consumer consumer = params.createConsumer(consumerConnection, stats, this.consumerLatencyIndicator, this.completionHandler, executorService);
Consumer consumer = params.createConsumer(consumerConnection, stats,
this.consumerLatencyIndicator, this.completionHandler, executorService,
topologyRecordingScheduledExecutorService);
consumerRunnables[(i * params.getConsumerChannelCount()) + j] = consumer;
}
}
}

private void createProducers(boolean announceStartup, AgentState[] producerStates, Connection[] producerConnections) throws IOException, TimeoutException {
private void createProducers(boolean announceStartup, AgentState[] producerStates,
Connection[] producerConnections) throws IOException, TimeoutException {
for (int i = 0; i < producerConnections.length; i++) {
if (announceStartup) {
System.out.println("id: " + testID + ", starting producer #" + i);
Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/rabbitmq/perf/PerfTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,7 @@ public static void main(String [] args, PerfTestOptions perfTestOptions) {
p.setConsumerArguments(convertKeyValuePairs(consumerArgs));
p.setQueuesInSequence(queueFile != null);
p.setExitWhen(exitWhen);
p.setCluster(uris.size() > 0);

ConcurrentMap<String, Integer> completionReasons = new ConcurrentHashMap<>();

Expand Down
4 changes: 3 additions & 1 deletion src/main/java/com/rabbitmq/perf/Recovery.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2007-2022 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
Expand Down Expand Up @@ -57,6 +57,7 @@ static Recovery.RecoveryProcess setupRecoveryProcess(Connection connection, Topo
@Override
public void init(AgentBase agent) {
agentReference.set(agent);
agent.setTopologyRecording(topologyRecording);
}

@Override
Expand All @@ -74,6 +75,7 @@ public boolean isEnabled() {

@Override
public void handleRecoveryStarted(Recoverable recoverable) {
// FIXME use lock to avoid start topology recovery twice
recoveryInProgress.set(true);
}

Expand Down
Loading