Skip to content

Commit

Permalink
[improve][broker] PIP-275:Introduce topicOrderedExecutorThreadNum to …
Browse files Browse the repository at this point in the history
…deprecate numWorkerThreadsForNonPersistentTopic (#20504)

PIP: #20507

<!-- Details of when a PIP is required and how the PIP process work, please see: https://github.com/apache/pulsar/blob/master/pip/README.md -->

### Motivation

See #20507

### Modifications

Introduce `topicOrderedExecutorThreadNum ` to deprecate `numWorkerThreadsForNonPersistentTopic`
  • Loading branch information
AnonHxy authored Jun 25, 2023
1 parent 46b6dcd commit 2b01f83
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 10 deletions.
8 changes: 6 additions & 2 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -474,8 +474,8 @@ maxConcurrentTopicLoadRequest=5000
# Max concurrent non-persistent message can be processed per connection
maxConcurrentNonPersistentMessagePerConnection=1000

# Number of worker threads to serve non-persistent topic
numWorkerThreadsForNonPersistentTopic=
# Number of worker threads to serve topic ordered executor
topicOrderedExecutorThreadNum=

# Enable broker to load persistent topics
enablePersistentTopics=true
Expand Down Expand Up @@ -1816,3 +1816,7 @@ persistentUnackedRangesWithMultipleEntriesEnabled=false

# Deprecated - Use managedLedgerCacheEvictionIntervalMs instead
managedLedgerCacheEvictionFrequency=0

# Number of worker threads to serve non-persistent topic
# Deprecated - use topicOrderedExecutorThreadNum instead.
numWorkerThreadsForNonPersistentTopic=
8 changes: 6 additions & 2 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,8 @@ maxConcurrentTopicLoadRequest=5000
# Max concurrent non-persistent message can be processed per connection
maxConcurrentNonPersistentMessagePerConnection=1000

# Number of worker threads to serve non-persistent topic
numWorkerThreadsForNonPersistentTopic=8
# Number of worker threads to serve topic ordered executor
topicOrderedExecutorThreadNum=8

# Enable broker to load persistent topics
enablePersistentTopics=true
Expand Down Expand Up @@ -1201,6 +1201,10 @@ functionsWorkerEnablePackageManagement=false

# These settings are left here for compatibility

# Number of worker threads to serve non-persistent topic
# Deprecated - use topicOrderedExecutorThreadNum instead.
numWorkerThreadsForNonPersistentTopic=8

# Zookeeper session timeout in milliseconds
# Deprecated: use metadataStoreSessionTimeoutMillis
zooKeeperSessionTimeoutMillis=-1
Expand Down
8 changes: 6 additions & 2 deletions deployment/terraform-ansible/templates/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,8 @@ maxConcurrentTopicLoadRequest=5000
# Max concurrent non-persistent message can be processed per connection
maxConcurrentNonPersistentMessagePerConnection=1000

# Number of worker threads to serve non-persistent topic
numWorkerThreadsForNonPersistentTopic=8
# Number of worker threads to serve topic ordered executor
topicOrderedExecutorThreadNum=8

# Enable broker to load persistent topics
enablePersistentTopics=true
Expand Down Expand Up @@ -1127,6 +1127,10 @@ fileSystemURI=

### --- Deprecated config variables --- ###

# Number of worker threads to serve non-persistent topic
# Deprecated - use topicOrderedExecutorThreadNum instead.
numWorkerThreadsForNonPersistentTopic=8

# Deprecated. Use configurationStoreServers
globalZookeeperServers={{ zookeeper_servers }}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1190,10 +1190,18 @@ The delayed message index time step(in seconds) in per bucket snapshot segment,
category = CATEGORY_SERVER,
doc = "Max concurrent non-persistent message can be processed per connection")
private int maxConcurrentNonPersistentMessagePerConnection = 1000;

@Deprecated
@FieldContext(
category = CATEGORY_SERVER,
doc = "Number of worker threads to serve non-persistent topic")
private int numWorkerThreadsForNonPersistentTopic = Runtime.getRuntime().availableProcessors();
deprecated = true,
doc = "Number of worker threads to serve non-persistent topic.\n"
+ "@deprecated - use topicOrderedExecutorThreadNum instead.")
private int numWorkerThreadsForNonPersistentTopic = -1;
@FieldContext(
category = CATEGORY_SERVER,
doc = "Number of worker threads to serve topic ordered executor")
private int topicOrderedExecutorThreadNum = Runtime.getRuntime().availableProcessors();

@FieldContext(
category = CATEGORY_SERVER,
Expand Down Expand Up @@ -3474,4 +3482,9 @@ public long getManagedLedgerCacheEvictionIntervalMs() {
MIN_ML_CACHE_EVICTION_FREQUENCY))
: Math.min(MAX_ML_CACHE_EVICTION_INTERVAL_MS, managedLedgerCacheEvictionIntervalMs);
}

public int getTopicOrderedExecutorThreadNum() {
return numWorkerThreadsForNonPersistentTopic > 0
? numWorkerThreadsForNonPersistentTopic : topicOrderedExecutorThreadNum;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
PersistentOfflineTopicStats>newBuilder().build();

this.topicOrderedExecutor = OrderedExecutor.newBuilder()
.numThreads(pulsar.getConfiguration().getNumWorkerThreadsForNonPersistentTopic())
.numThreads(pulsar.getConfiguration().getTopicOrderedExecutorThreadNum())
.name("broker-topic-workers").build();
final DefaultThreadFactory acceptorThreadFactory =
new ExecutorProvider.ExtendedThreadFactory("pulsar-acceptor");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ private ServiceConfiguration getConf() {
conf.setWebServicePort(Optional.of(0));
conf.setNumExecutorThreadPoolSize(1);
conf.setNumCacheExecutorThreadPoolSize(1);
conf.setNumWorkerThreadsForNonPersistentTopic(1);
conf.setTopicOrderedExecutorThreadNum(1);
conf.setNumIOThreads(1);
conf.setNumOrderedExecutorThreads(1);
conf.setBookkeeperClientNumWorkerThreads(1);
Expand Down

0 comments on commit 2b01f83

Please sign in to comment.