Skip to content

Commit

Permalink
[fix][broker]Delete subscription and disconnect replicators after top…
Browse files Browse the repository at this point in the history
…ic migration (#21029)

Co-authored-by: Vineeth Polamreddy <vineeth.polamreddy@verizonmedia.com>
  • Loading branch information
vineeth1995 and Vineeth Polamreddy authored Aug 22, 2023
1 parent d3a6df3 commit c1b0454
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -959,10 +959,31 @@ public CompletableFuture<Void> checkClusterMigration() {
consumer.topicMigrated(url);
});
});
return disconnectReplicators().thenCompose(__ -> checkAndUnsubscribeSubscriptions());
}
return CompletableFuture.completedFuture(null);
}

private CompletableFuture<Void> checkAndUnsubscribeSubscriptions() {
List<CompletableFuture<Void>> futures = new ArrayList<>();
subscriptions.forEach((s, subscription) -> {
if (subscription.getConsumers().isEmpty()) {
futures.add(subscription.delete());
}
});

return FutureUtil.waitForAll(futures);
}

private CompletableFuture<Void> disconnectReplicators() {
List<CompletableFuture<Void>> futures = new ArrayList<>();
ConcurrentOpenHashMap<String, NonPersistentReplicator> replicators = getReplicators();
replicators.forEach((r, replicator) -> {
futures.add(replicator.disconnect());
});
return FutureUtil.waitForAll(futures);
}

@Override
public void checkGC() {
if (!isDeleteWhileInactive()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCallback {

// Managed ledger associated with the topic
Expand Down Expand Up @@ -2575,25 +2576,49 @@ private boolean hasBacklogs() {
@Override
public CompletableFuture<Void> checkClusterMigration() {
Optional<ClusterUrl> clusterUrl = getMigratedClusterUrl();
if (!isMigrated() && clusterUrl.isPresent()) {
return ledger.asyncMigrate().thenApply(__ -> {
subscriptions.forEach((name, sub) -> {
if (sub.isSubsciptionMigrated()) {
sub.getConsumers().forEach(Consumer::checkAndApplyTopicMigration);
}
});
return null;
});
} else {
if (!clusterUrl.isPresent()) {
return CompletableFuture.completedFuture(null);
}
CompletableFuture<?> migrated = !isMigrated() ? ledger.asyncMigrate() :
CompletableFuture.completedFuture(null);
return migrated.thenApply(__ -> {
subscriptions.forEach((name, sub) -> {
if (sub.isSubsciptionMigrated()) {
sub.getConsumers().forEach(Consumer::checkAndApplyTopicMigration);
}
});
return null;
}).thenCompose(__ -> checkAndDisconnectReplicators()).thenCompose(__ -> checkAndUnsubscribeSubscriptions());
}

private CompletableFuture<Void> checkAndUnsubscribeSubscriptions() {
List<CompletableFuture<Void>> futures = new ArrayList<>();
subscriptions.forEach((s, subscription) -> {
if (subscription.getNumberOfEntriesInBacklog(true) == 0
&& subscription.getConsumers().isEmpty()) {
futures.add(subscription.delete());
}
});

return FutureUtil.waitForAll(futures);
}

private CompletableFuture<Void> checkAndDisconnectReplicators() {
List<CompletableFuture<Void>> futures = new ArrayList<>();
ConcurrentOpenHashMap<String, Replicator> replicators = getReplicators();
replicators.forEach((r, replicator) -> {
if (replicator.getNumberOfEntriesInBacklog() <= 0) {
futures.add(replicator.disconnect());
}
});
return FutureUtil.waitForAll(futures);
}

public boolean isReplicationBacklogExist() {
ConcurrentOpenHashMap<String, Replicator> replicators = getReplicators();
if (replicators != null) {
for (Replicator replicator : replicators.values()) {
if (replicator.getNumberOfEntriesInBacklog() != 0) {
if (replicator.getNumberOfEntriesInBacklog() > 0) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -312,6 +313,14 @@ public void testClusterMigration(boolean persistent, SubscriptionType subType) t
retryStrategically((test) -> !topic2.getSubscriptions().isEmpty(), 10, 500);
assertFalse(topic2.getSubscriptions().isEmpty());

topic1.checkClusterMigration().get();
ConcurrentOpenHashMap<String, ? extends Replicator> replicators = topic1.getReplicators();
replicators.forEach((r, replicator) -> {
assertFalse(replicator.isConnected());
});

assertTrue(topic1.getSubscriptions().isEmpty());

// not also create a new consumer which should also reconnect to cluster-2
Consumer<byte[]> consumer2 = client1.newConsumer().topic(topicName).subscriptionType(subType)
.subscriptionName("s2").subscribe();
Expand Down

0 comments on commit c1b0454

Please sign in to comment.