Skip to content

Recovery related improvements - master #678

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/main/java/com/rabbitmq/client/TopologyRecoveryException.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,28 @@

package com.rabbitmq.client;

import com.rabbitmq.client.impl.recovery.RecordedEntity;

/**
* Indicates an exception thrown during topology recovery.
*
* @see com.rabbitmq.client.ConnectionFactory#setTopologyRecoveryEnabled(boolean)
* @since 3.3.0
*/
public class TopologyRecoveryException extends Exception {

private final RecordedEntity recordedEntity;

public TopologyRecoveryException(String message, Throwable cause) {
this(message, cause, null);
}

public TopologyRecoveryException(String message, Throwable cause, final RecordedEntity recordedEntity) {
super(message, cause);
this.recordedEntity = recordedEntity;
}

public RecordedEntity getRecordedEntity() {
return recordedEntity;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -900,7 +900,11 @@ private void recordConsumer(String result,
this.connection.recordConsumer(result, consumer);
}

private void deleteRecordedConsumer(String consumerTag) {
/**
* Delete the recorded consumer from this channel and accompanying connection
* @param consumerTag consumer tag to delete
*/
public void deleteRecordedConsumer(String consumerTag) {
this.consumerTags.remove(consumerTag);
RecordedConsumer c = this.connection.deleteRecordedConsumer(consumerTag);
if (c != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ private void setupErrorOnWriteListenerForPotentialRecovery() {
});
}

private TopologyRecoveryFilter letAllPassFilter() {
private static TopologyRecoveryFilter letAllPassFilter() {
return new TopologyRecoveryFilter() {};
}

Expand Down Expand Up @@ -644,7 +644,7 @@ private void recoverChannels(final RecoveryAwareAMQConnection newConn) {
}
}

void recoverChannel(AutorecoveringChannel channel) throws IOException {
public void recoverChannel(AutorecoveringChannel channel) throws IOException {
channel.automaticallyRecover(this, this.delegate);
}

Expand All @@ -666,6 +666,38 @@ private void notifyTopologyRecoveryListenersStarted() {
}
}

/**
* Recover a closed channel and all topology (i.e. RecordedEntities) associated to it.
* Any errors will be sent to the {@link #getExceptionHandler()}.
* @param channel channel to recover
* @throws IllegalArgumentException if this channel is not owned by this connection
*/
public void recoverChannelAndTopology(final AutorecoveringChannel channel) {
if (!channels.containsValue(channel)) {
throw new IllegalArgumentException("This channel is not owned by this connection");
}
try {
LOGGER.debug("Recovering channel={}", channel);
recoverChannel(channel);
LOGGER.debug("Recovered channel={}. Now recovering its topology", channel);
Utility.copy(recordedExchanges).values().stream()
.filter(e -> e.getChannel() == channel)
.forEach(e -> recoverExchange(e, false));
Utility.copy(recordedQueues).values().stream()
.filter(q -> q.getChannel() == channel)
.forEach(q -> recoverQueue(q.getName(), q, false));
Utility.copy(recordedBindings).stream()
.filter(b -> b.getChannel() == channel)
.forEach(b -> recoverBinding(b, false));
Utility.copy(consumers).values().stream()
.filter(c -> c.getChannel() == channel)
.forEach(c -> recoverConsumer(c.getConsumerTag(), c, false));
LOGGER.debug("Recovered topology for channel={}", channel);
} catch (Exception e) {
getExceptionHandler().handleChannelRecoveryException(channel, e);
}
}

private void recoverTopology(final ExecutorService executor) {
// The recovery sequence is the following:
// 1. Recover exchanges
Expand Down Expand Up @@ -704,7 +736,7 @@ private void recoverTopology(final ExecutorService executor) {
}
}

private void recoverExchange(RecordedExchange x, boolean retry) {
public void recoverExchange(RecordedExchange x, boolean retry) {
// recorded exchanges are guaranteed to be non-predefined (we filter out predefined ones in exchangeDeclare). MK.
try {
if (topologyRecoveryFilter.filterExchange(x)) {
Expand All @@ -722,7 +754,7 @@ private void recoverExchange(RecordedExchange x, boolean retry) {
} catch (Exception cause) {
final String message = "Caught an exception while recovering exchange " + x.getName() +
": " + cause.getMessage();
TopologyRecoveryException e = new TopologyRecoveryException(message, cause);
TopologyRecoveryException e = new TopologyRecoveryException(message, cause, x);
this.getExceptionHandler().handleTopologyRecoveryException(delegate, x.getDelegateChannel(), e);
}
}
Expand Down Expand Up @@ -766,12 +798,12 @@ public void recoverQueue(final String oldName, RecordedQueue q, boolean retry) {
} catch (Exception cause) {
final String message = "Caught an exception while recovering queue " + oldName +
": " + cause.getMessage();
TopologyRecoveryException e = new TopologyRecoveryException(message, cause);
TopologyRecoveryException e = new TopologyRecoveryException(message, cause, q);
this.getExceptionHandler().handleTopologyRecoveryException(delegate, q.getDelegateChannel(), e);
}
}

private void recoverBinding(RecordedBinding b, boolean retry) {
public void recoverBinding(RecordedBinding b, boolean retry) {
try {
if (this.topologyRecoveryFilter.filterBinding(b)) {
if (retry) {
Expand All @@ -788,7 +820,7 @@ private void recoverBinding(RecordedBinding b, boolean retry) {
} catch (Exception cause) {
String message = "Caught an exception while recovering binding between " + b.getSource() +
" and " + b.getDestination() + ": " + cause.getMessage();
TopologyRecoveryException e = new TopologyRecoveryException(message, cause);
TopologyRecoveryException e = new TopologyRecoveryException(message, cause, b);
this.getExceptionHandler().handleTopologyRecoveryException(delegate, b.getDelegateChannel(), e);
}
}
Expand All @@ -800,7 +832,7 @@ public void recoverConsumer(final String tag, RecordedConsumer consumer, boolean
String newTag = null;
if (retry) {
final RecordedConsumer entity = consumer;
RetryResult retryResult = wrapRetryIfNecessary(consumer, () -> entity.recover());
RetryResult retryResult = wrapRetryIfNecessary(consumer, entity::recover);
consumer = (RecordedConsumer) retryResult.getRecordedEntity();
newTag = (String) retryResult.getResult();
} else {
Expand All @@ -824,7 +856,7 @@ public void recoverConsumer(final String tag, RecordedConsumer consumer, boolean
} catch (Exception cause) {
final String message = "Caught an exception while recovering consumer " + tag +
": " + cause.getMessage();
TopologyRecoveryException e = new TopologyRecoveryException(message, cause);
TopologyRecoveryException e = new TopologyRecoveryException(message, cause, consumer);
this.getExceptionHandler().handleTopologyRecoveryException(delegate, consumer.getDelegateChannel(), e);
}
}
Expand Down Expand Up @@ -889,14 +921,10 @@ private void recoverEntitiesAsynchronously(ExecutorService executor, Collection<

private <E extends RecordedEntity> List<Callable<Object>> groupEntitiesByChannel(final Collection<E> entities) {
// map entities by channel
final Map<AutorecoveringChannel, List<E>> map = new LinkedHashMap<AutorecoveringChannel, List<E>>();
final Map<AutorecoveringChannel, List<E>> map = new LinkedHashMap<>();
for (final E entity : entities) {
final AutorecoveringChannel channel = entity.getChannel();
List<E> list = map.get(channel);
if (list == null) {
map.put(channel, list = new ArrayList<E>());
}
list.add(entity);
map.computeIfAbsent(channel, c -> new ArrayList<>()).add(entity);
}
// now create a runnable per channel
final List<Callable<Object>> callables = new ArrayList<>();
Expand Down Expand Up @@ -1083,7 +1111,7 @@ boolean hasMoreConsumersOnQueue(Collection<RecordedConsumer> consumers, String q
}

Set<RecordedBinding> removeBindingsWithDestination(String s) {
final Set<RecordedBinding> result = new HashSet<RecordedBinding>();
final Set<RecordedBinding> result = new LinkedHashSet<>();
synchronized (this.recordedBindings) {
for (Iterator<RecordedBinding> it = this.recordedBindings.iterator(); it.hasNext(); ) {
RecordedBinding b = it.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,19 @@ public class DefaultRetryHandler implements RetryHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultRetryHandler.class);

private final BiPredicate<? super RecordedQueue, Exception> queueRecoveryRetryCondition;
private final BiPredicate<? super RecordedExchange, Exception> exchangeRecoveryRetryCondition;
private final BiPredicate<? super RecordedBinding, Exception> bindingRecoveryRetryCondition;
private final BiPredicate<? super RecordedConsumer, Exception> consumerRecoveryRetryCondition;
protected final BiPredicate<? super RecordedQueue, Exception> queueRecoveryRetryCondition;
protected final BiPredicate<? super RecordedExchange, Exception> exchangeRecoveryRetryCondition;
protected final BiPredicate<? super RecordedBinding, Exception> bindingRecoveryRetryCondition;
protected final BiPredicate<? super RecordedConsumer, Exception> consumerRecoveryRetryCondition;

private final RetryOperation<?> queueRecoveryRetryOperation;
private final RetryOperation<?> exchangeRecoveryRetryOperation;
private final RetryOperation<?> bindingRecoveryRetryOperation;
private final RetryOperation<?> consumerRecoveryRetryOperation;
protected final RetryOperation<?> queueRecoveryRetryOperation;
protected final RetryOperation<?> exchangeRecoveryRetryOperation;
protected final RetryOperation<?> bindingRecoveryRetryOperation;
protected final RetryOperation<?> consumerRecoveryRetryOperation;

private final int retryAttempts;
protected final int retryAttempts;

private final BackoffPolicy backoffPolicy;
protected final BackoffPolicy backoffPolicy;

public DefaultRetryHandler(BiPredicate<? super RecordedQueue, Exception> queueRecoveryRetryCondition,
BiPredicate<? super RecordedExchange, Exception> exchangeRecoveryRetryCondition,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,19 @@
*/
public class TopologyRecoveryRetryHandlerBuilder {

private BiPredicate<? super RecordedQueue, Exception> queueRecoveryRetryCondition = (q, e) -> false;
private BiPredicate<? super RecordedExchange, Exception> exchangeRecoveryRetryCondition = (ex, e) -> false;
private BiPredicate<? super RecordedBinding, Exception> bindingRecoveryRetryCondition = (b, e) -> false;
private BiPredicate<? super RecordedConsumer, Exception> consumerRecoveryRetryCondition = (c, e) -> false;
protected BiPredicate<? super RecordedQueue, Exception> queueRecoveryRetryCondition = (q, e) -> false;
protected BiPredicate<? super RecordedExchange, Exception> exchangeRecoveryRetryCondition = (ex, e) -> false;
protected BiPredicate<? super RecordedBinding, Exception> bindingRecoveryRetryCondition = (b, e) -> false;
protected BiPredicate<? super RecordedConsumer, Exception> consumerRecoveryRetryCondition = (c, e) -> false;

private DefaultRetryHandler.RetryOperation<?> queueRecoveryRetryOperation = context -> null;
private DefaultRetryHandler.RetryOperation<?> exchangeRecoveryRetryOperation = context -> null;
private DefaultRetryHandler.RetryOperation<?> bindingRecoveryRetryOperation = context -> null;
private DefaultRetryHandler.RetryOperation<?> consumerRecoveryRetryOperation = context -> null;
protected DefaultRetryHandler.RetryOperation<?> queueRecoveryRetryOperation = context -> null;
protected DefaultRetryHandler.RetryOperation<?> exchangeRecoveryRetryOperation = context -> null;
protected DefaultRetryHandler.RetryOperation<?> bindingRecoveryRetryOperation = context -> null;
protected DefaultRetryHandler.RetryOperation<?> consumerRecoveryRetryOperation = context -> null;

private int retryAttempts = 2;
protected int retryAttempts = 2;

private BackoffPolicy backoffPolicy = nbAttempts -> {
protected BackoffPolicy backoffPolicy = nbAttempts -> {
};

public static TopologyRecoveryRetryHandlerBuilder builder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,18 @@ public abstract class TopologyRecoveryRetryLogic {
}
return null;
};

/**
* Recover a queue
*/
public static final DefaultRetryHandler.RetryOperation<Void> RECOVER_QUEUE = context -> {
if (context.entity() instanceof RecordedQueue) {
final RecordedQueue recordedQueue = context.queue();
AutorecoveringConnection connection = context.connection();
connection.recoverQueue(recordedQueue.getName(), recordedQueue, false);
}
return null;
};

/**
* Recover the destination queue of a binding.
Expand Down Expand Up @@ -138,18 +150,52 @@ public abstract class TopologyRecoveryRetryLogic {
* Recover a consumer.
*/
public static final DefaultRetryHandler.RetryOperation<String> RECOVER_CONSUMER = context -> context.consumer().recover();

/**
* Recover earlier consumers that share the same channel as this retry context
*/
public static final DefaultRetryHandler.RetryOperation<String> RECOVER_PREVIOUS_CONSUMERS = context -> {
if (context.entity() instanceof RecordedConsumer) {
// recover all consumers for the same channel that were recovered before this current
// consumer. need to do this incase some consumers had already been recovered
// successfully on a different queue before this one failed
final AutorecoveringChannel channel = context.consumer().getChannel();
for (RecordedConsumer consumer : Utility.copy(context.connection().getRecordedConsumers()).values()) {
if (consumer == context.entity()) {
break;
} else if (consumer.getChannel() == channel) {
final RetryContext retryContext = new RetryContext(consumer, context.exception(), context.connection());
RECOVER_CONSUMER_QUEUE.call(retryContext);
consumer.recover();
RECOVER_CONSUMER_QUEUE_BINDINGS.call(retryContext);
}
}
}
return null;
};

/**
* Pre-configured {@link TopologyRecoveryRetryHandlerBuilder} that retries recovery of bindings and consumers
* when their respective queue is not found.
*
* This retry handler can be useful for long recovery processes, whereby auto-delete queues
* can be deleted between queue recovery and binding/consumer recovery.
*
* Also useful to retry channel-closed 404 errors that may arise with auto-delete queues during a cluster cycle.
*/
public static final TopologyRecoveryRetryHandlerBuilder RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER = builder()
.queueRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
.bindingRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
.consumerRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
.bindingRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_BINDING_QUEUE).andThen(RECOVER_BINDING)
.queueRecoveryRetryOperation(RECOVER_CHANNEL
.andThen(RECOVER_QUEUE))
.bindingRecoveryRetryOperation(RECOVER_CHANNEL
.andThen(RECOVER_BINDING_QUEUE)
.andThen(RECOVER_BINDING)
.andThen(RECOVER_PREVIOUS_QUEUE_BINDINGS))
.consumerRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_CONSUMER_QUEUE.andThen(RECOVER_CONSUMER)
.andThen(RECOVER_CONSUMER_QUEUE_BINDINGS)));
.consumerRecoveryRetryOperation(RECOVER_CHANNEL
.andThen(RECOVER_CONSUMER_QUEUE)
.andThen(RECOVER_CONSUMER)
.andThen(RECOVER_CONSUMER_QUEUE_BINDINGS)
.andThen(RECOVER_PREVIOUS_CONSUMERS));
}