-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix][broker] pre-create non-partitioned system topics for load balance extension #20370
[fix][broker] pre-create non-partitioned system topics for load balance extension #20370
Conversation
Since this patch fixes a bug, can we add a test to reproduce it which the current PR fixed? |
conflictAndCompactionTest with this config covers this case. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to create system topics without partitions explicitly. Currently, we do not support partitioned system topics.
Is this documented anywhere? It makes sense to me, but it seems like an important implementation detail.
...src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
Show resolved
Hide resolved
@@ -213,6 +214,19 @@ public static boolean debug(ServiceConfiguration config, Logger log) { | |||
return config.isLoadBalancerDebugModeEnabled() || log.isDebugEnabled(); | |||
} | |||
|
|||
public static void createSystemTopic(PulsarService pulsar, String topic) throws PulsarServerException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: why is this method public
and static
? It seems like it should be private
since it is used only within an instance of this class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is used in ServiceUnitStateChannel, too, to create a system topic inside ServiceUnitStateChannel.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops, I missed that reference. I still worry about misuse of this method because it is public and static--it's probably not a large concern though.
I added a comment in the pip, #16691 (comment) This |
/pulsarbot rerun-failure-checks |
1 similar comment
/pulsarbot rerun-failure-checks |
@@ -213,6 +214,19 @@ public static boolean debug(ServiceConfiguration config, Logger log) { | |||
return config.isLoadBalancerDebugModeEnabled() || log.isDebugEnabled(); | |||
} | |||
|
|||
public static void createSystemTopic(PulsarService pulsar, String topic) throws PulsarServerException { | |||
try { | |||
pulsar.getAdminClient().topics().createNonPartitionedTopic(topic); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What are the consequences of spurious failures here. Does an error lead the broker to need to restart?
Also, how does this work in a system where there is just one broker running in a new cluster? I have not kept up with the load balancing work, but is there any chance of deadlock because creating this topic early requires a leader to already be selected? If this is covered by other documentation, definitely just point me to it. Thanks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The broker needs to restart when an exception is thrown here.
We elect the leader before creating this system topic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for confirming.
Codecov Report
@@ Coverage Diff @@
## master #20370 +/- ##
=============================================
+ Coverage 36.89% 72.97% +36.08%
- Complexity 12053 31895 +19842
=============================================
Files 1687 1864 +177
Lines 128826 138397 +9571
Branches 14013 15184 +1171
=============================================
+ Hits 47528 100999 +53471
+ Misses 75040 29397 -45643
- Partials 6258 8001 +1743
Flags with carried forward coverage won't be shown. Click here to find out more.
|
@@ -213,6 +214,19 @@ public static boolean debug(ServiceConfiguration config, Logger log) { | |||
return config.isLoadBalancerDebugModeEnabled() || log.isDebugEnabled(); | |||
} | |||
|
|||
public static void createSystemTopic(PulsarService pulsar, String topic) throws PulsarServerException { | |||
try { | |||
pulsar.getAdminClient().topics().createNonPartitionedTopic(topic); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for confirming.
PIP: #16691
Motivation
We need to create system topics without partitions explicitly. Currently, we do not support partitioned system topics.
Modifications
create system topics without partitions explicitly
Verifying this change
(Please pick either of the following options)
This change is already covered by existing tests, such as (please describe tests).
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: