diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 3ebcd1c20ca87..5893fc4924413 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -267,7 +267,7 @@ public void cleanOwnerships() { @Override public synchronized boolean started() { - return validateChannelState(LeaderElectionServiceStarted, false); + return validateChannelState(Started, true); } public synchronized void start() throws PulsarServerException { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index aee6532716cd8..c7a210bc543cf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1177,6 +1177,9 @@ public CompletableFuture> getTopic(final TopicName topicName, bo } private CompletableFuture> getTopicPoliciesBypassSystemTopic(@Nonnull TopicName topicName) { + if (ExtensibleLoadManagerImpl.isInternalTopic(topicName.toString())) { + return CompletableFuture.completedFuture(Optional.empty()); + } return pulsar.getTopicPoliciesService().getTopicPoliciesAsync(topicName, TopicPoliciesService.GetType.DEFAULT); } @@ -3601,6 +3604,9 @@ private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName t public @Nonnull CompletableFuture isAllowAutoSubscriptionCreationAsync(@Nonnull TopicName tpName) { requireNonNull(tpName); // Policies priority: topic level -> namespace level -> broker level + if (ExtensibleLoadManagerImpl.isInternalTopic(tpName.toString())) { + return CompletableFuture.completedFuture(true); + } return pulsar.getTopicPoliciesService() .getTopicPoliciesAsync(tpName, TopicPoliciesService.GetType.LOCAL_ONLY) .thenCompose(optionalTopicPolicies -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index e951ffab1e230..9c0bdc120c474 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -4324,6 +4324,9 @@ protected CompletableFuture initTopicPolicy() { final var topicPoliciesService = brokerService.pulsar().getTopicPoliciesService(); final var partitionedTopicName = TopicName.getPartitionedTopicName(topic); if (topicPoliciesService.registerListener(partitionedTopicName, this)) { + if (ExtensibleLoadManagerImpl.isInternalTopic(topic)) { + return CompletableFuture.completedFuture(null); + } return topicPoliciesService.getTopicPoliciesAsync(partitionedTopicName, TopicPoliciesService.GetType.DEFAULT ).thenAcceptAsync(optionalPolicies -> optionalPolicies.ifPresent(this::onUpdate),