-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[improve][broker] PIP-433: Ensure topic creation before starting GEO #24652
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
[improve][broker] PIP-433: Ensure topic creation before starting GEO #24652
Conversation
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
9f91e51
to
a613da2
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@nodece Thanks for driving this feature
} | ||
// Set useFallbackForNonPIP344Brokers to true when mix of PIP-344 and non-PIP-344 brokers are used, it | ||
// can still work. | ||
return client.getLookup().getPartitionedTopicMetadata(baseTopicName, false, true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we have pulsarAdmin
object, can we use pulsar-admin.topics.getPartitionedTopicMetadataAsync
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, I will refactor this logic.
.getClusterAsync(remoteCluster)) | ||
.thenAccept((clusterData) -> { | ||
PulsarClient replicationClient = brokerService.getReplicationClient(remoteCluster, clusterData); | ||
PulsarAdmin replicationAdmin = brokerService.getClusterPulsarAdmin(remoteCluster, clusterData); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
brokerService.getClusterPulsarAdmin
may throw a runtime exception. Should we consider it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will move this logic from PersistentTopic to the replicator.
brokerService.getClusterPulsarAdmin
may throw a runtime exception. Should we consider it?
If an exception is thrown, this future will be complete with an exception.
|
||
Awaitility.await().untilAsserted(() -> { | ||
if (isPartitioned){ | ||
assertThat(admin2.topics().getPartitionedTopicList(ns)).contains(tp); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'd better check that partitions are also created
} | ||
|
||
@Test | ||
public void testReplicatorWhenPartitionCountsDiffer() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good test
} | ||
assertEquals(receive.getValue(), "msg-p2-1"); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you also add another test?
- Both clusters have different topic-auto-creation rules
- The partitioned topic will by created by replicator at remote-side
PIP: #24485
Motivation
In a GEO replication scenario, if the remote cluster does not have the replicated topic and the auto-creation type differs between the local and remote clusters, message replication may fail. To ensure seamless replication, the topic metadata must be properly synchronized across clusters.
This is part of PIP-433.
#24136 is the same as this PR.
Modifications
When both the local and remote partitioned topic metadata indicate
partitions=0
, this means the topic is non-partitioned. In this case, the local cluster sends a non-partitioned topic creation request to the remote cluster.If the local partitioned topic metadata has
partitions>0
, this means the topic is partitioned:partitions=0
, the local cluster sends a partitioned topic creation request to the remote cluster.Documentation
doc
doc-required
doc-not-needed
doc-complete