Recovery related improvements #677


Merged: 1 commit merged into rabbitmq:5.x.x-stable on Apr 1, 2021

Conversation

@vikinghawk (Contributor) commented Apr 1, 2021

Proposed Changes

Quality-of-life improvements to topology recovery that make it easier for consumers to extend the existing recovery retry logic or build their own.

This adds the RecordedEntity to the TopologyRecoveryException and exposes a method on AutorecoveringConnection to recover a channel and any recorded entities tied to that channel. The idea is that I can capture all of the recorded entities and the associated channels that failed via ExceptionHandler.handleTopologyRecoveryException(), and then, after the amqp-client's auto-recovery is complete, run my own logic to retry recovery on each failed channel via AutorecoveringConnection.recoverChannelAndTopology().
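For illustration, the flow we have in mind looks roughly like the sketch below. The getRecordedEntity() getter and the recoverChannelAndTopology(channel) signature shown here are assumptions made for the sketch, not necessarily the final API:

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.TopologyRecoveryException;
import com.rabbitmq.client.impl.ForgivingExceptionHandler;
import com.rabbitmq.client.impl.recovery.AutorecoveringChannel;
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
import com.rabbitmq.client.impl.recovery.RecordedEntity;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class ManualTopologyRetry {

    // Channels whose topology failed to recover during the client's own auto-recovery.
    private final Set<AutorecoveringChannel> failedChannels = ConcurrentHashMap.newKeySet();

    public AutorecoveringConnection connect() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setExceptionHandler(new ForgivingExceptionHandler() {
            @Override
            public void handleTopologyRecoveryException(Connection conn, Channel ch,
                                                        TopologyRecoveryException exception) {
                super.handleTopologyRecoveryException(conn, ch, exception);
                // New in this PR: the exception carries the RecordedEntity that failed to
                // recover, and each recorded entity knows which channel it belongs to.
                RecordedEntity entity = exception.getRecordedEntity(); // getter name assumed
                if (entity != null) {
                    failedChannels.add(entity.getChannel());
                }
            }
        });
        // Automatic connection recovery is on by default, so this is an AutorecoveringConnection.
        return (AutorecoveringConnection) factory.newConnection();
    }

    // Call this once the amqp-client's automatic recovery has completed, to retry the
    // channels (and their recorded topology) that did not come back on their own.
    public void retryFailedChannels(AutorecoveringConnection connection) throws Exception {
        for (AutorecoveringChannel channel : failedChannels) {
            connection.recoverChannelAndTopology(channel); // new method, signature assumed
        }
        failedChannels.clear();
    }
}
```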

The existing topology recovery retry design works pretty well, but it has some holes for the way we are using it.
For example:

  1. successfully recover multiple auto-delete exchanges for a channel
  2. successfully recover multiple auto-delete queues on same channel (that are bound to the exchanges in step 1)
  3. fail to recover a later queue, binding, or consumer on same channel

In this scenario, TopologyRecoveryRetryLogic recovers the queue/binding/consumer that failed, but it does not handle any upstream exchanges or exchange bindings of the queue in question that were deleted along with the auto-delete queue.

I considered trying to address this scenario within the existing TopologyRecoveryRetryLogic patterns, but it feels safer and cleaner to take a different approach and simply always recover all entities tied to the failed channel.

The new AutorecoveringConnection.recoverChannelAndTopology() method is also useful in scenarios where a channel is closed for reasons not tied to a connection failure.

Background of what drove these changes:
During RabbitMQ cluster restarts or upgrades in TAS, we have been struggling with high counts of errors such as:

channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - queue '<queue>' in vhost '<vhost>' process is stopped by supervisor, class-id=50, method-id=10)

during recovery of auto-delete queues (and their bindings/consumers). (We currently have a support ticket open with Pivotal for this.) We are using the existing recovery retry logic to help address this, but it sometimes fails 5+ times (with multi-second sleeps in between) before finally succeeding.

Longer term, we plan to evaluate moving away from auto-delete queues in favor of manual deletion plus an expiry policy to clean up abandoned ones.

Types of Changes

What types of changes does your code introduce to this project?
Put an x in the boxes that apply

  • Bugfix (non-breaking change which fixes issue #NNNN)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation (correction or otherwise)
  • Cosmetics (whitespace, appearance)

Checklist

Put an x in the boxes that apply. You can also fill these out after creating
the PR. If you're unsure about any of them, don't hesitate to ask on the
mailing list. We're here to help! This is simply a reminder of what we are
going to look for before merging your code.

  • I have read the CONTRIBUTING.md document
  • I have signed the CA (see https://cla.pivotal.io/sign/rabbitmq)
  • All tests pass locally with my changes
  • I have added tests that prove my fix is effective or that my feature works
  • I have added necessary documentation (if appropriate)
  • Any dependent changes have been merged and published in related repositories

Further Comments

If this is a relatively large or complex change, kick off the discussion by
explaining why you chose the solution you did and what alternatives you
considered, etc.

* Delete the recorded consumer from this channel and accompanying connection
* @param consumerTag consumer tag to delete
*/
public void deleteRecordedConsumer(String consumerTag) {

Useful in scenarios where a consumer is canceled by the server and we want to remove it from client-side recovery.
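For example, something along these lines (a hypothetical sketch that assumes deleteRecordedConsumer() lives on AutorecoveringChannel, as the javadoc above suggests):

```java
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.impl.recovery.AutorecoveringChannel;

public class ServerCancelAwareConsumer extends DefaultConsumer {

    public ServerCancelAwareConsumer(AutorecoveringChannel channel) {
        super(channel);
    }

    @Override
    public void handleCancel(String consumerTag) {
        // The broker cancelled this consumer (basic.cancel). Drop it from the client-side
        // recovery records so auto-recovery does not try to re-register it later.
        ((AutorecoveringChannel) getChannel()).deleteRecordedConsumer(consumerTag);
    }
}
```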

private final BiPredicate<? super RecordedExchange, Exception> exchangeRecoveryRetryCondition;
private final BiPredicate<? super RecordedBinding, Exception> bindingRecoveryRetryCondition;
private final BiPredicate<? super RecordedConsumer, Exception> consumerRecoveryRetryCondition;
protected final BiPredicate<? super RecordedQueue, Exception> queueRecoveryRetryCondition;

Allows consumers to more easily extend this class and reuse the existing logic.

private BiPredicate<? super RecordedExchange, Exception> exchangeRecoveryRetryCondition = (ex, e) -> false;
private BiPredicate<? super RecordedBinding, Exception> bindingRecoveryRetryCondition = (b, e) -> false;
private BiPredicate<? super RecordedConsumer, Exception> consumerRecoveryRetryCondition = (c, e) -> false;
protected BiPredicate<? super RecordedQueue, Exception> queueRecoveryRetryCondition = (q, e) -> false;

Allows consumers to more easily extend this class and reuse the existing logic.
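A rough sketch of the kind of extension this enables (the subclass and method names are made up for illustration, and it assumes the builder remains subclassable with an accessible constructor):

```java
import java.util.function.BiPredicate;

import com.rabbitmq.client.impl.recovery.RecordedQueue;
import com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryHandlerBuilder;

public class LoggingRetryHandlerBuilder extends TopologyRecoveryRetryHandlerBuilder {

    // Decorate the inherited queue retry condition (now protected) with some logging,
    // instead of re-implementing the whole condition from scratch.
    public LoggingRetryHandlerBuilder logQueueRetryDecisions() {
        BiPredicate<? super RecordedQueue, Exception> delegate = this.queueRecoveryRetryCondition;
        this.queueRecoveryRetryCondition = (queue, error) -> {
            boolean retry = delegate.test(queue, error);
            System.out.println("Queue '" + queue.getName() + "' recovery failed: " + error
                    + " (retry=" + retry + ")");
            return retry;
        };
        return this;
    }
}
```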

*/
public static final TopologyRecoveryRetryHandlerBuilder RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER = builder()
.queueRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)

We have seen channel-closed 404 "process is stopped by supervisor" errors during recovery of a queue as well.
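For reference, wiring the stock handler into a connection looks roughly like this (a minimal sketch; the build(3) call and the retry count of 3 are illustrative and may need adjusting to the actual builder API):

```java
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic;

public class RetryHandlerConfig {

    public static ConnectionFactory create() {
        ConnectionFactory factory = new ConnectionFactory();
        // Retry recovery of entities that fail with channel-closed 404 NOT_FOUND errors;
        // the argument to build() caps the number of retry attempts (3 is arbitrary here).
        factory.setTopologyRecoveryRetryHandler(
                TopologyRecoveryRetryLogic.RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER.build(3));
        return factory;
    }
}
```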

.andThen(RECOVER_CONSUMER_QUEUE)
.andThen(RECOVER_CONSUMER)
.andThen(RECOVER_CONSUMER_QUEUE_BINDINGS)
.andThen(RECOVER_PREVIOUS_CONSUMERS));

Recovers consumers tied to the same channel that had already recovered successfully but are now closed because the currently failing consumer killed the channel out from under them.

@michaelklishin merged commit f27a954 into rabbitmq:5.x.x-stable on Apr 1, 2021
@michaelklishin (Contributor) commented:
This does not cherry-pick into master cleanly. @vikinghawk, can you please submit an identical PR for master?

@vikinghawk (Contributor Author) commented:

Yep, I can do that. Thanks!

@vikinghawk deleted the recovery_improvements branch on April 1, 2021 at 22:00