Skip to content

Commit a613da2

Browse files
committed
[improve][broker] PIP-433: Ensure topic creation before starting GEO
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
1 parent e642e17 commit a613da2

12 files changed

+399
-35
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
3434
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
3535
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
36+
import org.apache.pulsar.client.admin.PulsarAdmin;
3637
import org.apache.pulsar.client.api.MessageRoutingMode;
3738
import org.apache.pulsar.client.api.Producer;
3839
import org.apache.pulsar.client.api.ProducerBuilder;
@@ -57,6 +58,7 @@ public abstract class AbstractReplicator implements Replicator {
5758
protected final String remoteCluster;
5859
protected final PulsarClientImpl replicationClient;
5960
protected final PulsarClientImpl client;
61+
protected final PulsarAdmin replicationAdmin;
6062
protected String replicatorId;
6163
@Getter
6264
protected final Topic localTopic;
@@ -107,7 +109,8 @@ public enum State {
107109
}
108110

109111
public AbstractReplicator(String localCluster, Topic localTopic, String remoteCluster, String remoteTopicName,
110-
String replicatorPrefix, BrokerService brokerService, PulsarClientImpl replicationClient)
112+
String replicatorPrefix, BrokerService brokerService, PulsarClientImpl replicationClient,
113+
PulsarAdmin replicationAdmin)
111114
throws PulsarServerException {
112115
this.brokerService = brokerService;
113116
this.localTopic = localTopic;
@@ -117,6 +120,7 @@ public AbstractReplicator(String localCluster, Topic localTopic, String remoteCl
117120
this.remoteTopicName = remoteTopicName;
118121
this.remoteCluster = StringInterner.intern(remoteCluster);
119122
this.replicationClient = replicationClient;
123+
this.replicationAdmin = replicationAdmin;
120124
this.client = (PulsarClientImpl) brokerService.pulsar().getClient();
121125
this.producer = null;
122126
this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.pulsar.broker.service.AbstractReplicator;
3131
import org.apache.pulsar.broker.service.BrokerService;
3232
import org.apache.pulsar.broker.service.Replicator;
33+
import org.apache.pulsar.client.admin.PulsarAdmin;
3334
import org.apache.pulsar.client.api.MessageId;
3435
import org.apache.pulsar.client.api.Producer;
3536
import org.apache.pulsar.client.api.PulsarClientException;
@@ -51,7 +52,8 @@ public class NonPersistentReplicator extends AbstractReplicator implements Repli
5152
private final NonPersistentReplicatorStatsImpl stats = new NonPersistentReplicatorStatsImpl();
5253

5354
public NonPersistentReplicator(NonPersistentTopic topic, String localCluster, String remoteCluster,
54-
BrokerService brokerService, PulsarClientImpl replicationClient) throws PulsarServerException {
55+
BrokerService brokerService, PulsarClientImpl replicationClient,
56+
PulsarAdmin replicationAdmin) throws PulsarServerException {
5557
super(localCluster, topic, remoteCluster, topic.getName(), topic.getReplicatorPrefix(), brokerService,
5658
// Forward the admin client too: the AbstractReplicator constructor takes replicationAdmin as its last argument.
replicationClient, replicationAdmin);
5759
// NonPersistentReplicator does not support limitation so far, so reset pending queue size to the default value.

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,9 @@
7171
import org.apache.pulsar.broker.service.schema.exceptions.NotExistSchemaException;
7272
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
7373
import org.apache.pulsar.broker.stats.NamespaceStats;
74+
import org.apache.pulsar.client.admin.PulsarAdmin;
7475
import org.apache.pulsar.client.api.MessageId;
76+
import org.apache.pulsar.client.api.PulsarClient;
7577
import org.apache.pulsar.client.api.transaction.TxnID;
7678
import org.apache.pulsar.client.impl.PulsarClientImpl;
7779
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
@@ -628,14 +630,15 @@ protected CompletableFuture<Void> addReplicationCluster(String remoteCluster, No
628630
String localCluster) {
629631
return AbstractReplicator.validatePartitionedTopicAsync(nonPersistentTopic.getName(), brokerService)
630632
.thenCompose(__ -> brokerService.pulsar().getPulsarResources().getClusterResources()
631-
.getClusterAsync(remoteCluster)
632-
.thenApply(clusterData ->
633-
brokerService.getReplicationClient(remoteCluster, clusterData)))
634-
.thenAccept(replicationClient -> {
633+
.getClusterAsync(remoteCluster))
634+
.thenAccept((clusterData) -> {
635+
PulsarClient replicationClient = brokerService.getReplicationClient(remoteCluster, clusterData);
636+
PulsarAdmin replicationAdmin = brokerService.getClusterPulsarAdmin(remoteCluster, clusterData);
635637
replicators.computeIfAbsent(remoteCluster, r -> {
636638
try {
637639
return new NonPersistentReplicator(NonPersistentTopic.this, localCluster,
638-
remoteCluster, brokerService, (PulsarClientImpl) replicationClient);
640+
remoteCluster, brokerService, (PulsarClientImpl) replicationClient,
641+
replicationAdmin);
639642
} catch (PulsarServerException e) {
640643
log.error("[{}] Replicator startup failed {}", topic, remoteCluster, e);
641644
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java

Lines changed: 118 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,21 @@
2222
import io.netty.buffer.ByteBuf;
2323
import java.util.List;
2424
import java.util.concurrent.CompletableFuture;
25+
import java.util.concurrent.CompletionException;
2526
import lombok.extern.slf4j.Slf4j;
2627
import org.apache.bookkeeper.mledger.Entry;
2728
import org.apache.bookkeeper.mledger.ManagedCursor;
2829
import org.apache.pulsar.broker.PulsarServerException;
2930
import org.apache.pulsar.broker.service.BrokerService;
31+
import org.apache.pulsar.client.admin.PulsarAdmin;
32+
import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
33+
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
3034
import org.apache.pulsar.client.api.PulsarClientException;
3135
import org.apache.pulsar.client.api.transaction.TxnID;
3236
import org.apache.pulsar.client.impl.MessageImpl;
3337
import org.apache.pulsar.client.impl.PulsarClientImpl;
38+
import org.apache.pulsar.common.naming.TopicName;
39+
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
3440
import org.apache.pulsar.common.protocol.Markers;
3541
import org.apache.pulsar.common.schema.SchemaInfo;
3642
import org.apache.pulsar.common.util.FutureUtil;
@@ -40,9 +46,10 @@ public class GeoPersistentReplicator extends PersistentReplicator {
4046

4147
public GeoPersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster,
4248
String remoteCluster, BrokerService brokerService,
43-
PulsarClientImpl replicationClient)
49+
PulsarClientImpl replicationClient, PulsarAdmin replicationAdmin)
4450
throws PulsarServerException {
45-
super(localCluster, topic, cursor, remoteCluster, topic.getName(), brokerService, replicationClient);
51+
super(localCluster, topic, cursor, remoteCluster, topic.getName(), brokerService, replicationClient,
52+
replicationAdmin);
4653
}
4754

4855
/**
@@ -56,7 +63,115 @@ protected String getProducerName() {
5663
@Override
5764
protected CompletableFuture<Void> prepareCreateProducer() {
5865
if (brokerService.getPulsar().getConfig().isCreateTopicToRemoteClusterForReplication()) {
59-
return CompletableFuture.completedFuture(null);
66+
TopicName completeTopicName = TopicName.get(localTopicName);
67+
TopicName baseTopicName;
68+
if (completeTopicName.isPartitioned()) {
69+
baseTopicName = TopicName.get(completeTopicName.getPartitionedTopicName());
70+
} else {
71+
baseTopicName = completeTopicName;
72+
}
73+
// Set useFallbackForNonPIP344Brokers to true so that a mix of PIP-344 and non-PIP-344 brokers
74+
// can still work.
75+
return client.getLookup().getPartitionedTopicMetadata(baseTopicName, false, true)
76+
.thenCompose((localMetadata) -> replicationAdmin.topics()
77+
// https://github.com/apache/pulsar/pull/4963
78+
// Use the admin API instead of the client to fetch partitioned metadata
79+
// to prevent automatic topic creation on the remote cluster.
80+
// PIP-344 introduced an option to disable auto-creation when fetching partitioned
81+
// topic metadata via the client, but this requires Pulsar 3.0.x.
82+
// This change is a workaround to support Pulsar 2.4.2.
83+
.getPartitionedTopicMetadataAsync(baseTopicName.toString())
84+
.exceptionally(ex -> {
85+
Throwable throwable = FutureUtil.unwrapCompletionException(ex);
86+
if (throwable instanceof NotFoundException) {
87+
// Topic does not exist on the remote cluster.
88+
return new PartitionedTopicMetadata(0);
89+
}
90+
throw new CompletionException("Failed to get partitioned topic metadata", throwable);
91+
}).thenCompose(remoteMetadata -> {
92+
if (log.isDebugEnabled()) {
93+
log.debug("[{}] Local metadata partitions: {} Remote metadata partitions: {}",
94+
replicatorId, localMetadata.partitions, remoteMetadata.partitions);
95+
}
96+
97+
// Non-partitioned topic
98+
if (localMetadata.partitions == 0) {
99+
if (localMetadata.partitions == remoteMetadata.partitions) {
100+
return replicationAdmin.topics().createNonPartitionedTopicAsync(localTopicName)
101+
.exceptionally(ex -> {
102+
Throwable throwable = FutureUtil.unwrapCompletionException(ex);
103+
if (throwable instanceof ConflictException) {
104+
// Topic already exists on the remote cluster.
105+
return null;
106+
} else {
107+
throw new CompletionException(
108+
"Failed to create non-partitioned topic", throwable);
109+
}
110+
});
111+
} else {
112+
return FutureUtil.failedFuture(new PulsarClientException.NotAllowedException(
113+
"Topic type is not matched between local and remote cluster: local "
114+
+ "partitions: " + localMetadata.partitions
115+
+ ", remote partitions: " + remoteMetadata.partitions));
116+
}
117+
} else {
118+
if (remoteMetadata.partitions == 0) {
119+
if (log.isDebugEnabled()) {
120+
log.debug("[{}] Creating partitioned topic {} with {} partitions",
121+
replicatorId, baseTopicName, localMetadata.partitions);
122+
}
123+
// We may need to create a partitioned topic on the remote cluster.
124+
return replicationAdmin.topics()
125+
.createPartitionedTopicAsync(baseTopicName.toString(),
126+
localMetadata.partitions)
127+
.exceptionally(ex -> {
128+
Throwable throwable = FutureUtil.unwrapCompletionException(ex);
129+
if (throwable instanceof ConflictException) {
130+
// Topic already exists on the remote cluster.
131+
// This can happen if the topic was created, or the topic is
132+
// non-partitioned.
133+
return null;
134+
} else {
135+
throw new CompletionException(
136+
"Failed to create partitioned topic", throwable);
137+
}
138+
})
139+
.thenCompose((__) -> replicationAdmin.topics()
140+
.getPartitionedTopicMetadataAsync(baseTopicName.toString()))
141+
.thenCompose(metadata -> {
142+
// Double check if the partitioned topic is created
143+
// successfully.
144+
// When partitions is equal to 0, it means this topic is
145+
// non-partitioned, we should throw an exception.
146+
if (completeTopicName.getPartitionIndex() >= metadata.partitions) {
147+
return FutureUtil.failedFuture(
148+
new PulsarClientException.NotAllowedException(
149+
"Topic type is not matched between "
150+
+ "local and "
151+
+ "remote cluster: local "
152+
+ "partitions: "
153+
+ localMetadata.partitions
154+
+ ", remote partitions: "
155+
// Report the re-fetched remote metadata that this check compared against;
// remoteMetadata.partitions is always 0 in this branch.
+ metadata.partitions));
156+
}
157+
return CompletableFuture.completedFuture(null);
158+
});
159+
} else {
160+
if (localMetadata.partitions != remoteMetadata.partitions) {
161+
return FutureUtil.failedFuture(
162+
new PulsarClientException.NotAllowedException(
163+
"The number of topic partitions is inconsistent between "
164+
+ "local and"
165+
+ " remote "
166+
+ "clusters: local partitions: "
167+
+ localMetadata.partitions
168+
+ ", remote partitions: "
169+
+ remoteMetadata.partitions));
170+
}
171+
}
172+
}
173+
return CompletableFuture.completedFuture(null);
174+
}));
60175
} else {
61176
CompletableFuture<Void> topicCheckFuture = new CompletableFuture<>();
62177
replicationClient.getPartitionedTopicMetadata(localTopic.getName(), false, false)

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.apache.pulsar.broker.service.BrokerServiceException;
5959
import org.apache.pulsar.broker.service.MessageExpirer;
6060
import org.apache.pulsar.broker.service.Replicator;
61+
import org.apache.pulsar.client.admin.PulsarAdmin;
6162
import org.apache.pulsar.client.api.MessageId;
6263
import org.apache.pulsar.client.api.Producer;
6364
import org.apache.pulsar.client.api.PulsarClientException;
@@ -117,11 +118,12 @@ protected enum ReasonOfWaitForCursorRewinding {
117118
protected final LinkedList<InFlightTask> inFlightTasks = new LinkedList<>();
118119

119120
public PersistentReplicator(String localCluster, PersistentTopic localTopic, ManagedCursor cursor,
120-
String remoteCluster, String remoteTopic,
121-
BrokerService brokerService, PulsarClientImpl replicationClient)
121+
String remoteCluster, String remoteTopic,
122+
BrokerService brokerService, PulsarClientImpl replicationClient,
123+
PulsarAdmin replicationAdmin)
122124
throws PulsarServerException {
123125
super(localCluster, localTopic, remoteCluster, remoteTopic, localTopic.getReplicatorPrefix(),
124-
brokerService, replicationClient);
126+
brokerService, replicationClient, replicationAdmin);
125127
this.topic = localTopic;
126128
this.cursor = Objects.requireNonNull(cursor);
127129
this.expiryMonitor = new PersistentMessageExpiryMonitor(localTopic,

0 commit comments

Comments
 (0)