Skip to content

Binding recovery retry should recover all bindings on the recovered queue #667

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1091,6 +1091,10 @@ public Map<String, RecordedExchange> getRecordedExchanges() {
/**
 * Exposes the bindings recorded by this connection.
 *
 * @return the {@code recordedBindings} reference itself; this accessor makes no copy
 */
public List<RecordedBinding> getRecordedBindings() {
    return this.recordedBindings;
}

/**
 * Exposes the consumers recorded by this connection.
 *
 * @return the {@code consumers} reference itself; this accessor makes no copy
 */
public Map<String, RecordedConsumer> getRecordedConsumers() {
    return this.consumers;
}

@Override
public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,27 @@ public abstract class TopologyRecoveryRetryLogic {
context.binding().recover();
return null;
};

/**
 * Retry operation that re-runs recovery for the queue bindings recorded ahead of the
 * binding held by the retry context, limited to bindings whose destination is the same queue.
 * Does nothing when the context entity is not a {@link RecordedQueueBinding}. Always returns {@code null}.
 */
public static final DefaultRetryHandler.RetryOperation<Void> RECOVER_PREVIOUS_QUEUE_BINDINGS = context -> {
    // Only queue bindings are handled here; anything else is left untouched.
    if (!(context.entity() instanceof RecordedQueueBinding)) {
        return null;
    }
    // Some bindings on this queue may already have been recovered successfully before the queue
    // was deleted and the current binding failed, so they have to be recovered again.
    final String targetQueue = context.binding().getDestination();
    for (RecordedBinding candidate : Utility.copy(context.connection().getRecordedBindings())) {
        if (candidate == context.entity()) {
            // Reached the binding of this context. The list is ordered, so every earlier
            // binding that may have existed on this queue has been visited already.
            break;
        }
        if (candidate instanceof RecordedQueueBinding && targetQueue.equals(candidate.getDestination())) {
            candidate.recover();
        }
    }
    return null;
};

/**
* Recover the queue of a consumer.
Expand Down Expand Up @@ -127,7 +148,8 @@ public abstract class TopologyRecoveryRetryLogic {
public static final TopologyRecoveryRetryHandlerBuilder RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER = builder()
    // Binding and consumer recovery are both retried under the CHANNEL_CLOSED_NOT_FOUND condition.
    .bindingRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
    .consumerRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
    // The binding retry operation is configured exactly once: channel, then the binding's queue,
    // then the binding itself, then the earlier bindings recorded for the same queue.
    .bindingRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_BINDING_QUEUE).andThen(RECOVER_BINDING)
        .andThen(RECOVER_PREVIOUS_QUEUE_BINDINGS))
    // Consumer retry: channel, then the consumer's queue, the consumer, and the queue's bindings.
    .consumerRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_CONSUMER_QUEUE.andThen(RECOVER_CONSUMER)
        .andThen(RECOVER_CONSUMER_QUEUE_BINDINGS)));
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,24 @@

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.Recoverable;
import com.rabbitmq.client.RecoveryListener;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
import com.rabbitmq.client.impl.recovery.RecordedBinding;
import com.rabbitmq.client.impl.recovery.RecordedConsumer;
import com.rabbitmq.client.test.BrokerTestCase;
import com.rabbitmq.client.test.TestUtils;
import com.rabbitmq.tools.Host;
import org.junit.After;
import org.junit.Test;

import java.io.IOException;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER;
import static com.rabbitmq.client.test.TestUtils.closeAllConnectionsAndWaitForRecovery;
Expand All @@ -32,6 +45,13 @@
*/
public class TopologyRecoveryRetry extends BrokerTestCase {

// Optional per-test hook: the backoff policy installed in newConnectionFactory() passes it the
// attempt number whenever it is non-null; cleanup() resets it to null after each test.
// NOTE(review): volatile presumably because tests assign it on the test thread while the
// recovery machinery reads it from another thread - confirm.
private volatile Consumer<Integer> backoffConsumer;

/**
 * Clears the backoff hook after each test so it is not carried over into the next one.
 */
@After
public void cleanup() {
    this.backoffConsumer = null;
}

@Test
public void topologyRecoveryRetry() throws Exception {
int nbQueues = 200;
Expand All @@ -40,18 +60,149 @@ public void topologyRecoveryRetry() throws Exception {
String queue = prefix + i;
channel.queueDeclare(queue, false, false, true, new HashMap<>());
channel.queueBind(queue, "amq.direct", queue);
channel.queueBind(queue, "amq.direct", queue + "2");
channel.basicConsume(queue, true, new DefaultConsumer(channel));
}

closeAllConnectionsAndWaitForRecovery(this.connection);

assertTrue(channel.isOpen());
}

/**
 * Checks that both bindings of a queue are recovered when recovery of the second binding fails.
 * The test declares a queue with two bindings on amq.topic and a consumer, points the second
 * recorded binding at a random destination so its recovery fails, and on the first backoff attempt
 * restores the destination and deletes the real queue. After recovery completes, one message is
 * published per routing key and both are expected to reach the consumer.
 */
@Test
public void topologyRecoveryBindingFailure() throws Exception {
final String queue = "topology-recovery-retry-binding-failure" + System.currentTimeMillis();
channel.queueDeclare(queue, false, false, true, new HashMap<>());
channel.queueBind(queue, "amq.topic", "topic1");
channel.queueBind(queue, "amq.topic", "topic2");
// one count per expected delivery (one message is published for each of the two bindings below)
final CountDownLatch messagesReceivedLatch = new CountDownLatch(2);
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
System.out.println("Got message=" + new String(body));
messagesReceivedLatch.countDown();
}
});
// released by the recovery listener's handleRecovery callback
final CountDownLatch recoveryLatch = new CountDownLatch(1);
((AutorecoveringConnection)connection).addRecoveryListener(new RecoveryListener() {
@Override
public void handleRecoveryStarted(Recoverable recoverable) {
// no-op
}
@Override
public void handleRecovery(Recoverable recoverable) {
recoveryLatch.countDown();
}
});

