From b3b1bfb3e2a29674cc9d6144baeef1a3f0058c07 Mon Sep 17 00:00:00 2001 From: Heesung Sohn <103456639+heesung-sn@users.noreply.github.com> Date: Wed, 28 Feb 2024 18:18:22 -0800 Subject: [PATCH] [fix][broker][branch-3.0] Return getOwnerAsync without waiting on source broker upon Assigning and Releasing and handle role change during role init (#22112) (#22156) --- .../extensions/ExtensibleLoadManagerImpl.java | 24 ++++ .../channel/ServiceUnitStateChannelImpl.java | 11 +- .../ExtensibleLoadManagerImplTest.java | 128 ++++++++++++++---- .../channel/ServiceUnitStateChannelTest.java | 77 +++++++++-- .../ExtensibleLoadManagerTest.java | 35 +++-- 5 files changed, 224 insertions(+), 51 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 409bb55075be0..6a0e677c66268 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -777,8 +777,13 @@ synchronized void playLeader() { log.info("This broker:{} is setting the role from {} to {}", pulsar.getBrokerId(), role, Leader); int retry = 0; + boolean becameFollower = false; while (!Thread.currentThread().isInterrupted()) { try { + if (!serviceUnitStateChannel.isChannelOwner()) { + becameFollower = true; + break; + } initWaiter.await(); // Confirm the system topics have been created or create them if they do not exist. // If the leader has changed, the new leader need to reset @@ -802,6 +807,13 @@ synchronized void playLeader() { } } } + + if (becameFollower) { + log.warn("The broker:{} became follower while initializing leader role.", pulsar.getBrokerId()); + playFollower(); + return; + } + role = Leader; log.info("This broker:{} plays the leader now.", pulsar.getBrokerId()); @@ -815,8 +827,13 @@ synchronized void playFollower() { log.info("This broker:{} is setting the role from {} to {}", pulsar.getBrokerId(), role, Follower); int retry = 0; + boolean becameLeader = false; while (!Thread.currentThread().isInterrupted()) { try { + if (serviceUnitStateChannel.isChannelOwner()) { + becameLeader = true; + break; + } initWaiter.await(); unloadScheduler.close(); serviceUnitStateChannel.cancelOwnershipMonitor(); @@ -836,6 +853,13 @@ synchronized void playFollower() { } } } + + if (becameLeader) { + log.warn("This broker:{} became leader while initializing follower role.", pulsar.getBrokerId()); + playLeader(); + return; + } + role = Follower; log.info("This broker:{} plays a follower now.", pulsar.getBrokerId()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 220ce02ba5aef..f66ed2a5c9062 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -533,7 +533,16 @@ public CompletableFuture> getOwnerAsync(String serviceUnit) { return getActiveOwnerAsync(serviceUnit, state, Optional.of(data.sourceBroker())); } case Assigning, Releasing -> { - return getActiveOwnerAsync(serviceUnit, state, Optional.empty()); + if (isTargetBroker(data.dstBroker())) { + return getActiveOwnerAsync(serviceUnit, state, Optional.of(data.dstBroker())); + } + // If this broker is not the dst broker, return the dst broker as the owner(or empty). + // Clients need to connect(redirect) to the dst broker anyway + // and wait for the dst broker to receive `Owned`. + // This is also required to return getOwnerAsync on the src broker immediately during unloading. + // Otherwise, topic creation(getOwnerAsync) could block unloading bundles, + // if the topic creation(getOwnerAsync) happens during unloading on the src broker. + return CompletableFuture.completedFuture(Optional.ofNullable(data.dstBroker())); } case Init, Free -> { return CompletableFuture.completedFuture(Optional.empty()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index e87532cdc6ce2..43b84a5426f3c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.broker.loadbalance.extensions; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Releasing; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelTest.overrideTableView; import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Admin; import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Bandwidth; import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.MsgRate; @@ -70,6 +72,7 @@ import java.util.Map; import java.util.Optional; import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; @@ -78,6 +81,7 @@ import org.apache.pulsar.broker.loadbalance.LeaderElectionService; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData; @@ -95,12 +99,14 @@ import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener; import org.apache.pulsar.broker.namespace.NamespaceEphemeralData; import org.apache.pulsar.broker.namespace.NamespaceService; +import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.testcontext.PulsarTestContext; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.impl.TableViewImpl; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.ServiceUnitId; +import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.naming.TopicVersion; import org.apache.pulsar.common.policies.data.BrokerAssignment; @@ -775,7 +781,6 @@ public void testRoleChange() throws Exception { reset(); return null; }).when(topBundlesLoadDataStorePrimarySpy).closeTableView(); - FieldUtils.writeDeclaredField(primaryLoadManager, "topBundlesLoadDataStore", topBundlesLoadDataStorePrimarySpy, true); var topBundlesLoadDataStoreSecondary = (LoadDataStore) FieldUtils.readDeclaredField(secondaryLoadManager, "topBundlesLoadDataStore", true); @@ -798,36 +803,65 @@ public void testRoleChange() throws Exception { reset(); return null; }).when(topBundlesLoadDataStoreSecondarySpy).closeTableView(); - FieldUtils.writeDeclaredField(secondaryLoadManager, "topBundlesLoadDataStore", topBundlesLoadDataStoreSecondarySpy, true); - if (channel1.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) { - primaryLoadManager.playFollower(); // close 3 times - primaryLoadManager.playFollower(); // close 1 time - secondaryLoadManager.playLeader(); - secondaryLoadManager.playLeader(); - primaryLoadManager.playLeader(); // close 3 times and open 3 times - primaryLoadManager.playLeader(); // close 1 time and open 1 time, - secondaryLoadManager.playFollower(); - secondaryLoadManager.playFollower(); - } else { - primaryLoadManager.playLeader(); - primaryLoadManager.playLeader(); - secondaryLoadManager.playFollower(); - secondaryLoadManager.playFollower(); - primaryLoadManager.playFollower(); + try { + FieldUtils.writeDeclaredField(primaryLoadManager, "topBundlesLoadDataStore", + topBundlesLoadDataStorePrimarySpy, true); + FieldUtils.writeDeclaredField(secondaryLoadManager, "topBundlesLoadDataStore", + topBundlesLoadDataStoreSecondarySpy, true); + + + if (channel1.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) { + primaryLoadManager.playLeader(); + secondaryLoadManager.playFollower(); + verify(topBundlesLoadDataStorePrimarySpy, times(3)).startTableView(); + verify(topBundlesLoadDataStorePrimarySpy, times(5)).closeTableView(); + verify(topBundlesLoadDataStoreSecondarySpy, times(0)).startTableView(); + verify(topBundlesLoadDataStoreSecondarySpy, times(3)).closeTableView(); + } else { + primaryLoadManager.playFollower(); + secondaryLoadManager.playLeader(); + verify(topBundlesLoadDataStoreSecondarySpy, times(3)).startTableView(); + verify(topBundlesLoadDataStoreSecondarySpy, times(5)).closeTableView(); + verify(topBundlesLoadDataStorePrimarySpy, times(0)).startTableView(); + verify(topBundlesLoadDataStorePrimarySpy, times(3)).closeTableView(); + } + primaryLoadManager.playFollower(); - secondaryLoadManager.playLeader(); - secondaryLoadManager.playLeader(); - } + secondaryLoadManager.playFollower(); + if (channel1.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) { + assertEquals(ExtensibleLoadManagerImpl.Role.Leader, + FieldUtils.readDeclaredField(primaryLoadManager, "role", true)); + assertEquals(ExtensibleLoadManagerImpl.Role.Follower, + FieldUtils.readDeclaredField(secondaryLoadManager, "role", true)); + } else { + assertEquals(ExtensibleLoadManagerImpl.Role.Follower, + FieldUtils.readDeclaredField(primaryLoadManager, "role", true)); + assertEquals(ExtensibleLoadManagerImpl.Role.Leader, + FieldUtils.readDeclaredField(secondaryLoadManager, "role", true)); + } - verify(topBundlesLoadDataStorePrimarySpy, times(4)).startTableView(); - verify(topBundlesLoadDataStorePrimarySpy, times(8)).closeTableView(); - verify(topBundlesLoadDataStoreSecondarySpy, times(4)).startTableView(); - verify(topBundlesLoadDataStoreSecondarySpy, times(8)).closeTableView(); + primaryLoadManager.playLeader(); + secondaryLoadManager.playLeader(); - FieldUtils.writeDeclaredField(primaryLoadManager, "topBundlesLoadDataStore", topBundlesLoadDataStorePrimary, true); - FieldUtils.writeDeclaredField(secondaryLoadManager, "topBundlesLoadDataStore", topBundlesLoadDataStoreSecondary, true); + if (channel1.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) { + assertEquals(ExtensibleLoadManagerImpl.Role.Leader, + FieldUtils.readDeclaredField(primaryLoadManager, "role", true)); + assertEquals(ExtensibleLoadManagerImpl.Role.Follower, + FieldUtils.readDeclaredField(secondaryLoadManager, "role", true)); + } else { + assertEquals(ExtensibleLoadManagerImpl.Role.Follower, + FieldUtils.readDeclaredField(primaryLoadManager, "role", true)); + assertEquals(ExtensibleLoadManagerImpl.Role.Leader, + FieldUtils.readDeclaredField(secondaryLoadManager, "role", true)); + } + } finally { + FieldUtils.writeDeclaredField(primaryLoadManager, "topBundlesLoadDataStore", + topBundlesLoadDataStorePrimary, true); + FieldUtils.writeDeclaredField(secondaryLoadManager, "topBundlesLoadDataStore", + topBundlesLoadDataStoreSecondary, true); + } } @Test @@ -1233,6 +1267,32 @@ public void compactionScheduleTest() { }); } + @Test(timeOut = 10 * 1000) + public void unloadTimeoutCheckTest() + throws Exception { + Pair topicAndBundle = getBundleIsNotOwnByChangeEventTopic("unload-timeout"); + String topic = topicAndBundle.getLeft().toString(); + var bundle = topicAndBundle.getRight().toString(); + var releasing = new ServiceUnitStateData(Releasing, pulsar2.getBrokerId(), pulsar1.getBrokerId(), 1); + overrideTableView(channel1, bundle, releasing); + var topicFuture = pulsar1.getBrokerService().getOrCreateTopic(topic); + + + try { + topicFuture.get(1, TimeUnit.SECONDS); + } catch (Exception e) { + log.info("getOrCreateTopic failed", e); + if (!(e.getCause() instanceof BrokerServiceException.ServiceUnitNotReadyException && e.getMessage() + .contains("Please redo the lookup"))) { + fail(); + } + } + + pulsar1.getBrokerService() + .unloadServiceUnit(topicAndBundle.getRight(), true, 5, + TimeUnit.SECONDS).get(2, TimeUnit.SECONDS); + } + private static abstract class MockBrokerFilter implements BrokerFilter { @Override @@ -1265,4 +1325,20 @@ private void setSecondaryLoadManager() throws IllegalAccessException { private CompletableFuture getBundleAsync(PulsarService pulsar, TopicName topic) { return pulsar.getNamespaceService().getBundleAsync(topic); } + + private Pair getBundleIsNotOwnByChangeEventTopic(String topicNamePrefix) + throws Exception { + TopicName changeEventsTopicName = + TopicName.get(defaultTestNamespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME); + NamespaceBundle changeEventsBundle = getBundleAsync(pulsar1, changeEventsTopicName).get(); + int i = 0; + while (true) { + TopicName topicName = TopicName.get(defaultTestNamespace + "/" + topicNamePrefix + "-" + i); + NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get(); + if (!bundle.equals(changeEventsBundle)) { + return Pair.of(topicName, bundle); + } + i++; + } + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index 84988a3b3bf69..dc9844366771c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -484,19 +484,17 @@ public void transferTestWhenDestBrokerFails() var owner1 = channel1.getOwnerAsync(bundle); var owner2 = channel2.getOwnerAsync(bundle); - assertFalse(owner1.isDone()); + assertTrue(owner1.isDone()); + assertEquals(brokerId2, owner1.get().get()); assertFalse(owner2.isDone()); - assertEquals(1, getOwnerRequests1.size()); + assertEquals(0, getOwnerRequests1.size()); assertEquals(1, getOwnerRequests2.size()); // In 10 secs, the getOwnerAsync requests(lookup requests) should time out. - Awaitility.await().atMost(10, TimeUnit.SECONDS) - .untilAsserted(() -> assertTrue(owner1.isCompletedExceptionally())); Awaitility.await().atMost(10, TimeUnit.SECONDS) .untilAsserted(() -> assertTrue(owner2.isCompletedExceptionally())); - assertEquals(0, getOwnerRequests1.size()); assertEquals(0, getOwnerRequests2.size()); // recovered, check the monitor update state : Assigned -> Owned @@ -1133,12 +1131,10 @@ public void assignTestWhenDestBrokerProducerFails() var owner1 = channel1.getOwnerAsync(bundle); var owner2 = channel2.getOwnerAsync(bundle); - assertFalse(owner1.isDone()); + assertTrue(owner1.isDone()); assertFalse(owner2.isDone()); // In 10 secs, the getOwnerAsync requests(lookup requests) should time out. - Awaitility.await().atMost(10, TimeUnit.SECONDS) - .untilAsserted(() -> assertTrue(owner1.isCompletedExceptionally())); Awaitility.await().atMost(10, TimeUnit.SECONDS) .untilAsserted(() -> assertTrue(owner2.isCompletedExceptionally())); @@ -1317,6 +1313,68 @@ public void testIsOwner() throws IllegalAccessException { assertFalse(channel1.isOwner(bundle)); } + @Test(priority = 15) + public void testGetOwnerAsync() throws Exception { + + overrideTableView(channel1, bundle, new ServiceUnitStateData(Owned, brokerId1, 1)); + var owner = channel1.getOwnerAsync(bundle); + assertTrue(owner.isDone()); + assertEquals(brokerId1, owner.get().get()); + + overrideTableView(channel1, bundle, new ServiceUnitStateData(Owned, brokerId2, 1)); + owner = channel1.getOwnerAsync(bundle); + assertTrue(owner.isDone()); + assertEquals(brokerId2, owner.get().get()); + + overrideTableView(channel1, bundle, new ServiceUnitStateData(Assigning, brokerId1, 1)); + owner = channel1.getOwnerAsync(bundle); + assertTrue(!owner.isDone()); + + overrideTableView(channel1, bundle, new ServiceUnitStateData(Assigning, brokerId2, 1)); + owner = channel1.getOwnerAsync(bundle); + assertTrue(owner.isDone()); + assertEquals(brokerId2, owner.get().get()); + + overrideTableView(channel1, bundle, new ServiceUnitStateData(Releasing, brokerId1, 1)); + owner = channel1.getOwnerAsync(bundle); + assertTrue(!owner.isDone()); + + overrideTableView(channel1, bundle, new ServiceUnitStateData(Releasing, brokerId2, 1)); + owner = channel1.getOwnerAsync(bundle); + assertTrue(owner.isDone()); + assertEquals(brokerId2, owner.get().get()); + + overrideTableView(channel1, bundle, new ServiceUnitStateData(Releasing, null, brokerId1, 1)); + owner = channel1.getOwnerAsync(bundle); + assertTrue(owner.isDone()); + assertEquals(Optional.empty(), owner.get()); + + overrideTableView(channel1, bundle, new ServiceUnitStateData(Splitting, null, brokerId1, 1)); + owner = channel1.getOwnerAsync(bundle); + assertTrue(owner.isDone()); + assertEquals(brokerId1, owner.get().get()); + + overrideTableView(channel1, bundle, new ServiceUnitStateData(Splitting, null, brokerId2, 1)); + owner = channel1.getOwnerAsync(bundle); + assertTrue(owner.isDone()); + assertEquals(brokerId2, owner.get().get()); + + overrideTableView(channel1, bundle, new ServiceUnitStateData(Free, null, brokerId1, 1)); + owner = channel1.getOwnerAsync(bundle); + assertTrue(owner.isDone()); + assertEquals(Optional.empty(), owner.get()); + + overrideTableView(channel1, bundle, new ServiceUnitStateData(Deleted, null, brokerId1, 1)); + owner = channel1.getOwnerAsync(bundle); + assertTrue(owner.isDone()); + assertTrue(owner.isCompletedExceptionally()); + + overrideTableView(channel1, bundle, null); + owner = channel1.getOwnerAsync(bundle); + assertTrue(owner.isDone()); + assertEquals(Optional.empty(), owner.get()); + } + @Test(priority = 16) public void splitAndRetryFailureTest() throws Exception { channel1.publishAssignEventAsync(bundle3, brokerId1); @@ -1775,7 +1833,8 @@ private void overrideTableViews(String serviceUnit, ServiceUnitStateData val) th overrideTableView(channel2, serviceUnit, val); } - private static void overrideTableView(ServiceUnitStateChannel channel, String serviceUnit, ServiceUnitStateData val) + @Test(enabled = false) + public static void overrideTableView(ServiceUnitStateChannel channel, String serviceUnit, ServiceUnitStateData val) throws IllegalAccessException { var tv = (TableViewImpl) FieldUtils.readField(channel, "tableview", true); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java index b9707ea76c352..af14ef97f85c3 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java @@ -324,8 +324,8 @@ public void testAntiaffinityPolicy() throws PulsarAdminException { assertEquals(result.size(), NUM_BROKERS); } - @Test(timeOut = 40 * 1000) - public void testIsolationPolicy() throws PulsarAdminException { + @Test(timeOut = 300 * 1000) + public void testIsolationPolicy() throws Exception { final String namespaceIsolationPolicyName = "my-isolation-policy"; final String isolationEnabledNameSpace = DEFAULT_TENANT + "/my-isolation-policy" + nsSuffix; Map parameters1 = new HashMap<>(); @@ -334,7 +334,8 @@ public void testIsolationPolicy() throws PulsarAdminException { Awaitility.await().atMost(10, TimeUnit.SECONDS).ignoreExceptions().untilAsserted( () -> { - List activeBrokers = admin.brokers().getActiveBrokers(); + List activeBrokers = admin.brokers().getActiveBrokersAsync() + .get(5, TimeUnit.SECONDS); assertEquals(activeBrokers.size(), NUM_BROKERS); } ); @@ -377,15 +378,16 @@ public void testIsolationPolicy() throws PulsarAdminException { } } - Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted( + Awaitility.await().atMost(60, TimeUnit.SECONDS).ignoreExceptions().untilAsserted( () -> { - List activeBrokers = admin.brokers().getActiveBrokers(); + List activeBrokers = admin.brokers().getActiveBrokersAsync() + .get(5, TimeUnit.SECONDS); assertEquals(activeBrokers.size(), 2); } ); Awaitility.await().atMost(60, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> { - String ownerBroker = admin.lookups().lookupTopic(topic); + String ownerBroker = admin.lookups().lookupTopicAsync(topic).get(5, TimeUnit.SECONDS); assertEquals(extractBrokerIndex(ownerBroker), 1); }); @@ -396,20 +398,23 @@ public void testIsolationPolicy() throws PulsarAdminException { } } - Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted( + Awaitility.await().atMost(60, TimeUnit.SECONDS).ignoreExceptions().untilAsserted( () -> { - List activeBrokers = admin.brokers().getActiveBrokers(); + List activeBrokers = admin.brokers().getActiveBrokersAsync().get(5, TimeUnit.SECONDS); assertEquals(activeBrokers.size(), 1); } ); - try { - admin.lookups().lookupTopic(topic); - fail(); - } catch (Exception ex) { - log.error("Failed to lookup topic: ", ex); - assertThat(ex.getMessage()).contains("Failed to select the new owner broker for bundle"); - } + Awaitility.await().atMost(60, TimeUnit.SECONDS).ignoreExceptions().untilAsserted( + () -> { + try { + admin.lookups().lookupTopicAsync(topic).get(5, TimeUnit.SECONDS); + } catch (Exception ex) { + log.error("Failed to lookup topic: ", ex); + assertThat(ex.getMessage()).contains("Failed to select the new owner broker for bundle"); + } + } + ); } private String getBrokerUrl(int index) {