-
Notifications
You must be signed in to change notification settings - Fork 582
Recovery related improvements #677
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -40,19 +40,19 @@ public class DefaultRetryHandler implements RetryHandler { | |
|
||
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultRetryHandler.class); | ||
|
||
private final BiPredicate<? super RecordedQueue, Exception> queueRecoveryRetryCondition; | ||
private final BiPredicate<? super RecordedExchange, Exception> exchangeRecoveryRetryCondition; | ||
private final BiPredicate<? super RecordedBinding, Exception> bindingRecoveryRetryCondition; | ||
private final BiPredicate<? super RecordedConsumer, Exception> consumerRecoveryRetryCondition; | ||
protected final BiPredicate<? super RecordedQueue, Exception> queueRecoveryRetryCondition; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. allow consumers to more easily extend this class and be able to reuse the existing logic |
||
protected final BiPredicate<? super RecordedExchange, Exception> exchangeRecoveryRetryCondition; | ||
protected final BiPredicate<? super RecordedBinding, Exception> bindingRecoveryRetryCondition; | ||
protected final BiPredicate<? super RecordedConsumer, Exception> consumerRecoveryRetryCondition; | ||
|
||
private final RetryOperation<?> queueRecoveryRetryOperation; | ||
private final RetryOperation<?> exchangeRecoveryRetryOperation; | ||
private final RetryOperation<?> bindingRecoveryRetryOperation; | ||
private final RetryOperation<?> consumerRecoveryRetryOperation; | ||
protected final RetryOperation<?> queueRecoveryRetryOperation; | ||
protected final RetryOperation<?> exchangeRecoveryRetryOperation; | ||
protected final RetryOperation<?> bindingRecoveryRetryOperation; | ||
protected final RetryOperation<?> consumerRecoveryRetryOperation; | ||
|
||
private final int retryAttempts; | ||
protected final int retryAttempts; | ||
|
||
private final BackoffPolicy backoffPolicy; | ||
protected final BackoffPolicy backoffPolicy; | ||
|
||
public DefaultRetryHandler(BiPredicate<? super RecordedQueue, Exception> queueRecoveryRetryCondition, | ||
BiPredicate<? super RecordedExchange, Exception> exchangeRecoveryRetryCondition, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,19 +30,19 @@ | |
*/ | ||
public class TopologyRecoveryRetryHandlerBuilder { | ||
|
||
private BiPredicate<? super RecordedQueue, Exception> queueRecoveryRetryCondition = (q, e) -> false; | ||
private BiPredicate<? super RecordedExchange, Exception> exchangeRecoveryRetryCondition = (ex, e) -> false; | ||
private BiPredicate<? super RecordedBinding, Exception> bindingRecoveryRetryCondition = (b, e) -> false; | ||
private BiPredicate<? super RecordedConsumer, Exception> consumerRecoveryRetryCondition = (c, e) -> false; | ||
protected BiPredicate<? super RecordedQueue, Exception> queueRecoveryRetryCondition = (q, e) -> false; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. allow consumers to more easily extend this class and be able to reuse the existing logic |
||
protected BiPredicate<? super RecordedExchange, Exception> exchangeRecoveryRetryCondition = (ex, e) -> false; | ||
protected BiPredicate<? super RecordedBinding, Exception> bindingRecoveryRetryCondition = (b, e) -> false; | ||
protected BiPredicate<? super RecordedConsumer, Exception> consumerRecoveryRetryCondition = (c, e) -> false; | ||
|
||
private DefaultRetryHandler.RetryOperation<?> queueRecoveryRetryOperation = context -> null; | ||
private DefaultRetryHandler.RetryOperation<?> exchangeRecoveryRetryOperation = context -> null; | ||
private DefaultRetryHandler.RetryOperation<?> bindingRecoveryRetryOperation = context -> null; | ||
private DefaultRetryHandler.RetryOperation<?> consumerRecoveryRetryOperation = context -> null; | ||
protected DefaultRetryHandler.RetryOperation<?> queueRecoveryRetryOperation = context -> null; | ||
protected DefaultRetryHandler.RetryOperation<?> exchangeRecoveryRetryOperation = context -> null; | ||
protected DefaultRetryHandler.RetryOperation<?> bindingRecoveryRetryOperation = context -> null; | ||
protected DefaultRetryHandler.RetryOperation<?> consumerRecoveryRetryOperation = context -> null; | ||
|
||
private int retryAttempts = 2; | ||
protected int retryAttempts = 2; | ||
|
||
private BackoffPolicy backoffPolicy = nbAttempts -> { | ||
protected BackoffPolicy backoffPolicy = nbAttempts -> { | ||
}; | ||
|
||
public static TopologyRecoveryRetryHandlerBuilder builder() { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,7 +18,6 @@ | |
import com.rabbitmq.client.AMQP; | ||
import com.rabbitmq.client.ShutdownSignalException; | ||
import com.rabbitmq.utility.Utility; | ||
|
||
import java.util.function.BiPredicate; | ||
import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryHandlerBuilder.builder; | ||
|
||
|
@@ -55,6 +54,18 @@ public abstract class TopologyRecoveryRetryLogic { | |
} | ||
return null; | ||
}; | ||
|
||
/** | ||
* Recover a queue | ||
*/ | ||
public static final DefaultRetryHandler.RetryOperation<Void> RECOVER_QUEUE = context -> { | ||
if (context.entity() instanceof RecordedQueue) { | ||
final RecordedQueue recordedQueue = context.queue(); | ||
AutorecoveringConnection connection = context.connection(); | ||
connection.recoverQueue(recordedQueue.getName(), recordedQueue, false); | ||
} | ||
return null; | ||
}; | ||
|
||
/** | ||
* Recover the destination queue of a binding. | ||
|
@@ -138,18 +149,52 @@ public abstract class TopologyRecoveryRetryLogic { | |
* Recover a consumer. | ||
*/ | ||
public static final DefaultRetryHandler.RetryOperation<String> RECOVER_CONSUMER = context -> context.consumer().recover(); | ||
|
||
/** | ||
* Recover earlier consumers that share the same channel as this retry context | ||
*/ | ||
public static final DefaultRetryHandler.RetryOperation<String> RECOVER_PREVIOUS_CONSUMERS = context -> { | ||
if (context.entity() instanceof RecordedConsumer) { | ||
// recover all consumers for the same channel that were recovered before this current | ||
// consumer. need to do this incase some consumers had already been recovered | ||
// successfully on a different queue before this one failed | ||
final AutorecoveringChannel channel = context.consumer().getChannel(); | ||
for (RecordedConsumer consumer : Utility.copy(context.connection().getRecordedConsumers()).values()) { | ||
if (consumer == context.entity()) { | ||
break; | ||
} else if (consumer.getChannel() == channel) { | ||
final RetryContext retryContext = new RetryContext(consumer, context.exception(), context.connection()); | ||
RECOVER_CONSUMER_QUEUE.call(retryContext); | ||
consumer.recover(); | ||
RECOVER_CONSUMER_QUEUE_BINDINGS.call(retryContext); | ||
} | ||
} | ||
} | ||
return null; | ||
}; | ||
|
||
/** | ||
* Pre-configured {@link TopologyRecoveryRetryHandlerBuilder} that retries recovery of bindings and consumers | ||
* when their respective queue is not found. | ||
* | ||
* This retry handler can be useful for long recovery processes, whereby auto-delete queues | ||
* can be deleted between queue recovery and binding/consumer recovery. | ||
* | ||
* Also useful to retry channel-closed 404 errors that may arise with auto-delete queues during a cluster cycle. | ||
*/ | ||
public static final TopologyRecoveryRetryHandlerBuilder RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER = builder() | ||
.queueRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Have seen channel-closed 404 "process is stopped by supervisor" errors during the recovery of a queue as well. |
||
.bindingRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND) | ||
.consumerRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND) | ||
.bindingRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_BINDING_QUEUE).andThen(RECOVER_BINDING) | ||
.queueRecoveryRetryOperation(RECOVER_CHANNEL | ||
.andThen(RECOVER_QUEUE)) | ||
.bindingRecoveryRetryOperation(RECOVER_CHANNEL | ||
.andThen(RECOVER_BINDING_QUEUE) | ||
.andThen(RECOVER_BINDING) | ||
.andThen(RECOVER_PREVIOUS_QUEUE_BINDINGS)) | ||
.consumerRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_CONSUMER_QUEUE.andThen(RECOVER_CONSUMER) | ||
.andThen(RECOVER_CONSUMER_QUEUE_BINDINGS))); | ||
.consumerRecoveryRetryOperation(RECOVER_CHANNEL | ||
.andThen(RECOVER_CONSUMER_QUEUE) | ||
.andThen(RECOVER_CONSUMER) | ||
.andThen(RECOVER_CONSUMER_QUEUE_BINDINGS) | ||
.andThen(RECOVER_PREVIOUS_CONSUMERS)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. recover consumers tied to the same channel that already recovered successfully, but are now closed due to the current failed consumer killing the channel out from under it. |
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
useful in scenarios where a consumer is canceled by the server, and we want to remove it from the client side recovery