// we want recovery to fail when recovering the 2nd binding
// give the 2nd recorded binding a bad queue name so it fails
final RecordedBinding binding2 = ((AutorecoveringConnection)connection).getRecordedBindings().get(1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is brutal :D

binding2.destination(UUID.randomUUID().toString());

// use the backoffConsumer to know that it has failed
// then delete the real queue & fix the recorded binding
// it should fail once more because queue is gone, and then succeed
final CountDownLatch backoffLatch = new CountDownLatch(1);
backoffConsumer = attempt -> {
// only the first backoff attempt repairs the binding and removes the queue
if (attempt == 1) {
binding2.destination(queue);
try {
Host.rabbitmqctl("delete_queue " + queue);
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
}
// counted down on every invocation of the hook, not just the first
backoffLatch.countDown();
};

// close connection
Host.closeAllConnections();

// assert backoff was called
assertTrue(backoffLatch.await(90, TimeUnit.SECONDS));
// wait for full recovery
assertTrue(recoveryLatch.await(90, TimeUnit.SECONDS));

// publish messages to verify both bindings were recovered
basicPublishVolatile("test1".getBytes(), "amq.topic", "topic1");
basicPublishVolatile("test2".getBytes(), "amq.topic", "topic2");

assertTrue(messagesReceivedLatch.await(10, TimeUnit.SECONDS));
}

/**
 * Checks that the queue's bindings and consumer are recovered when consumer recovery fails.
 * The test declares a queue with two bindings on amq.topic and a consumer, points the recorded
 * consumer at a random queue name so its recovery fails, and on the first backoff attempt restores
 * the queue name and deletes the real queue. After recovery completes, one message is published
 * per routing key and both are expected to reach the consumer.
 *
 * @throws Exception if any broker interaction or latch wait fails
 */
@Test
public void topologyRecoveryConsumerFailure() throws Exception {
    final String queue = "topology-recovery-retry-consumer-failure" + System.currentTimeMillis();
    channel.queueDeclare(queue, false, false, true, new HashMap<>());
    channel.queueBind(queue, "amq.topic", "topic1");
    channel.queueBind(queue, "amq.topic", "topic2");

    // one count per expected delivery (one message is published for each binding below)
    final CountDownLatch messagesReceivedLatch = new CountDownLatch(2);
    channel.basicConsume(queue, true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
            System.out.println("Got message=" + new String(body));
            messagesReceivedLatch.countDown();
        }
    });

    final AutorecoveringConnection recoveringConnection = (AutorecoveringConnection) connection;

    // released by the recovery listener's handleRecovery callback
    final CountDownLatch recoveryLatch = new CountDownLatch(1);
    recoveringConnection.addRecoveryListener(new RecoveryListener() {
        @Override
        public void handleRecoveryStarted(Recoverable recoverable) {
            // no-op
        }

        @Override
        public void handleRecovery(Recoverable recoverable) {
            recoveryLatch.countDown();
        }
    });

    // we want recovery to fail when recovering the consumer
    // give the recorded consumer a bad queue name so it fails
    final RecordedConsumer consumer = recoveringConnection.getRecordedConsumers().values().iterator().next();
    consumer.setQueue(UUID.randomUUID().toString());

    // use the backoffConsumer to know that it has failed
    // then delete the real queue & fix the recorded consumer
    // it should fail once more because queue is gone, and then succeed
    final CountDownLatch backoffLatch = new CountDownLatch(1);
    backoffConsumer = attempt -> {
        if (attempt == 1) {
            consumer.setQueue(queue);
            try {
                Host.rabbitmqctl("delete_queue " + queue);
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                // keep the interruption visible to the thread running the backoff instead of swallowing it
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (Exception e) {
                // best effort: the hook must not fail the recovery attempt that invoked it
                e.printStackTrace();
            }
        }
        backoffLatch.countDown();
    };

    // close connection
    Host.closeAllConnections();

    // assert backoff was called
    assertTrue(backoffLatch.await(90, TimeUnit.SECONDS));
    // wait for full recovery
    assertTrue(recoveryLatch.await(90, TimeUnit.SECONDS));

    // publish messages to verify both bindings & consumer were recovered
    basicPublishVolatile("test1".getBytes(), "amq.topic", "topic1");
    basicPublishVolatile("test2".getBytes(), "amq.topic", "topic2");

    assertTrue(messagesReceivedLatch.await(10, TimeUnit.SECONDS));
}

/**
 * Builds the connection factory used by these tests: the shared test factory with a topology
 * recovery retry handler derived from {@code RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER}, whose backoff
 * policy forwards the attempt number to {@code backoffConsumer} when a test has installed one, and
 * a network recovery interval of 1000.
 *
 * @return the configured connection factory
 */
@Override
protected ConnectionFactory newConnectionFactory() {
    ConnectionFactory connectionFactory = TestUtils.connectionFactory();
    // The handler is installed once; an earlier plain build() assignment was always overwritten by this one.
    connectionFactory.setTopologyRecoveryRetryHandler(RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER.backoffPolicy(attempt -> {
        // Read the volatile field once: cleanup() may null it between a check and the call.
        final Consumer<Integer> hook = backoffConsumer;
        if (hook != null) {
            hook.accept(attempt);
        }
    }).build());
    connectionFactory.setNetworkRecoveryInterval(1000);
    return connectionFactory;
}
Expand Down