diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index c764283cb4459..836e565516804 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -959,10 +959,31 @@ public CompletableFuture checkClusterMigration() { consumer.topicMigrated(url); }); }); + return disconnectReplicators().thenCompose(__ -> checkAndUnsubscribeSubscriptions()); } return CompletableFuture.completedFuture(null); } + private CompletableFuture checkAndUnsubscribeSubscriptions() { + List> futures = new ArrayList<>(); + subscriptions.forEach((s, subscription) -> { + if (subscription.getConsumers().isEmpty()) { + futures.add(subscription.delete()); + } + }); + + return FutureUtil.waitForAll(futures); + } + + private CompletableFuture disconnectReplicators() { + List> futures = new ArrayList<>(); + ConcurrentOpenHashMap replicators = getReplicators(); + replicators.forEach((r, replicator) -> { + futures.add(replicator.disconnect()); + }); + return FutureUtil.waitForAll(futures); + } + @Override public void checkGC() { if (!isDeleteWhileInactive()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 57a4989b4d321..f5679665d46f0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -180,6 +180,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; + public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCallback { // Managed ledger associated with the topic @@ -2575,25 +2576,49 @@ private boolean hasBacklogs() { @Override public CompletableFuture checkClusterMigration() { Optional clusterUrl = getMigratedClusterUrl(); - if (!isMigrated() && clusterUrl.isPresent()) { - return ledger.asyncMigrate().thenApply(__ -> { - subscriptions.forEach((name, sub) -> { - if (sub.isSubsciptionMigrated()) { - sub.getConsumers().forEach(Consumer::checkAndApplyTopicMigration); - } - }); - return null; - }); - } else { + if (!clusterUrl.isPresent()) { return CompletableFuture.completedFuture(null); } + CompletableFuture migrated = !isMigrated() ? ledger.asyncMigrate() : + CompletableFuture.completedFuture(null); + return migrated.thenApply(__ -> { + subscriptions.forEach((name, sub) -> { + if (sub.isSubsciptionMigrated()) { + sub.getConsumers().forEach(Consumer::checkAndApplyTopicMigration); + } + }); + return null; + }).thenCompose(__ -> checkAndDisconnectReplicators()).thenCompose(__ -> checkAndUnsubscribeSubscriptions()); + } + + private CompletableFuture checkAndUnsubscribeSubscriptions() { + List> futures = new ArrayList<>(); + subscriptions.forEach((s, subscription) -> { + if (subscription.getNumberOfEntriesInBacklog(true) == 0 + && subscription.getConsumers().isEmpty()) { + futures.add(subscription.delete()); + } + }); + + return FutureUtil.waitForAll(futures); + } + + private CompletableFuture checkAndDisconnectReplicators() { + List> futures = new ArrayList<>(); + ConcurrentOpenHashMap replicators = getReplicators(); + replicators.forEach((r, replicator) -> { + if (replicator.getNumberOfEntriesInBacklog() <= 0) { + futures.add(replicator.disconnect()); + } + }); + return FutureUtil.waitForAll(futures); } public boolean isReplicationBacklogExist() { ConcurrentOpenHashMap replicators = getReplicators(); if (replicators != null) { for (Replicator replicator : replicators.values()) { - if (replicator.getNumberOfEntriesInBacklog() != 0) { + if (replicator.getNumberOfEntriesInBacklog() > 0) { return true; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java index c1807a1566132..980a2c01d95b2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java @@ -43,6 +43,7 @@ import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl; import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; @@ -312,6 +313,14 @@ public void testClusterMigration(boolean persistent, SubscriptionType subType) t retryStrategically((test) -> !topic2.getSubscriptions().isEmpty(), 10, 500); assertFalse(topic2.getSubscriptions().isEmpty()); + topic1.checkClusterMigration().get(); + ConcurrentOpenHashMap replicators = topic1.getReplicators(); + replicators.forEach((r, replicator) -> { + assertFalse(replicator.isConnected()); + }); + + assertTrue(topic1.getSubscriptions().isEmpty()); + // not also create a new consumer which should also reconnect to cluster-2 Consumer consumer2 = client1.newConsumer().topic(topicName).subscriptionType(subType) .subscriptionName("s2").subscribe();