Recovery related improvements #677
Conversation
* Delete the recorded consumer from this channel and accompanying connection
* @param consumerTag consumer tag to delete
*/
public void deleteRecordedConsumer(String consumerTag) {
Useful in scenarios where a consumer is canceled by the server and we want to remove it from client-side recovery; see the sketch below.
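For illustration, application code could hook this from a consumer's handleCancel callback. A minimal sketch, assuming (per the hunk above) that deleteRecordedConsumer() lives on AutorecoveringChannel; the consumer class itself is illustrative, not part of this PR:

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.impl.recovery.AutorecoveringChannel;
import java.io.IOException;

// Illustrative consumer that unregisters itself from the recovery records
// when the server cancels it (e.g. because its queue was deleted).
public class ServerCancelAwareConsumer extends DefaultConsumer {

    public ServerCancelAwareConsumer(Channel channel) {
        super(channel);
    }

    @Override
    public void handleCancel(String consumerTag) throws IOException {
        if (getChannel() instanceof AutorecoveringChannel) {
            // Method added by this PR: removes the recorded consumer so
            // topology recovery will not try to re-register it later.
            ((AutorecoveringChannel) getChannel()).deleteRecordedConsumer(consumerTag);
        }
    }
}
```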
private final BiPredicate<? super RecordedExchange, Exception> exchangeRecoveryRetryCondition;
private final BiPredicate<? super RecordedBinding, Exception> bindingRecoveryRetryCondition;
private final BiPredicate<? super RecordedConsumer, Exception> consumerRecoveryRetryCondition;
protected final BiPredicate<? super RecordedQueue, Exception> queueRecoveryRetryCondition;
Allow users of the client to more easily extend this class and reuse the existing logic.
private BiPredicate<? super RecordedExchange, Exception> exchangeRecoveryRetryCondition = (ex, e) -> false;
private BiPredicate<? super RecordedBinding, Exception> bindingRecoveryRetryCondition = (b, e) -> false;
private BiPredicate<? super RecordedConsumer, Exception> consumerRecoveryRetryCondition = (c, e) -> false;
protected BiPredicate<? super RecordedQueue, Exception> queueRecoveryRetryCondition = (q, e) -> false;
Allow users of the client to more easily extend this builder class and reuse the existing logic; see the sketch below.
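For example, a custom retry condition can be plugged in through the builder without copying the stock logic. A minimal sketch using the existing public API; the lambda conditions are trivial placeholders, and the pre-built conditions in TopologyRecoveryRetryLogic (such as CHANNEL_CLOSED_NOT_FOUND shown below) could be used instead:

```java
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.impl.recovery.RetryHandler;
import com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryHandlerBuilder;

public class RetryHandlerConfig {

    public static ConnectionFactory configuredFactory() {
        // Placeholder conditions: retry queue/consumer recovery on any failure.
        RetryHandler retryHandler = TopologyRecoveryRetryHandlerBuilder.builder()
                .queueRecoveryRetryCondition((q, e) -> true)
                .consumerRecoveryRetryCondition((c, e) -> true)
                .retryAttempts(5)
                .build();

        ConnectionFactory factory = new ConnectionFactory();
        factory.setAutomaticRecoveryEnabled(true);
        factory.setTopologyRecoveryEnabled(true);
        factory.setTopologyRecoveryRetryHandler(retryHandler);
        return factory;
    }
}
```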
*/
public static final TopologyRecoveryRetryHandlerBuilder RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER = builder()
    .queueRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
I've seen channel-closed 404 "process is stopped by supervisor" errors during the recovery of a queue as well.
.andThen(RECOVER_CONSUMER_QUEUE)
.andThen(RECOVER_CONSUMER)
.andThen(RECOVER_CONSUMER_QUEUE_BINDINGS)
.andThen(RECOVER_PREVIOUS_CONSUMERS));
Recover consumers tied to the same channel that had already recovered successfully but are now closed because the current failed consumer killed the channel out from under them.
This does not cherry-pick into …
Yep, I can do that. Thanks!
Proposed Changes
Quality-of-life improvements to topology recovery that make it easier for users of the client to extend the existing recovery retry logic or build their own.
This adds the RecordedEntity to the TopologyRecoveryException and exposes a method on AutorecoveringConnection to recover a channel and any recorded entities tied to that channel. The idea is that I can capture all of the recorded entities and associated channels that failed via ExceptionHandler.handleTopologyRecoveryException(), and then, after the amqp-client's auto-recovery is complete, run my own retry logic on each failed channel via AutorecoveringConnection.recoverChannelAndTopology(). A sketch of that flow follows.
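A rough sketch of the intended usage. Both TopologyRecoveryException.getRecordedEntity() and the recoverChannelAndTopology(AutorecoveringChannel) signature are names assumed from this PR's description, and the handler subclass is illustrative application code, not part of the library:

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.TopologyRecoveryException;
import com.rabbitmq.client.impl.DefaultExceptionHandler;
import com.rabbitmq.client.impl.recovery.AutorecoveringChannel;
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative handler that remembers which channels failed topology recovery.
public class CapturingExceptionHandler extends DefaultExceptionHandler {

    private final Set<AutorecoveringChannel> failedChannels = ConcurrentHashMap.newKeySet();

    @Override
    public void handleTopologyRecoveryException(Connection conn, Channel ch,
                                                TopologyRecoveryException exception) {
        // getRecordedEntity() is the accessor this PR adds (name assumed);
        // the recorded entity knows its owning autorecovering channel.
        AutorecoveringChannel failed = exception.getRecordedEntity().getChannel();
        if (failed != null) {
            failedChannels.add(failed);
        }
        super.handleTopologyRecoveryException(conn, ch, exception);
    }

    // Called by the application after amqp-client's auto-recovery completes:
    // retries each failed channel plus every recorded entity tied to it.
    public void retryFailedChannels(AutorecoveringConnection connection) throws Exception {
        for (AutorecoveringChannel channel : failedChannels) {
            connection.recoverChannelAndTopology(channel); // method added by this PR
        }
        failedChannels.clear();
    }
}
```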
The existing topology recovery retry design works pretty well, but has some holes with how we are using it.
For example: when an auto-delete queue is deleted out from under a recovering connection, the TopologyRecoveryRetryLogic recovers the queue/binding/consumer that failed, but doesn't handle any upstream exchanges and exchange bindings on the queue in question that were deleted along with the auto-delete queue.
I considered trying to address this scenario in the existing TopologyRecoveryRetryLogic patterns, but it feels safer and cleaner to take a different approach and just always recover all entities tied to the failed channel.
The new AutorecoveringConnection.recoverChannelAndTopology() method is also useful in scenarios where a channel is closed for a reason not tied to a connection failure.
Background of what drove these changes:
During RabbitMQ cluster restarts or upgrades in TAS, we have been struggling with high counts of errors (such as the channel-closed 404 errors mentioned above) during recovery of auto-delete queues and their bindings/consumers. (We currently have a support ticket open with Pivotal for this.) We are using the existing recovery retry logic to help address this, but it sometimes fails 5+ times (with multi-second sleeps in between) before finally succeeding.
Longer term, we plan to evaluate moving away from auto-delete queues in favor of manual deletion plus an expiry policy to clean up abandoned ones; see the sketch below.
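For reference, a sketch of that alternative: declaring a non-auto-delete queue with a per-queue expiry ("x-expires") so the broker still cleans up abandoned queues. The 30-minute TTL and the queue name are arbitrary illustrations:

```java
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class ExpiringQueueExample {

    // Declares a queue that is not auto-delete but expires after 30 minutes
    // of being unused, so abandoned queues are removed by the broker instead
    // of being deleted the moment the last consumer goes away.
    public static void declareExpiringQueue(Channel channel, String queueName) throws IOException {
        Map<String, Object> args = new HashMap<>();
        args.put("x-expires", 30 * 60 * 1000); // queue TTL in milliseconds
        channel.queueDeclare(queueName,
                true,   // durable
                false,  // not exclusive
                false,  // NOT auto-delete: survives consumer loss and recovery
                args);
    }
}
```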
Types of Changes
What types of changes does your code introduce to this project?
Put an x in the boxes that apply.
Checklist
Put an x in the boxes that apply. You can also fill these out after creating the PR. If you're unsure about any of them, don't hesitate to ask on the mailing list. We're here to help! This is simply a reminder of what we are going to look for before merging your code.
CONTRIBUTING.md document
Further Comments
If this is a relatively large or complex change, kick off the discussion by
explaining why you chose the solution you did and what alternatives you
considered, etc.