Skip to content

Commit

Permalink
[fix][broker] Fix topic policies cannot be queried with extensible lo…
Browse files Browse the repository at this point in the history
…ad manager (apache#23326)
  • Loading branch information
BewareMyPower authored Sep 20, 2024
1 parent 4b3b273 commit 105192d
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ public void cleanOwnerships() {

@Override
public synchronized boolean started() {
return validateChannelState(LeaderElectionServiceStarted, false);
return validateChannelState(Started, true);
}

public synchronized void start() throws PulsarServerException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1177,6 +1177,9 @@ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, bo
}

private CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesBypassSystemTopic(@Nonnull TopicName topicName) {
if (ExtensibleLoadManagerImpl.isInternalTopic(topicName.toString())) {
return CompletableFuture.completedFuture(Optional.empty());
}
return pulsar.getTopicPoliciesService().getTopicPoliciesAsync(topicName,
TopicPoliciesService.GetType.DEFAULT);
}
Expand Down Expand Up @@ -3601,6 +3604,9 @@ private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName t
public @Nonnull CompletableFuture<Boolean> isAllowAutoSubscriptionCreationAsync(@Nonnull TopicName tpName) {
requireNonNull(tpName);
// Policies priority: topic level -> namespace level -> broker level
if (ExtensibleLoadManagerImpl.isInternalTopic(tpName.toString())) {
return CompletableFuture.completedFuture(true);
}
return pulsar.getTopicPoliciesService()
.getTopicPoliciesAsync(tpName, TopicPoliciesService.GetType.LOCAL_ONLY)
.thenCompose(optionalTopicPolicies -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4324,6 +4324,9 @@ protected CompletableFuture<Void> initTopicPolicy() {
final var topicPoliciesService = brokerService.pulsar().getTopicPoliciesService();
final var partitionedTopicName = TopicName.getPartitionedTopicName(topic);
if (topicPoliciesService.registerListener(partitionedTopicName, this)) {
if (ExtensibleLoadManagerImpl.isInternalTopic(topic)) {
return CompletableFuture.completedFuture(null);
}
return topicPoliciesService.getTopicPoliciesAsync(partitionedTopicName,
TopicPoliciesService.GetType.DEFAULT
).thenAcceptAsync(optionalPolicies -> optionalPolicies.ifPresent(this::onUpdate),
Expand Down

0 comments on commit 105192d

Please sign in to comment.