Skip to content

Add topology recovery started method to RecoveryListener #668

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/main/java/com/rabbitmq/client/RecoveryListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,12 @@ public interface RecoveryListener {
* @param recoverable a {@link Recoverable} connection.
*/
void handleRecoveryStarted(Recoverable recoverable);

/**
* Invoked before automatic topology recovery starts.
* This means that the connection and channel recovery has completed
* and that exchange/queue/binding/consumer recovery is about to begin.
* @param recoverable a {@link Recoverable} connection.
*/
default void handleTopologyRecoveryStarted(Recoverable recoverable) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,10 @@ public void removeConsumerRecoveryListener(ConsumerRecoveryListener listener) {
}

private synchronized void beginAutomaticRecovery() throws InterruptedException {
this.wait(this.params.getRecoveryDelayHandler().getDelay(0));
final long delay = this.params.getRecoveryDelayHandler().getDelay(0);
if (delay > 0) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i had tried to use a backoff recovery delay handler that had an initial delay of 0 and quickly realized that recovery would hang forever here. An easy work around is to use a 1ms initial delay, but this feels like a better solution that may save someone else in the future.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is reasonable if you ask me.

this.wait(delay);
}

this.notifyRecoveryListenersStarted();

Expand All @@ -576,6 +579,7 @@ private synchronized void beginAutomaticRecovery() throws InterruptedException {
// don't assign new delegate connection until channel recovery is complete
this.delegate = newConn;
if (this.params.isTopologyRecoveryEnabled()) {
notifyTopologyRecoveryListenersStarted();
recoverTopology(params.getTopologyRecoveryExecutor());
}
this.notifyRecoveryListenersComplete();
Expand Down Expand Up @@ -650,6 +654,12 @@ private void notifyRecoveryListenersStarted() {
}
}

private void notifyTopologyRecoveryListenersStarted() {
for (RecoveryListener f : Utility.copy(this.recoveryListeners)) {
f.handleTopologyRecoveryStarted(this);
}
}

private void recoverTopology(final ExecutorService executor) {
// The recovery sequence is the following:
// 1. Recover exchanges
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public String getPassword() {
// see https://github.com/rabbitmq/rabbitmq-java-client/issues/135
@Test public void thatShutdownHooksOnConnectionFireBeforeRecoveryStarts() throws IOException, InterruptedException {
final List<String> events = new CopyOnWriteArrayList<String>();
final CountDownLatch latch = new CountDownLatch(2); // one when started, another when complete
final CountDownLatch latch = new CountDownLatch(3); // one when started, another when complete
connection.addShutdownListener(new ShutdownListener() {
@Override
public void shutdownCompleted(ShutdownSignalException cause) {
Expand Down Expand Up @@ -202,6 +202,10 @@ public void handleRecovery(Recoverable recoverable) {
public void handleRecoveryStarted(Recoverable recoverable) {
latch.countDown();
}
@Override
public void handleTopologyRecoveryStarted(Recoverable recoverable) {
latch.countDown();
}
});
assertThat(connection.isOpen()).isTrue();
closeAndWaitForRecovery();
Expand Down