Skip to content

Commit 4cd1e64

Browse files
authored
Merge pull request #667 from vikinghawk/5.x.x-stable
Binding recovery retry should recover all bindings on the recovered queue
2 parents 277a8d2 + 30447a7 commit 4cd1e64

File tree

3 files changed

+180
-3
lines changed

3 files changed

+180
-3
lines changed

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1101,6 +1101,10 @@ public Map<String, RecordedExchange> getRecordedExchanges() {
11011101
/**
 * Returns the {@code recordedBindings} list held by this object.
 * The list reference itself is returned; no copy is made here.
 *
 * @return the recorded bindings
 */
public List<RecordedBinding> getRecordedBindings() {
11021102
return recordedBindings;
11031103
}
1104+
1105+
/**
 * Returns the {@code consumers} map held by this object.
 * The map reference itself is returned; no copy is made here.
 * NOTE(review): keys are presumably consumer tags - confirm against where the map is populated.
 *
 * @return the recorded consumers
 */
public Map<String, RecordedConsumer> getRecordedConsumers() {
1106+
return consumers;
1107+
}
11041108

11051109
@Override
11061110
public String toString() {

src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,27 @@ public abstract class TopologyRecoveryRetryLogic {
8080
context.binding().recover();
8181
return null;
8282
};
83+
84+
/**
85+
* Recover earlier bindings that share the same queue as this retry context
86+
*/
87+
public static final DefaultRetryHandler.RetryOperation<Void> RECOVER_PREVIOUS_QUEUE_BINDINGS = context -> {
88+
if (context.entity() instanceof RecordedQueueBinding) {
89+
// recover all bindings for the same queue that were recovered before this current binding
90+
// need to do this incase some bindings had already been recovered successfully before the queue was deleted & this binding failed
91+
String queue = context.binding().getDestination();
92+
for (RecordedBinding recordedBinding : Utility.copy(context.connection().getRecordedBindings())) {
93+
if (recordedBinding == context.entity()) {
94+
// we have gotten to the binding in this context. Since this is an ordered list we can now break
95+
// as we know we have recovered all the earlier bindings that may have existed on this queue
96+
break;
97+
} else if (recordedBinding instanceof RecordedQueueBinding && queue.equals(recordedBinding.getDestination())) {
98+
recordedBinding.recover();
99+
}
100+
}
101+
}
102+
return null;
103+
};
83104

84105
/**
85106
* Recover the queue of a consumer.
@@ -127,7 +148,8 @@ public abstract class TopologyRecoveryRetryLogic {
127148
public static final TopologyRecoveryRetryHandlerBuilder RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER = builder()
128149
.bindingRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
129150
.consumerRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
130-
.bindingRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_BINDING_QUEUE).andThen(RECOVER_BINDING))
151+
.bindingRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_BINDING_QUEUE).andThen(RECOVER_BINDING)
152+
.andThen(RECOVER_PREVIOUS_QUEUE_BINDINGS))
131153
.consumerRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_CONSUMER_QUEUE.andThen(RECOVER_CONSUMER)
132154
.andThen(RECOVER_CONSUMER_QUEUE_BINDINGS)));
133155
}

src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryRetry.java

Lines changed: 153 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,24 @@
1717

1818
import com.rabbitmq.client.ConnectionFactory;
1919
import com.rabbitmq.client.DefaultConsumer;
20+
import com.rabbitmq.client.Envelope;
21+
import com.rabbitmq.client.Recoverable;
22+
import com.rabbitmq.client.RecoveryListener;
23+
import com.rabbitmq.client.AMQP.BasicProperties;
24+
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
25+
import com.rabbitmq.client.impl.recovery.RecordedBinding;
26+
import com.rabbitmq.client.impl.recovery.RecordedConsumer;
2027
import com.rabbitmq.client.test.BrokerTestCase;
2128
import com.rabbitmq.client.test.TestUtils;
29+
import com.rabbitmq.tools.Host;
30+
import org.junit.After;
2231
import org.junit.Test;
23-
32+
import java.io.IOException;
2433
import java.util.HashMap;
34+
import java.util.UUID;
35+
import java.util.concurrent.CountDownLatch;
36+
import java.util.concurrent.TimeUnit;
37+
import java.util.function.Consumer;
2538

2639
import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER;
2740
import static com.rabbitmq.client.test.TestUtils.closeAllConnectionsAndWaitForRecovery;
@@ -32,6 +45,13 @@
3245
*/
3346
public class TopologyRecoveryRetry extends BrokerTestCase {
3447

48+
private volatile Consumer<Integer> backoffConsumer;
49+
50+
/**
 * Resets {@code backoffConsumer} to {@code null} after each test, so the backoff policy
 * installed in {@code newConnectionFactory()} (which only delegates when the field is
 * non-null) does nothing until a test sets a new hook.
 */
@After
51+
public void cleanup() {
52+
backoffConsumer = null;
53+
}
54+
3555
@Test
3656
public void topologyRecoveryRetry() throws Exception {
3757
int nbQueues = 200;
@@ -40,18 +60,149 @@ public void topologyRecoveryRetry() throws Exception {
4060
String queue = prefix + i;
4161
channel.queueDeclare(queue, false, false, true, new HashMap<>());
4262
channel.queueBind(queue, "amq.direct", queue);
63+
channel.queueBind(queue, "amq.direct", queue + "2");
4364
channel.basicConsume(queue, true, new DefaultConsumer(channel));
4465
}
4566

4667
closeAllConnectionsAndWaitForRecovery(this.connection);
4768

4869
assertTrue(channel.isOpen());
4970
}
71+
72+
@Test
73+
public void topologyRecoveryBindingFailure() throws Exception {
74+
final String queue = "topology-recovery-retry-binding-failure" + System.currentTimeMillis();
75+
channel.queueDeclare(queue, false, false, true, new HashMap<>());
76+
channel.queueBind(queue, "amq.topic", "topic1");
77+
channel.queueBind(queue, "amq.topic", "topic2");
78+
final CountDownLatch messagesReceivedLatch = new CountDownLatch(2);
79+
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
80+
@Override
81+
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
82+
System.out.println("Got message=" + new String(body));
83+
messagesReceivedLatch.countDown();
84+
}
85+
});
86+
final CountDownLatch recoveryLatch = new CountDownLatch(1);
87+
((AutorecoveringConnection)connection).addRecoveryListener(new RecoveryListener() {
88+
@Override
89+
public void handleRecoveryStarted(Recoverable recoverable) {
90+
// no-op
91+
}
92+
@Override
93+
public void handleRecovery(Recoverable recoverable) {
94+
recoveryLatch.countDown();
95+
}
96+
});
97+
98+
// we want recovery to fail when recovering the 2nd binding
99+
// give the 2nd recorded binding a bad queue name so it fails
100+
final RecordedBinding binding2 = ((AutorecoveringConnection)connection).getRecordedBindings().get(1);
101+
binding2.destination(UUID.randomUUID().toString());
102+
103+
// use the backoffConsumer to know that it has failed
104+
// then delete the real queue & fix the recorded binding
105+
// it should fail once more because queue is gone, and then succeed
106+
final CountDownLatch backoffLatch = new CountDownLatch(1);
107+
backoffConsumer = attempt -> {
108+
if (attempt == 1) {
109+
binding2.destination(queue);
110+
try {
111+
Host.rabbitmqctl("delete_queue " + queue);
112+
Thread.sleep(2000);
113+
} catch (Exception e) {
114+
e.printStackTrace();
115+
}
116+
}
117+
backoffLatch.countDown();
118+
};
119+
120+
// close connection
121+
Host.closeAllConnections();
122+
123+
// assert backoff was called
124+
assertTrue(backoffLatch.await(90, TimeUnit.SECONDS));
125+
// wait for full recovery
126+
assertTrue(recoveryLatch.await(90, TimeUnit.SECONDS));
127+
128+
// publish messages to verify both bindings were recovered
129+
basicPublishVolatile("test1".getBytes(), "amq.topic", "topic1");
130+
basicPublishVolatile("test2".getBytes(), "amq.topic", "topic2");
131+
132+
assertTrue(messagesReceivedLatch.await(10, TimeUnit.SECONDS));
133+
}
134+
135+
@Test
136+
public void topologyRecoveryConsumerFailure() throws Exception {
137+
final String queue = "topology-recovery-retry-consumer-failure" + System.currentTimeMillis();
138+
channel.queueDeclare(queue, false, false, true, new HashMap<>());
139+
channel.queueBind(queue, "amq.topic", "topic1");
140+
channel.queueBind(queue, "amq.topic", "topic2");
141+
final CountDownLatch messagesReceivedLatch = new CountDownLatch(2);
142+
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
143+
@Override
144+
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
145+
System.out.println("Got message=" + new String(body));
146+
messagesReceivedLatch.countDown();
147+
}
148+
});
149+
final CountDownLatch recoveryLatch = new CountDownLatch(1);
150+
((AutorecoveringConnection)connection).addRecoveryListener(new RecoveryListener() {
151+
@Override
152+
public void handleRecoveryStarted(Recoverable recoverable) {
153+
// no-op
154+
}
155+
@Override
156+
public void handleRecovery(Recoverable recoverable) {
157+
recoveryLatch.countDown();
158+
}
159+
});
160+
161+
// we want recovery to fail when recovering the consumer
162+
// give the recorded consumer a bad queue name so it fails
163+
final RecordedConsumer consumer = ((AutorecoveringConnection)connection).getRecordedConsumers().values().iterator().next();
164+
consumer.setQueue(UUID.randomUUID().toString());
165+
166+
// use the backoffConsumer to know that it has failed
167+
// then delete the real queue & fix the recorded consumer
168+
// it should fail once more because queue is gone, and then succeed
169+
final CountDownLatch backoffLatch = new CountDownLatch(1);
170+
backoffConsumer = attempt -> {
171+
if (attempt == 1) {
172+
consumer.setQueue(queue);
173+
try {
174+
Host.rabbitmqctl("delete_queue " + queue);
175+
Thread.sleep(2000);
176+
} catch (Exception e) {
177+
e.printStackTrace();
178+
}
179+
}
180+
backoffLatch.countDown();
181+
};
182+
183+
// close connection
184+
Host.closeAllConnections();
185+
186+
// assert backoff was called
187+
assertTrue(backoffLatch.await(90, TimeUnit.SECONDS));
188+
// wait for full recovery
189+
assertTrue(recoveryLatch.await(90, TimeUnit.SECONDS));
190+
191+
// publish messages to verify both bindings & consumer were recovered
192+
basicPublishVolatile("test1".getBytes(), "amq.topic", "topic1");
193+
basicPublishVolatile("test2".getBytes(), "amq.topic", "topic2");
194+
195+
assertTrue(messagesReceivedLatch.await(10, TimeUnit.SECONDS));
196+
}
50197

51198
@Override
52199
protected ConnectionFactory newConnectionFactory() {
53200
ConnectionFactory connectionFactory = TestUtils.connectionFactory();
54-
connectionFactory.setTopologyRecoveryRetryHandler(RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER.build());
201+
connectionFactory.setTopologyRecoveryRetryHandler(RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER.backoffPolicy(attempt -> {
202+
if (backoffConsumer != null) {
203+
backoffConsumer.accept(attempt);
204+
}
205+
}).build());
55206
connectionFactory.setNetworkRecoveryInterval(1000);
56207
return connectionFactory;
57208
}

0 commit comments

Comments
 (0)