Skip to content

Commit

Permalink
FABJ-378 Make excutor service configurable
Browse files Browse the repository at this point in the history
Change-Id: Iff50ab585cf27b94b0e3e212e09bebb484471b16
Signed-off-by: rickr <cr22rc@gmail.com>
  • Loading branch information
cr22rc committed Oct 22, 2018
1 parent ce7fd3b commit 5787023
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 12 deletions.
11 changes: 11 additions & 0 deletions config.properties
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,16 @@
#org.hyperledger.fabric.sdk.connections.ssl.sslProvider=openSSL
## Default negotiation type for grpc ssl connections. (TLS, plainText)
#org.hyperledger.fabric.sdk.connections.ssl.negotiationType=TLS
## the number of threads to keep in the pool, even if they are idle, unless {@code allowCoreThreadTimeOut} is set
#org.hyperledger.fabric.sdk.client.thread_executor_corepoolsize=0
## maximumPoolSize the maximum number of threads to allow in the pool defautl is Max integer.
#org.hyperledger.fabric.sdk.client.thread_executor_maximumpoolsize=2147483647
## keepAliveTime when the number of threads is greater than
## the core, this is the maximum time that excess idle threads
## will wait for new tasks before terminating.
#org.hyperledger.fabric.sdk.client.thread_executor_keepalivetime=60
## the time unit for the {@code keepAliveTime} argument (SECONDS,MILLISECONDS,NANOSECDONS) see Java's TimeUnit
#org.hyperledger.fabric.sdk.client.thread_executor_keepalivetimeunit=SECONDS

# System wide defaults for CryptoPrimitives objects. You can customize further by using the
# CryptoPrimitives.setProperties() method.
Expand All @@ -55,3 +65,4 @@
# The algorithm used to generate a signature. Valid values are listed in the JCA Standard Algorithm Name Documentation
# e.g. http://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#Signature
# org.hyperledger.fabric.sdk.crypto.default_signature_algorithm = SHA256withECDSA

38 changes: 27 additions & 11 deletions src/main/java/org/hyperledger/fabric/sdk/HFClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand All @@ -39,15 +43,18 @@
import org.hyperledger.fabric.sdk.exception.NetworkConfigurationException;
import org.hyperledger.fabric.sdk.exception.ProposalException;
import org.hyperledger.fabric.sdk.exception.TransactionException;
import org.hyperledger.fabric.sdk.helper.Config;
import org.hyperledger.fabric.sdk.helper.Utils;
import org.hyperledger.fabric.sdk.security.CryptoSuite;

import static java.lang.String.format;
import static org.hyperledger.fabric.sdk.User.userContextCheck;

