Skip to content

Commit

Permalink
[ISSUE #9013] enhance grpc client (#9017)
Browse files Browse the repository at this point in the history
* fix issue #9013

* reformat code style

* reformat code style

* reformat code style

* reformat code style

* reformat code style

* reformat code style

* reformat code style

* change config name

* do some enhance

* add queue size to configProperties

* reformat code style

* Change configuration load order
  • Loading branch information
karsonto authored Aug 29, 2022
1 parent d123703 commit 9819820
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand All @@ -60,6 +62,12 @@ public abstract class GrpcClient extends RpcClient {

protected static final String NACOS_SERVER_GRPC_PORT_OFFSET_KEY = "nacos.server.grpc.port.offset";

protected static final String NACOS_CLIENT_GRPC_THREADPOOL_KEEPALIVETIME = "nacos.remote.client.grpc.pool.alive";

protected static final String NACOS_CLIENT_GRPC_TIMEOUT = "nacos.remote.client.grpc.timeout";

protected static final String NACOS_CLIENT_GRPC_QUEUESIZE = "nacos.remote.client.grpc.queue.size";

private ThreadPoolExecutor grpcExecutor = null;

private Integer threadPoolCoreSize;
Expand All @@ -70,6 +78,16 @@ public abstract class GrpcClient extends RpcClient {

private static final long DEFAULT_KEEP_ALIVE_TIME = 6 * 60 * 1000;

private Properties configProperties = new Properties();

private static final String DEFAULT_TIME_OUT = "3000";

private static final String QUEUE_SIZE = "10000";

private static final String KEEP_ALIVE = "10";

private Long timeOut;

@Override
public ConnectionType getConnectionType() {
return ConnectionType.GRPC;
Expand All @@ -80,6 +98,45 @@ public ConnectionType getConnectionType() {
*/
public GrpcClient(String name) {
super(name);
initGrpcClient(null);
}

public GrpcClient(String name, Properties configProperties) {
super(name);
initGrpcClient(configProperties);
}

private void initGrpcClient(Properties configProperties) {
if (!Objects.isNull(configProperties)) {
if (configProperties.contains(NACOS_CLIENT_GRPC_THREADPOOL_KEEPALIVETIME)) {
this.configProperties.put(NACOS_CLIENT_GRPC_THREADPOOL_KEEPALIVETIME,
configProperties.getProperty(NACOS_CLIENT_GRPC_THREADPOOL_KEEPALIVETIME));
}
if (configProperties.contains(NACOS_CLIENT_GRPC_TIMEOUT)) {
this.configProperties.put(NACOS_CLIENT_GRPC_TIMEOUT,
configProperties.getProperty(NACOS_CLIENT_GRPC_TIMEOUT));
}
if (configProperties.contains(NACOS_CLIENT_GRPC_QUEUESIZE)) {
this.configProperties.put(NACOS_CLIENT_GRPC_QUEUESIZE,
configProperties.getProperty(NACOS_CLIENT_GRPC_QUEUESIZE));
}
}
checkInitProperties(this.configProperties);
}

private void addDefaultConfig(Properties configProperties, String name, String defaultConfig) {
if (null != System.getProperty(name)) {
configProperties.put(name, System.getProperty(name));
} else if (null == configProperties.getProperty(name)) {
configProperties.put(name, defaultConfig);
}
}

private void checkInitProperties(Properties configProperties) {
addDefaultConfig(configProperties, NACOS_CLIENT_GRPC_THREADPOOL_KEEPALIVETIME, KEEP_ALIVE);
addDefaultConfig(configProperties, NACOS_CLIENT_GRPC_TIMEOUT, DEFAULT_TIME_OUT);
addDefaultConfig(configProperties, NACOS_CLIENT_GRPC_QUEUESIZE, QUEUE_SIZE);
this.timeOut = Long.parseLong(configProperties.getProperty(NACOS_CLIENT_GRPC_TIMEOUT));
}

/**
Expand Down Expand Up @@ -109,14 +166,12 @@ protected Integer getThreadPoolMaxSize() {
}

protected ThreadPoolExecutor createGrpcExecutor(String serverIp) {
ThreadPoolExecutor grpcExecutor = new ThreadPoolExecutor(
getThreadPoolCoreSize(),
getThreadPoolMaxSize(),
10L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10000),
new ThreadFactoryBuilder()
.daemon(true)
.nameFormat("nacos-grpc-client-executor-" + serverIp + "-%d")
Long keepAliveTime = Long.parseLong(
this.configProperties.getProperty(NACOS_CLIENT_GRPC_THREADPOOL_KEEPALIVETIME));
int queueSize = Integer.parseInt(this.configProperties.getProperty(NACOS_CLIENT_GRPC_QUEUESIZE));
ThreadPoolExecutor grpcExecutor = new ThreadPoolExecutor(getThreadPoolCoreSize(), getThreadPoolMaxSize(),
keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<>(queueSize),
new ThreadFactoryBuilder().daemon(true).nameFormat("nacos-grpc-client-executor-" + serverIp + "-%d")
.build());
grpcExecutor.allowCoreThreadTimeOut(true);
return grpcExecutor;
Expand All @@ -130,7 +185,7 @@ public void shutdown() throws NacosException {
grpcExecutor.shutdown();
}
}

/**
* Create a stub using a channel.
*
Expand All @@ -140,7 +195,7 @@ public void shutdown() throws NacosException {
private RequestGrpc.RequestFutureStub createNewChannelStub(ManagedChannel managedChannelTemp) {
return RequestGrpc.newFutureStub(managedChannelTemp);
}

/**
* create a new channel with specific server address.
*
Expand All @@ -149,9 +204,11 @@ private RequestGrpc.RequestFutureStub createNewChannelStub(ManagedChannel manage
* @return if server check success,return a non-null channel.
*/
private ManagedChannel createNewManagedChannel(String serverIp, int serverPort) {
ManagedChannelBuilder<?> managedChannelBuilder = ManagedChannelBuilder.forAddress(serverIp, serverPort).executor(grpcExecutor)
.compressorRegistry(CompressorRegistry.getDefaultInstance()).decompressorRegistry(DecompressorRegistry.getDefaultInstance())
.maxInboundMessageSize(getInboundMessageSize()).keepAliveTime(keepAliveTimeMillis(), TimeUnit.MILLISECONDS).usePlaintext();
ManagedChannelBuilder<?> managedChannelBuilder = ManagedChannelBuilder.forAddress(serverIp, serverPort)
.executor(grpcExecutor).compressorRegistry(CompressorRegistry.getDefaultInstance())
.decompressorRegistry(DecompressorRegistry.getDefaultInstance())
.maxInboundMessageSize(getInboundMessageSize())
.keepAliveTime(keepAliveTimeMillis(), TimeUnit.MILLISECONDS).usePlaintext();
return managedChannelBuilder.build();
}

Expand All @@ -162,8 +219,8 @@ private int getInboundMessageSize() {
}

private int keepAliveTimeMillis() {
String keepAliveTimeMillis = System
.getProperty("nacos.remote.grpc.keep.alive.millis", String.valueOf(DEFAULT_KEEP_ALIVE_TIME));
String keepAliveTimeMillis = System.getProperty("nacos.remote.grpc.keep.alive.millis",
String.valueOf(DEFAULT_KEEP_ALIVE_TIME));
return Integer.parseInt(keepAliveTimeMillis);
}

Expand Down Expand Up @@ -192,7 +249,7 @@ private Response serverCheck(String ip, int port, RequestGrpc.RequestFutureStub
ServerCheckRequest serverCheckRequest = new ServerCheckRequest();
Payload grpcRequest = GrpcUtils.convert(serverCheckRequest);
ListenableFuture<Payload> responseFuture = requestBlockingStub.request(grpcRequest);
Payload response = responseFuture.get(3000L, TimeUnit.MILLISECONDS);
Payload response = responseFuture.get(timeOut, TimeUnit.MILLISECONDS);
//receive connection unregister response here,not check response is success.
return (Response) GrpcUtils.parse(response);
} catch (Exception e) {
Expand Down Expand Up @@ -230,8 +287,8 @@ public void onNext(Payload payload) {
} catch (Exception e) {
LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Handle server request exception: {}",
grpcConn.getConnectionId(), payload.toString(), e.getMessage());
Response errResponse = ErrorResponse
.build(NacosException.CLIENT_ERROR, "Handle server request error");
Response errResponse = ErrorResponse.build(NacosException.CLIENT_ERROR,
"Handle server request error");
errResponse.setRequestId(request.getRequestId());
sendResponse(errResponse);
}
Expand Down Expand Up @@ -309,8 +366,8 @@ public Connection connectToServer(ServerInfo serverInfo) {
return null;
}

BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc
.newStub(newChannelStubTemp.getChannel());
BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc.newStub(
newChannelStubTemp.getChannel());
GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
grpcConn.setConnectionId(((ServerCheckResponse) response).getConnectionId());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,54 +28,60 @@

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Properties;

import static org.mockito.Mockito.spy;

@RunWith(MockitoJUnitRunner.class)
public class GrpcClientTest {

GrpcClient grpcClient;

Method createNewManagedChannelMethod;

Method createNewChannelStubMethod;

ManagedChannel managedChannel;

RpcClient.ServerInfo serverInfo;

@Before
public void setUp() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
grpcClient = spy(new GrpcClient("testClient") {
Properties clientConfig = new Properties();
clientConfig.put("nacos.remote.client.grpc.pool.alive", String.valueOf(20));
clientConfig.put("nacos.remote.client.grpc.timeout", String.valueOf(5000));
grpcClient = spy(new GrpcClient("testClient", clientConfig) {
@Override
public int rpcPortOffset() {
return 1000;
}
});
RpcClient.ServerInfo serverInfo = spy(new RpcClient.ServerInfo("10.10.10.10", 8848));
createNewManagedChannelMethod = GrpcClient.class.getDeclaredMethod("createNewManagedChannel", String.class, int.class);
createNewManagedChannelMethod = GrpcClient.class.getDeclaredMethod("createNewManagedChannel", String.class,
int.class);
createNewManagedChannelMethod.setAccessible(true);
int port = serverInfo.getServerPort() + grpcClient.rpcPortOffset();
managedChannel = (ManagedChannel) createNewManagedChannelMethod.invoke(grpcClient, serverInfo.getServerIp(), port);
managedChannel = (ManagedChannel) createNewManagedChannelMethod.invoke(grpcClient, serverInfo.getServerIp(),
port);
}

@Test
public void testCreateNewManagedChannel() throws InvocationTargetException, IllegalAccessException {
GrpcConnection grpcConnection = new GrpcConnection(serverInfo, null);
grpcConnection.setChannel(managedChannel);
}

@Test
public void createNewChannelStub() throws InvocationTargetException, IllegalAccessException, NoSuchMethodException {
createNewChannelStubMethod = GrpcClient.class.getDeclaredMethod("createNewChannelStub", ManagedChannel.class);
createNewChannelStubMethod.setAccessible(true);
Object invoke = createNewChannelStubMethod.invoke(grpcClient, managedChannel);
Assert.assertTrue(invoke instanceof RequestGrpc.RequestFutureStub);
}

@After
public void close() {
managedChannel.shutdownNow();
}

}

0 comments on commit 9819820

Please sign in to comment.