Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add configurable for GrpcClient health check retry times #9056

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ public abstract class RpcClient implements Closeable {

private static final Pattern EXCLUDE_PROTOCOL_PATTERN = Pattern.compile("(?<=\\w{1,5}://)(.*)");

protected int healthCheckRetryTimes = 1;

protected Long healthCheckTimeOut = 3000L;

static {
PayloadRegistry.init();
}
Expand Down Expand Up @@ -309,8 +313,7 @@ public final void start() throws NacosException {
if (isShutdown()) {
break;
}
ReconnectContext reconnectContext = reconnectionSignal
.poll(keepAliveTime, TimeUnit.MILLISECONDS);
ReconnectContext reconnectContext = reconnectionSignal.poll(keepAliveTime, TimeUnit.MILLISECONDS);
if (reconnectContext == null) {
// check alive time.
if (System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveTime) {
Expand All @@ -328,8 +331,8 @@ public final void start() throws NacosException {
break;
}

boolean statusFLowSuccess = RpcClient.this.rpcClientStatus
.compareAndSet(rpcClientStatus, RpcClientStatus.UNHEALTHY);
boolean statusFLowSuccess = RpcClient.this.rpcClientStatus.compareAndSet(
rpcClientStatus, RpcClientStatus.UNHEALTHY);
if (statusFLowSuccess) {
reconnectContext = new ReconnectContext(null, false);
} else {
Expand Down Expand Up @@ -396,8 +399,9 @@ public final void start() throws NacosException {
}

if (connectToServer != null) {
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Success to connect to server [{}] on start up, connectionId = {}",
name, connectToServer.serverInfo.getAddress(), connectToServer.getConnectionId());
LoggerUtils.printIfInfoEnabled(LOGGER,
"[{}] Success to connect to server [{}] on start up, connectionId = {}", name,
connectToServer.serverInfo.getAddress(), connectToServer.getConnectionId());
this.currentConnection = connectToServer;
rpcClientStatus.set(RpcClientStatus.RUNNING);
eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));
Expand Down Expand Up @@ -431,8 +435,8 @@ public Response requestReply(Request request) {
ConnectResetRequest connectResetRequest = (ConnectResetRequest) request;
if (StringUtils.isNotBlank(connectResetRequest.getServerIp())) {
ServerInfo serverInfo = resolveServerInfo(
connectResetRequest.getServerIp() + Constants.COLON + connectResetRequest
.getServerPort());
connectResetRequest.getServerIp() + Constants.COLON
+ connectResetRequest.getServerPort());
switchServerAsync(serverInfo, false);
} else {
switchServerAsync();
Expand Down Expand Up @@ -464,12 +468,16 @@ private boolean healthCheck() {
if (this.currentConnection == null) {
return false;
}
try {
Response response = this.currentConnection.request(healthCheckRequest, 3000L);
// not only check server is ok, also check connection is register.
return response != null && response.isSuccess();
} catch (NacosException e) {
// ignore
int reTryTimes = healthCheckRetryTimes;
while (reTryTimes > 0) {
reTryTimes--;
try {
Response response = this.currentConnection.request(healthCheckRequest, healthCheckTimeOut);
// not only check server is ok, also check connection is register.
return response != null && response.isSuccess();
} catch (NacosException e) {
// ignore
}
}
return false;
}
Expand Down Expand Up @@ -520,8 +528,9 @@ protected void reconnect(final ServerInfo recommendServerInfo, boolean onRequest
// 2.create a new channel to new server
Connection connectionNew = connectToServer(serverInfo);
if (connectionNew != null) {
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Success to connect a server [{}], connectionId = {}",
name, serverInfo.getAddress(), connectionNew.getConnectionId());
LoggerUtils.printIfInfoEnabled(LOGGER,
"[{}] Success to connect a server [{}], connectionId = {}", name,
serverInfo.getAddress(), connectionNew.getConnectionId());
// successfully create a new connect.
if (currentConnection != null) {
LoggerUtils.printIfInfoEnabled(LOGGER,
Expand Down Expand Up @@ -558,8 +567,8 @@ protected void reconnect(final ServerInfo recommendServerInfo, boolean onRequest
if (reConnectTimes > 0
&& reConnectTimes % RpcClient.this.serverListFactory.getServerList().size() == 0) {
LoggerUtils.printIfInfoEnabled(LOGGER,
"[{}] Fail to connect server, after trying {} times, last try server is {}, error = {}", name,
reConnectTimes, serverInfo, lastException == null ? "unknown" : lastException);
"[{}] Fail to connect server, after trying {} times, last try server is {}, error = {}",
name, reConnectTimes, serverInfo, lastException == null ? "unknown" : lastException);
if (Integer.MAX_VALUE == retryTurns) {
retryTurns = 50;
} else {
Expand Down Expand Up @@ -687,8 +696,9 @@ public Response request(Request request, long timeoutMills) throws NacosExceptio
}
}

LoggerUtils.printIfErrorEnabled(LOGGER, "Send request fail, request = {}, retryTimes = {}, errorMessage = {}",
request, retryTimes, e.getMessage());
LoggerUtils.printIfErrorEnabled(LOGGER,
"Send request fail, request = {}, retryTimes = {}, errorMessage = {}", request, retryTimes,
e.getMessage());

exceptionThrow = e;

Expand Down Expand Up @@ -737,9 +747,9 @@ public void asyncRequest(Request request, RequestCallBack callback) throws Nacos
// Do nothing.
}
}
LoggerUtils
.printIfErrorEnabled(LOGGER, "[{}] Send request fail, request = {}, retryTimes = {}, errorMessage = {}",
name, request, retryTimes, e.getMessage());
LoggerUtils.printIfErrorEnabled(LOGGER,
"[{}] Send request fail, request = {}, retryTimes = {}, errorMessage = {}", name, request,
retryTimes, e.getMessage());
exceptionToThrow = e;

}
Expand Down Expand Up @@ -785,9 +795,9 @@ public RequestFuture requestFuture(Request request) throws NacosException {
// Do nothing.
}
}
LoggerUtils
.printIfErrorEnabled(LOGGER, "[{}] Send request fail, request = {}, retryTimes = {}, errorMessage = {}",
name, request, retryTimes, e.getMessage());
LoggerUtils.printIfErrorEnabled(LOGGER,
"[{}] Send request fail, request = {}, retryTimes = {}, errorMessage = {}", name, request,
retryTimes, e.getMessage());
exceptionToThrow = e;

}
Expand Down Expand Up @@ -832,8 +842,8 @@ protected Response handleServerRequest(final Request request) {
Response response = serverRequestHandler.requestReply(request);

if (response != null) {
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Ack server push request, request = {}, requestId = {}", name,
request.getClass().getSimpleName(), request.getRequestId());
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Ack server push request, request = {}, requestId = {}",
name, request.getClass().getSimpleName(), request.getRequestId());
return response;
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@

import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand All @@ -60,23 +61,15 @@ public abstract class GrpcClient extends RpcClient {

static final Logger LOGGER = LoggerFactory.getLogger(GrpcClient.class);

protected static final String NACOS_SERVER_GRPC_PORT_OFFSET_KEY = "nacos.server.grpc.port.offset";

protected static final String NACOS_CLIENT_GRPC_THREADPOOL_KEEPALIVETIME = "nacos.remote.client.grpc.pool.alive";

protected static final String NACOS_CLIENT_GRPC_TIMEOUT = "nacos.remote.client.grpc.timeout";

protected static final String NACOS_CLIENT_GRPC_QUEUESIZE = "nacos.remote.client.grpc.queue.size";

private ThreadPoolExecutor grpcExecutor = null;

private Integer threadPoolCoreSize;

private Integer threadPoolMaxSize;

private static final long DEFAULT_MAX_INBOUND_MESSAGE_SIZE = 10 * 1024 * 1024L;
private static final String DEFAULT_MAX_INBOUND_MESSAGE_SIZE = String.valueOf(10 * 1024 * 1024L);

private static final long DEFAULT_KEEP_ALIVE_TIME = 6 * 60 * 1000;
private static final String DEFAULT_KEEP_ALIVE_TIME = String.valueOf(6 * 60 * 1000);

private Properties configProperties = new Properties();

Expand All @@ -86,7 +79,7 @@ public abstract class GrpcClient extends RpcClient {

private static final String KEEP_ALIVE = "10";

private Long timeOut;
private long timeOut = 3000L;

@Override
public ConnectionType getConnectionType() {
Expand All @@ -98,32 +91,34 @@ public ConnectionType getConnectionType() {
*/
public GrpcClient(String name) {
super(name);
initGrpcClient(null);
initGrpcClient(null, null);
}

public GrpcClient(String name, Properties configProperties) {
super(name);
initGrpcClient(configProperties);
initGrpcClient(configProperties, GrpcConsts.getRpcParams());
}

private void initGrpcClient(Properties configProperties) {
private void initGrpcClient(Properties configProperties, Set<String> configName) {
if (!Objects.isNull(configProperties)) {
if (configProperties.contains(NACOS_CLIENT_GRPC_THREADPOOL_KEEPALIVETIME)) {
this.configProperties.put(NACOS_CLIENT_GRPC_THREADPOOL_KEEPALIVETIME,
configProperties.getProperty(NACOS_CLIENT_GRPC_THREADPOOL_KEEPALIVETIME));
}
if (configProperties.contains(NACOS_CLIENT_GRPC_TIMEOUT)) {
this.configProperties.put(NACOS_CLIENT_GRPC_TIMEOUT,
configProperties.getProperty(NACOS_CLIENT_GRPC_TIMEOUT));
}
if (configProperties.contains(NACOS_CLIENT_GRPC_QUEUESIZE)) {
this.configProperties.put(NACOS_CLIENT_GRPC_QUEUESIZE,
configProperties.getProperty(NACOS_CLIENT_GRPC_QUEUESIZE));
for (String name : configName) {
if (configProperties.contains(name)) {
this.configProperties.put(name, configProperties.getProperty(name));
}
}
}
addDefaultConfig();
checkInitProperties(this.configProperties);
}

private void addDefaultConfig() {
addDefaultConfig(configProperties, GrpcConsts.NACOS_CLIENT_GRPC_THREADPOOL_KEEPALIVETIME, KEEP_ALIVE);
addDefaultConfig(configProperties, GrpcConsts.NACOS_CLIENT_GRPC_TIMEOUT, DEFAULT_TIME_OUT);
addDefaultConfig(configProperties, GrpcConsts.NACOS_CLIENT_GRPC_QUEUESIZE, QUEUE_SIZE);
addDefaultConfig(configProperties, GrpcConsts.MAX_INBOUND_MESSAGE_SIZE, DEFAULT_MAX_INBOUND_MESSAGE_SIZE);
addDefaultConfig(configProperties, GrpcConsts.KEEP_ALIVE_TIME, DEFAULT_KEEP_ALIVE_TIME);
}

private void addDefaultConfig(Properties configProperties, String name, String defaultConfig) {
if (null != System.getProperty(name)) {
configProperties.put(name, System.getProperty(name));
Expand All @@ -133,10 +128,17 @@ private void addDefaultConfig(Properties configProperties, String name, String d
}

private void checkInitProperties(Properties configProperties) {
addDefaultConfig(configProperties, NACOS_CLIENT_GRPC_THREADPOOL_KEEPALIVETIME, KEEP_ALIVE);
addDefaultConfig(configProperties, NACOS_CLIENT_GRPC_TIMEOUT, DEFAULT_TIME_OUT);
addDefaultConfig(configProperties, NACOS_CLIENT_GRPC_QUEUESIZE, QUEUE_SIZE);
this.timeOut = Long.parseLong(configProperties.getProperty(NACOS_CLIENT_GRPC_TIMEOUT));
if (configProperties.contains(GrpcConsts.NACOS_CLIENT_GRPC_TIMEOUT)) {
this.timeOut = Long.parseLong(configProperties.getProperty(GrpcConsts.NACOS_CLIENT_GRPC_TIMEOUT));
}
if (configProperties.contains(GrpcConsts.NACOS_CLIENT_GRPC_HEALTHCHECK_RETRY_TIMES)) {
this.healthCheckRetryTimes = Integer.parseInt(
configProperties.getProperty(GrpcConsts.NACOS_CLIENT_GRPC_HEALTHCHECK_RETRY_TIMES));
}
if (configProperties.contains(GrpcConsts.NACOS_CLIENT_GRPC_HEALTHCHECK_TIMEOUT)) {
this.healthCheckTimeOut = Long.parseLong(
configProperties.getProperty(GrpcConsts.NACOS_CLIENT_GRPC_HEALTHCHECK_TIMEOUT));
}
}

/**
Expand Down Expand Up @@ -167,8 +169,8 @@ protected Integer getThreadPoolMaxSize() {

protected ThreadPoolExecutor createGrpcExecutor(String serverIp) {
Long keepAliveTime = Long.parseLong(
this.configProperties.getProperty(NACOS_CLIENT_GRPC_THREADPOOL_KEEPALIVETIME));
int queueSize = Integer.parseInt(this.configProperties.getProperty(NACOS_CLIENT_GRPC_QUEUESIZE));
this.configProperties.getProperty(GrpcConsts.NACOS_CLIENT_GRPC_THREADPOOL_KEEPALIVETIME));
int queueSize = Integer.parseInt(this.configProperties.getProperty(GrpcConsts.NACOS_CLIENT_GRPC_QUEUESIZE));
ThreadPoolExecutor grpcExecutor = new ThreadPoolExecutor(getThreadPoolCoreSize(), getThreadPoolMaxSize(),
keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<>(queueSize),
new ThreadFactoryBuilder().daemon(true).nameFormat("nacos-grpc-client-executor-" + serverIp + "-%d")
Expand Down Expand Up @@ -213,15 +215,11 @@ private ManagedChannel createNewManagedChannel(String serverIp, int serverPort)
}

private int getInboundMessageSize() {
String messageSize = System.getProperty("nacos.remote.client.grpc.maxinbound.message.size",
String.valueOf(DEFAULT_MAX_INBOUND_MESSAGE_SIZE));
return Integer.parseInt(messageSize);
return Integer.parseInt(configProperties.getProperty(GrpcConsts.MAX_INBOUND_MESSAGE_SIZE));
}

private int keepAliveTimeMillis() {
String keepAliveTimeMillis = System.getProperty("nacos.remote.grpc.keep.alive.millis",
String.valueOf(DEFAULT_KEEP_ALIVE_TIME));
return Integer.parseInt(keepAliveTimeMillis);
return Integer.parseInt(configProperties.getProperty(GrpcConsts.KEEP_ALIVE_TIME));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public GrpcClusterClient(String name) {

@Override
public int rpcPortOffset() {
return Integer.parseInt(System.getProperty(NACOS_SERVER_GRPC_PORT_OFFSET_KEY,
return Integer.parseInt(System.getProperty(GrpcConsts.NACOS_SERVER_GRPC_PORT_OFFSET_KEY,
String.valueOf(Constants.CLUSTER_GRPC_PORT_DEFAULT_OFFSET)));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.common.remote.client.grpc;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/**
* GrpcConsts.
*
* @author karsonto
*/
public class GrpcConsts {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consts --> Constants
I think full name is more readable.


public static final String NACOS_SERVER_GRPC_PORT_OFFSET_KEY = "nacos.server.grpc.port.offset";

public static final String NACOS_CLIENT_GRPC = "nacos.remote.client.grpc";

@GRpcConfigLabel
public static final String NACOS_CLIENT_GRPC_THREADPOOL_KEEPALIVETIME = NACOS_CLIENT_GRPC + ".pool.alive";

@GRpcConfigLabel
public static final String NACOS_CLIENT_GRPC_TIMEOUT = NACOS_CLIENT_GRPC + ".timeout";

@GRpcConfigLabel
public static final String NACOS_CLIENT_GRPC_QUEUESIZE = NACOS_CLIENT_GRPC + ".queue.size";

@GRpcConfigLabel
public static final String NACOS_CLIENT_GRPC_HEALTHCHECK_RETRY_TIMES = NACOS_CLIENT_GRPC + ".health.retry";

@GRpcConfigLabel
public static final String NACOS_CLIENT_GRPC_HEALTHCHECK_TIMEOUT = NACOS_CLIENT_GRPC + ".health.timeout";

@GRpcConfigLabel
public static final String MAX_INBOUND_MESSAGE_SIZE = NACOS_CLIENT_GRPC + ".maxinbound.message.size";

@GRpcConfigLabel
public static final String KEEP_ALIVE_TIME = NACOS_CLIENT_GRPC + ".keep.alive.millis";

private static final Set<String> CONFIG_NAMES = new HashSet<>();

@Documented
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
protected @interface GRpcConfigLabel {

}

static {
Class clazz = GrpcConsts.class;
Field[] declaredFields = clazz.getDeclaredFields();
for (Field declaredField : declaredFields) {
declaredField.setAccessible(true);
if (declaredField.getType().equals(String.class) && null != declaredField.getAnnotation(
GRpcConfigLabel.class)) {
try {
CONFIG_NAMES.add((String) declaredField.get(null));
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
}
}

public static Set<String> getRpcParams() {
return Collections.unmodifiableSet(CONFIG_NAMES);
}
}
Loading