public class HFClient {
private static final Config config = Config.getConfig();

private CryptoSuite cryptoSuite;
protected final ExecutorService executorService;

static {

Expand All @@ -59,11 +66,6 @@ public class HFClient {
}
}

private final ExecutorService executorService = Executors.newCachedThreadPool(r -> {
Thread t = Executors.defaultThreadFactory().newThread(r);
t.setDaemon(true);
return t;
});

ExecutorService getExecutorService() {
return executorService;
Expand All @@ -79,8 +81,24 @@ public User getUserContext() {

private User userContext;

protected final ThreadFactory threadFactory = Executors.defaultThreadFactory();

private static final int CLIENT_THREAD_EXECUTOR_COREPOOLSIZE = config.getClientThreadExecutorCorePoolSize();
private static final int CLIENT_THREAD_EXECUTOR_MAXIMUMPOOLSIZE = config.getClientThreadExecutorMaxiumPoolSize();
private static final long CLIENT_THREAD_EXECUTOR_KEEPALIVETIME = config.getClientThreadExecutorKeepAliveTime();
private static final TimeUnit CLIENT_THREAD_EXECUTOR_KEEPALIVETIMEUNIT = config.getClientThreadExecutorKeepAliveTimeUnit();

private HFClient() {

executorService = new ThreadPoolExecutor(CLIENT_THREAD_EXECUTOR_COREPOOLSIZE, CLIENT_THREAD_EXECUTOR_MAXIMUMPOOLSIZE,
CLIENT_THREAD_EXECUTOR_KEEPALIVETIME, CLIENT_THREAD_EXECUTOR_KEEPALIVETIMEUNIT,
new SynchronousQueue<Runnable>(),
r -> {
Thread t = threadFactory.newThread(r);
t.setDaemon(true);
return t;
});

}

public CryptoSuite getCryptoSuite() {
Expand Down Expand Up @@ -120,7 +138,7 @@ public static HFClient createNewInstance() {
* Configures a channel based on information loaded from a Network Config file.
* Note that it is up to the caller to initialize the returned channel.
*
* @param channelName The name of the channel to be configured
* @param channelName The name of the channel to be configured
* @param networkConfig The network configuration to use to configure the channel
* @return The configured channel, or null if the channel is not defined in the configuration
* @throws InvalidArgumentException
Expand All @@ -144,7 +162,6 @@ public Channel loadChannelFromConfig(String channelName, NetworkConfig networkCo
return networkConfig.loadChannel(this, channelName);
}


/**
* newChannel - already configured channel.
*
Expand Down Expand Up @@ -188,7 +205,7 @@ public Channel newChannel(String name) throws InvalidArgumentException {
*/

public Channel newChannel(String name, Orderer orderer, ChannelConfiguration channelConfiguration,
byte[]... channelConfigurationSignatures) throws TransactionException, InvalidArgumentException {
byte[]... channelConfigurationSignatures) throws TransactionException, InvalidArgumentException {

clientCheck();
if (Utils.isNullOrEmpty(name)) {
Expand Down Expand Up @@ -620,7 +637,7 @@ public byte[] getChannelConfigurationSignature(ChannelConfiguration channelConfi
*/

public byte[] getUpdateChannelConfigurationSignature(UpdateChannelConfiguration updateChannelConfiguration,
User signer) throws InvalidArgumentException {
User signer) throws InvalidArgumentException {

clientCheck();

Expand All @@ -640,7 +657,7 @@ public byte[] getUpdateChannelConfigurationSignature(UpdateChannelConfiguration
*/

public Collection<ProposalResponse> sendInstallProposal(InstallProposalRequest installProposalRequest,
Collection<Peer> peers) throws ProposalException, InvalidArgumentException {
Collection<Peer> peers) throws ProposalException, InvalidArgumentException {

clientCheck();

Expand All @@ -651,7 +668,6 @@ public Collection<ProposalResponse> sendInstallProposal(InstallProposalRequest i

}


private void clientCheck() throws InvalidArgumentException {

if (null == cryptoSuite) {
Expand Down
63 changes: 62 additions & 1 deletion src/main/java/org/hyperledger/fabric/sdk/helper/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -83,6 +84,15 @@ public class Config {
public static final String CONN_SSL_PROVIDER = "org.hyperledger.fabric.sdk.connections.ssl.sslProvider";
public static final String CONN_SSL_NEGTYPE = "org.hyperledger.fabric.sdk.connections.ssl.negotiationType";

/**
* Default HFClient thread executor settings.
*/

public static final String CLIENT_THREAD_EXECUTOR_COREPOOLSIZE = "org.hyperledger.fabric.sdk.client.thread_executor_corepoolsize";
public static final String CLIENT_THREAD_EXECUTOR_MAXIMUMPOOLSIZE = "org.hyperledger.fabric.sdk.client.thread_executor_maximumpoolsize";
public static final String CLIENT_THREAD_EXECUTOR_KEEPALIVETIME = "org.hyperledger.fabric.sdk.client.thread_executor_keepalivetime";
public static final String CLIENT_THREAD_EXECUTOR_KEEPALIVETIMEUNIT = "org.hyperledger.fabric.sdk.client.thread_executor_keepalivetimeunit";

/**
* Miscellaneous settings
**/
Expand Down Expand Up @@ -150,6 +160,15 @@ private Config() {
defaultProperty(CONN_SSL_PROVIDER, "openSSL");
defaultProperty(CONN_SSL_NEGTYPE, "TLS");

/**
* Default HFClient thread executor settings.
*/

defaultProperty(CLIENT_THREAD_EXECUTOR_COREPOOLSIZE, "0");
defaultProperty(CLIENT_THREAD_EXECUTOR_MAXIMUMPOOLSIZE, "" + Integer.MAX_VALUE);
defaultProperty(CLIENT_THREAD_EXECUTOR_KEEPALIVETIME, "" + "60");
defaultProperty(CLIENT_THREAD_EXECUTOR_KEEPALIVETIMEUNIT, "SECONDS");

/**
* Logging settings
**/
Expand All @@ -167,7 +186,6 @@ private Config() {
defaultProperty(SERVICE_DISCOVER_FREQ_SECONDS, "120");
defaultProperty(SERVICE_DISCOVER_WAIT_TIME, "5000");


final String inLogLevel = sdkProperties.getProperty(LOGGERLEVEL);

if (null != inLogLevel) {
Expand Down Expand Up @@ -528,4 +546,47 @@ public DiagnosticFileDumper getDiagnosticFileDumper() {
public long getTransactionListenerCleanUpTimeout() {
return Long.parseLong(getProperty(TRANSACTION_CLEANUP_UP_TIMEOUT_WAIT_TIME));
}

/**
* The number of threads to keep in the pool, even if they are idle, unless {@code allowCoreThreadTimeOut} is set
*
* @return The number of threads to keep in the pool, even if they are idle, unless {@code allowCoreThreadTimeOut} is set
*/

public int getClientThreadExecutorCorePoolSize() {
return Integer.parseInt(getProperty(CLIENT_THREAD_EXECUTOR_COREPOOLSIZE));
}

/**
* maximumPoolSize the maximum number of threads to allow in the pool
*
* @return maximumPoolSize the maximum number of threads to allow in the pool
*/
public int getClientThreadExecutorMaxiumPoolSize() {
return Integer.parseInt(getProperty(CLIENT_THREAD_EXECUTOR_MAXIMUMPOOLSIZE));
}

/**
* keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
*
* @return The keep alive time.
*/

public long getClientThreadExecutorKeepAliveTime() {
return Long.parseLong(getProperty(CLIENT_THREAD_EXECUTOR_KEEPALIVETIME));
}

/**
* the time unit for the argument
*
* @return
*/

public TimeUnit getClientThreadExecutorKeepAliveTimeUnit() {

return TimeUnit.valueOf(getProperty(CLIENT_THREAD_EXECUTOR_KEEPALIVETIMEUNIT));
}

}

0 comments on commit 5787023

Please sign in to comment.