Skip to content

Commit 9948f89

Browse files
BewareMyPower authored and Technoboy- committed
[fix][broker] Ignore and remove the replicator cursor when the remote cluster is absent (#19972)
1 parent 2dab315 commit 9948f89

File tree

2 files changed

+96
-7
lines changed

2 files changed

+96
-7
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 24 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -1532,14 +1532,32 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
15321532
return future;
15331533
}
15341534

1535+
/**
 * Asynchronously checks whether {@code remoteCluster} is configured as a replication cluster
 * for this topic, at either the namespace level or the topic level.
 *
 * <p>The namespace policies of this topic's namespace are loaded; if none exist, the namespace-level
 * set of replication clusters is treated as empty. The result is {@code true} when the namespace-level
 * set or the topic-level replication clusters contain {@code remoteCluster}.
 *
 * <p>Note that this inspects replication <em>policies</em> only; it does not look up the cluster's
 * own metadata.
 *
 * @param remoteCluster name of the remote cluster to look for
 * @return a future completing with {@code true} if either policy level lists the cluster
 */
private CompletableFuture<Boolean> checkReplicationCluster(String remoteCluster) {
1536+
return brokerService.getPulsar().getPulsarResources().getNamespaceResources()
1537+
.getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
1538+
.thenApply(optPolicies -> optPolicies.map(policies -> policies.replication_clusters)
1539+
.orElse(Collections.emptySet()).contains(remoteCluster) // missing namespace policies count as "no clusters"
1540+
|| topicPolicies.getReplicationClusters().get().contains(remoteCluster)); // NOTE(review): get() is dereferenced unchecked - confirm it cannot be null
1541+
}
1542+
15351543
protected CompletableFuture<Void> addReplicationCluster(String remoteCluster, ManagedCursor cursor,
15361544
String localCluster) {
15371545
return AbstractReplicator.validatePartitionedTopicAsync(PersistentTopic.this.getName(), brokerService)
1538-
.thenCompose(__ -> brokerService.pulsar().getPulsarResources().getClusterResources()
1539-
.getClusterAsync(remoteCluster)
1540-
.thenApply(clusterData ->
1541-
brokerService.getReplicationClient(remoteCluster, clusterData)))
1546+
.thenCompose(__ -> checkReplicationCluster(remoteCluster))
1547+
.thenCompose(clusterExists -> {
1548+
if (!clusterExists) {
1549+
log.warn("Remove the replicator because the cluster '{}' does not exist", remoteCluster);
1550+
return removeReplicator(remoteCluster).thenApply(__ -> null);
1551+
}
1552+
return brokerService.pulsar().getPulsarResources().getClusterResources()
1553+
.getClusterAsync(remoteCluster)
1554+
.thenApply(clusterData ->
1555+
brokerService.getReplicationClient(remoteCluster, clusterData));
1556+
})
15421557
.thenAccept(replicationClient -> {
1558+
if (replicationClient == null) {
1559+
return;
1560+
}
15431561
Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> {
15441562
try {
15451563
return new PersistentReplicator(PersistentTopic.this, cursor, localCluster,
@@ -1563,8 +1581,8 @@ CompletableFuture<Void> removeReplicator(String remoteCluster) {
15631581

15641582
String name = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);
15651583

1566-
replicators.get(remoteCluster).disconnect().thenRun(() -> {
1567-
1584+
Optional.ofNullable(replicators.get(remoteCluster)).map(Replicator::disconnect)
1585+
.orElse(CompletableFuture.completedFuture(null)).thenRun(() -> {
15681586
ledger.asyncDeleteCursor(name, new DeleteCursorCallback() {
15691587
@Override
15701588
public void deleteCursorComplete(Object ctx) {

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java

Lines changed: 72 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -37,7 +37,10 @@
3737
import java.nio.charset.StandardCharsets;
3838
import java.util.ArrayList;
3939
import java.util.Collection;
40+
import java.util.Collections;
41+
import java.util.HashSet;
4042
import java.util.List;
43+
import java.util.Set;
4144
import java.util.UUID;
4245
import java.util.concurrent.CompletableFuture;
4346
import java.util.concurrent.CountDownLatch;
@@ -46,17 +49,30 @@
4649
import com.google.common.collect.Multimap;
4750
import com.google.common.collect.Sets;
4851
import java.util.concurrent.atomic.AtomicBoolean;
52+
import java.util.function.Supplier;
4953
import lombok.Cleanup;
5054
import org.apache.bookkeeper.client.LedgerHandle;
55+
import org.apache.bookkeeper.mledger.ManagedCursor;
5156
import org.apache.bookkeeper.mledger.ManagedLedger;
5257
import org.apache.pulsar.broker.service.BrokerService;
5358
import org.apache.pulsar.broker.service.BrokerTestBase;
5459
import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
5560
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
56-
import org.apache.pulsar.client.api.*;
61+
import org.apache.pulsar.client.api.Consumer;
62+
import org.apache.pulsar.client.api.Message;
63+
import org.apache.pulsar.client.api.MessageId;
64+
import org.apache.pulsar.client.api.MessageListener;
65+
import org.apache.pulsar.client.api.MessageRoutingMode;
66+
import org.apache.pulsar.client.api.Producer;
67+
import org.apache.pulsar.client.api.PulsarClient;
68+
import org.apache.pulsar.client.api.PulsarClientException;
69+
import org.apache.pulsar.client.api.Schema;
70+
import org.apache.pulsar.client.api.SubscriptionType;
5771
import org.apache.pulsar.common.naming.NamespaceBundle;
5872
import org.apache.pulsar.common.naming.TopicName;
73+
import org.apache.pulsar.common.policies.data.ClusterData;
5974
import org.apache.pulsar.common.policies.data.Policies;
75+
import org.apache.pulsar.common.policies.data.TenantInfo;
6076
import org.apache.pulsar.common.policies.data.TopicStats;
6177
import org.awaitility.Awaitility;
6278
import org.junit.Assert;
@@ -402,4 +418,59 @@ public void testDeleteTopicFail() throws Exception {
402418
makeDeletedFailed.set(false);
403419
persistentTopic.delete().get();
404420
}
421+
422+
/**
 * Data provider that supplies a single boolean argument, once as {@code true} and once as
 * {@code false}. Tests that use it run twice, once per value.
 *
 * @return two one-element rows: {@code {true}} and {@code {false}}
 */
@DataProvider(name = "topicLevelPolicy")
423+
public static Object[][] topicLevelPolicy() {
424+
return new Object[][] { { true }, { false } };
425+
}
426+
427+
/**
 * Verifies that a replicator-named cursor left on a topic is removed when the topic is initialized
 * after the remote cluster and its replication policy have been deleted.
 *
 * <p>Steps: create a topic and a subscription named {@code <replicatorPrefix>.<remoteCluster>};
 * register the remote cluster and enable replication to it; confirm that cursor is the only one on
 * the managed ledger; remove the replication policy and delete the cluster; call
 * {@code topic.initialize()} and wait until the managed ledger has no cursors left.
 *
 * @param topicLevelPolicy {@code true} to set and clear replication clusters through the topic-level
 *                         API, {@code false} to use the namespace-level API
 * @throws Exception if any admin call, future, or wait fails or times out
 */
@Test(dataProvider = "topicLevelPolicy")
428+
public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy) throws Exception {
429+
final String namespace = "prop/ns-abc";
430+
final String topicName = "persistent://" + namespace
431+
+ "/testCreateTopicWithZombieReplicatorCursor" + topicLevelPolicy; // distinct topic per data-provider value
432+
final String remoteCluster = "remote";
433+
admin.topics().createNonPartitionedTopic(topicName);
434+
admin.topics().createSubscription(topicName, conf.getReplicatorPrefix() + "." + remoteCluster,
435+
MessageId.earliest, true); // presumably stands in for a leftover replicator cursor - verify naming against PersistentReplicator
436+
437+
admin.clusters().createCluster(remoteCluster, ClusterData.builder()
438+
.serviceUrl("http://localhost:11112")
439+
.brokerServiceUrl("pulsar://localhost:11111")
440+
.build());
441+
TenantInfo tenantInfo = admin.tenants().getTenantInfo("prop");
442+
tenantInfo.getAllowedClusters().add(remoteCluster); // add the new cluster to the tenant's allowed clusters
443+
admin.tenants().updateTenant("prop", tenantInfo);
444+
445+
if (topicLevelPolicy) {
446+
admin.topics().setReplicationClusters(topicName, Collections.singletonList(remoteCluster));
447+
} else {
448+
admin.namespaces().setNamespaceReplicationClustersAsync(
449+
namespace, Collections.singleton(remoteCluster)).get();
450+
}
451+
452+
final PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false)
453+
.get(3, TimeUnit.SECONDS).orElse(null);
454+
assertNotNull(topic);
455+
456+
final Supplier<Set<String>> getCursors = () -> { // collects the names of all cursors on the managed ledger
457+
final Set<String> cursors = new HashSet<>();
458+
final Iterable<ManagedCursor> iterable = topic.getManagedLedger().getCursors();
459+
iterable.forEach(c -> cursors.add(c.getName()));
460+
return cursors;
461+
};
462+
assertEquals(getCursors.get(), Collections.singleton(conf.getReplicatorPrefix() + "." + remoteCluster)); // only the replicator-named cursor exists at this point
463+
464+
if (topicLevelPolicy) {
465+
admin.topics().setReplicationClusters(topicName, Collections.emptyList());
466+
} else {
467+
admin.namespaces().setNamespaceReplicationClustersAsync(namespace, Collections.emptySet()).get();
468+
}
469+
admin.clusters().deleteCluster(remoteCluster);
470+
// Now the cluster and its related policy have been removed, but the replicator cursor still exists
471+
472+
topic.initialize().get(3, TimeUnit.SECONDS); // re-run topic initialization while the stale cursor is still present
473+
Awaitility.await().atMost(3, TimeUnit.SECONDS)
474+
.until(() -> !topic.getManagedLedger().getCursors().iterator().hasNext()); // passes once no cursor remains on the ledger
475+
}
405476
}

0 commit comments

Comments
 (0)