From 4924e6d54a8fa4fb1a01f48644c23c625d0407f2 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 20 Jan 2022 20:16:19 +0200 Subject: [PATCH] [Broker] Use shared executors for broker and geo-replication clients (#13839) * [Broker] Use shared executors for broker clients and geo-replication clients * Remove brokerClientNumIOThreads configuration key and default to 1 * Revisit the shared timer creation - don't ever make it a daemon thread --- .../apache/pulsar/broker/PulsarService.java | 33 ++++++++++++++++++- .../broker/namespace/NamespaceService.java | 3 +- .../pulsar/broker/service/BrokerService.java | 3 +- 3 files changed, 34 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index d8a0d50e95048..54592332d8483 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -31,6 +31,7 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.util.HashedWheelTimer; +import io.netty.util.Timer; import io.netty.util.concurrent.DefaultThreadFactory; import java.io.IOException; import java.lang.reflect.Constructor; @@ -126,6 +127,7 @@ import org.apache.pulsar.client.api.transaction.TransactionBufferClient; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.common.conf.InternalConfigurationData; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.configuration.VipStatus; @@ -227,6 +229,9 @@ public class PulsarService implements AutoCloseable, ShutdownService { private final Consumer processTerminator; protected final EventLoopGroup ioEventLoopGroup; + private final ExecutorProvider brokerClientSharedInternalExecutorProvider; + private final ExecutorProvider brokerClientSharedExternalExecutorProvider; + private final Timer brokerClientSharedTimer; private MetricsGenerator metricsGenerator; @@ -319,6 +324,18 @@ public PulsarService(ServiceConfiguration config, this.ioEventLoopGroup = EventLoopUtil.newEventLoopGroup(config.getNumIOThreads(), config.isEnableBusyWait(), new DefaultThreadFactory("pulsar-io")); + // the internal executor is not used in the broker client or replication clients since this executor is + // used for consumers and the transaction support in the client. + // since an instance is required, a single threaded shared instance is used for all broker client instances + this.brokerClientSharedInternalExecutorProvider = + new ExecutorProvider(1, "broker-client-shared-internal-executor"); + // the external executor is not used in the broker client or replication clients since this executor is + // used for consumer listeners. + // since an instance is required, a single threaded shared instance is used for all broker client instances + this.brokerClientSharedExternalExecutorProvider = + new ExecutorProvider(1, "broker-client-shared-external-executor"); + this.brokerClientSharedTimer = + new HashedWheelTimer(new DefaultThreadFactory("broker-client-shared-timer"), 1, TimeUnit.MILLISECONDS); } public MetadataStore createConfigurationMetadataStore() throws MetadataStoreException { @@ -497,6 +514,9 @@ public CompletableFuture closeAsync() { transactionReplayExecutor.shutdown(); } + brokerClientSharedExternalExecutorProvider.shutdownNow(); + brokerClientSharedInternalExecutorProvider.shutdownNow(); + brokerClientSharedTimer.stop(); ioEventLoopGroup.shutdownGracefully(); // add timeout handling for closing executors @@ -1298,6 +1318,17 @@ protected synchronized OrderedScheduler getOffloaderScheduler(OffloadPoliciesImp return this.offloaderScheduler; } + public PulsarClientImpl createClientImpl(ClientConfigurationData clientConf) + throws PulsarClientException { + return PulsarClientImpl.builder() + .conf(clientConf) + .eventLoopGroup(ioEventLoopGroup) + .timer(brokerClientSharedTimer) + .internalExecutorProvider(brokerClientSharedInternalExecutorProvider) + .externalExecutorProvider(brokerClientSharedExternalExecutorProvider) + .build(); + } + public synchronized PulsarClient getClient() throws PulsarServerException { if (this.client == null) { try { @@ -1331,7 +1362,7 @@ public synchronized PulsarClient getClient() throws PulsarServerException { } conf.setStatsIntervalSeconds(0); - this.client = new PulsarClientImpl(conf, ioEventLoopGroup); + this.client = createClientImpl(conf); } catch (Exception e) { throw new PulsarServerException(e); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index a8a808c1012dd..095770fad2a93 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -25,7 +25,6 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; import com.google.common.collect.Lists; import com.google.common.hash.Hashing; -import io.netty.channel.EventLoopGroup; import io.prometheus.client.Counter; import java.net.URI; import java.net.URL; @@ -1239,7 +1238,7 @@ public PulsarClientImpl getNamespaceClient(ClusterDataImpl cluster) { // Share all the IO threads across broker and client connections ClientConfigurationData conf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData(); - return new PulsarClientImpl(conf, (EventLoopGroup) pulsar.getBrokerService().executor()); + return pulsar.createClientImpl(conf); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 613961d1db01b..3792ca343d52a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -128,7 +128,6 @@ import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.impl.ClientBuilderImpl; -import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.configuration.BindAddress; @@ -1203,7 +1202,7 @@ public PulsarClient getReplicationClient(String cluster, Optional c } // Share all the IO threads across broker and client connections ClientConfigurationData conf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData(); - return new PulsarClientImpl(conf, workerGroup); + return pulsar.createClientImpl(conf); } catch (Exception e) { throw new RuntimeException(e); }