Skip to content

Commit

Permalink
[Broker] Use shared executors for broker and geo-replication clients (#…
Browse files Browse the repository at this point in the history
…13839)

* [Broker] Use shared executors for broker clients and geo-replication clients

* Remove brokerClientNumIOThreads configuration key and default to 1

* Revisit the shared timer creation

- don't ever make it a daemon thread
  • Loading branch information
lhotari authored Jan 20, 2022
1 parent 4fc7cc0 commit 4924e6d
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.lang.reflect.Constructor;
Expand Down Expand Up @@ -126,6 +127,7 @@
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.configuration.VipStatus;
Expand Down Expand Up @@ -227,6 +229,9 @@ public class PulsarService implements AutoCloseable, ShutdownService {

private final Consumer<Integer> processTerminator;
protected final EventLoopGroup ioEventLoopGroup;
private final ExecutorProvider brokerClientSharedInternalExecutorProvider;
private final ExecutorProvider brokerClientSharedExternalExecutorProvider;
private final Timer brokerClientSharedTimer;

private MetricsGenerator metricsGenerator;

Expand Down Expand Up @@ -319,6 +324,18 @@ public PulsarService(ServiceConfiguration config,

this.ioEventLoopGroup = EventLoopUtil.newEventLoopGroup(config.getNumIOThreads(), config.isEnableBusyWait(),
new DefaultThreadFactory("pulsar-io"));
// the internal executor is not used in the broker client or replication clients since this executor is
// used for consumers and the transaction support in the client.
// since an instance is required, a single threaded shared instance is used for all broker client instances
this.brokerClientSharedInternalExecutorProvider =
new ExecutorProvider(1, "broker-client-shared-internal-executor");
// the external executor is not used in the broker client or replication clients since this executor is
// used for consumer listeners.
// since an instance is required, a single threaded shared instance is used for all broker client instances
this.brokerClientSharedExternalExecutorProvider =
new ExecutorProvider(1, "broker-client-shared-external-executor");
this.brokerClientSharedTimer =
new HashedWheelTimer(new DefaultThreadFactory("broker-client-shared-timer"), 1, TimeUnit.MILLISECONDS);
}

public MetadataStore createConfigurationMetadataStore() throws MetadataStoreException {
Expand Down Expand Up @@ -497,6 +514,9 @@ public CompletableFuture<Void> closeAsync() {
transactionReplayExecutor.shutdown();
}

brokerClientSharedExternalExecutorProvider.shutdownNow();
brokerClientSharedInternalExecutorProvider.shutdownNow();
brokerClientSharedTimer.stop();
ioEventLoopGroup.shutdownGracefully();

// add timeout handling for closing executors
Expand Down Expand Up @@ -1298,6 +1318,17 @@ protected synchronized OrderedScheduler getOffloaderScheduler(OffloadPoliciesImp
return this.offloaderScheduler;
}

public PulsarClientImpl createClientImpl(ClientConfigurationData clientConf)
throws PulsarClientException {
return PulsarClientImpl.builder()
.conf(clientConf)
.eventLoopGroup(ioEventLoopGroup)
.timer(brokerClientSharedTimer)
.internalExecutorProvider(brokerClientSharedInternalExecutorProvider)
.externalExecutorProvider(brokerClientSharedExternalExecutorProvider)
.build();
}

public synchronized PulsarClient getClient() throws PulsarServerException {
if (this.client == null) {
try {
Expand Down Expand Up @@ -1331,7 +1362,7 @@ public synchronized PulsarClient getClient() throws PulsarServerException {
}

conf.setStatsIntervalSeconds(0);
this.client = new PulsarClientImpl(conf, ioEventLoopGroup);
this.client = createClientImpl(conf);
} catch (Exception e) {
throw new PulsarServerException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
import io.netty.channel.EventLoopGroup;
import io.prometheus.client.Counter;
import java.net.URI;
import java.net.URL;
Expand Down Expand Up @@ -1239,7 +1238,7 @@ public PulsarClientImpl getNamespaceClient(ClusterDataImpl cluster) {

// Share all the IO threads across broker and client connections
ClientConfigurationData conf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData();
return new PulsarClientImpl(conf, (EventLoopGroup) pulsar.getBrokerService().executor());
return pulsar.createClientImpl(conf);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.configuration.BindAddress;
Expand Down Expand Up @@ -1203,7 +1202,7 @@ public PulsarClient getReplicationClient(String cluster, Optional<ClusterData> c
}
// Share all the IO threads across broker and client connections
ClientConfigurationData conf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData();
return new PulsarClientImpl(conf, workerGroup);
return pulsar.createClientImpl(conf);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down

0 comments on commit 4924e6d

Please sign in to comment.