diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java index 923acc99741..9b66badba39 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java @@ -33,18 +33,18 @@ import com.alibaba.nacos.common.ability.discover.NacosAbilityManagerHolder; import com.alibaba.nacos.common.packagescan.resource.Resource; import com.alibaba.nacos.common.remote.ConnectionType; -import com.alibaba.nacos.common.remote.client.RpcClientTlsConfig; -import com.alibaba.nacos.common.remote.client.RpcClientStatus; +import com.alibaba.nacos.common.remote.client.Connection; import com.alibaba.nacos.common.remote.client.RpcClient; +import com.alibaba.nacos.common.remote.client.RpcClientStatus; +import com.alibaba.nacos.common.remote.client.RpcClientTlsConfig; import com.alibaba.nacos.common.remote.client.ServerListFactory; -import com.alibaba.nacos.common.remote.client.Connection; import com.alibaba.nacos.common.remote.client.ServerRequestHandler; import com.alibaba.nacos.common.utils.JacksonUtils; import com.alibaba.nacos.common.utils.LoggerUtils; import com.alibaba.nacos.common.utils.StringUtils; -import com.alibaba.nacos.common.utils.VersionUtils; -import com.alibaba.nacos.common.utils.TlsTypeResolve; import com.alibaba.nacos.common.utils.ThreadFactoryBuilder; +import com.alibaba.nacos.common.utils.TlsTypeResolve; +import com.alibaba.nacos.common.utils.VersionUtils; import com.google.common.util.concurrent.ListenableFuture; import io.grpc.CompressorRegistry; import io.grpc.DecompressorRegistry; @@ -54,16 +54,15 @@ import io.grpc.netty.shaded.io.grpc.netty.NegotiationType; import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; - import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder; import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.grpc.stub.StreamObserver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; import java.util.Map; import java.util.Optional; -import java.util.Arrays; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; @@ -356,56 +355,59 @@ public Connection connectToServer(ServerInfo serverInfo) { int port = serverInfo.getServerPort() + rpcPortOffset(); ManagedChannel managedChannel = createNewManagedChannel(serverInfo.getServerIp(), port); RequestGrpc.RequestFutureStub newChannelStubTemp = createNewChannelStub(managedChannel); - if (newChannelStubTemp != null) { - - Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp); - if (response == null || !(response instanceof ServerCheckResponse)) { - shuntDownChannel(managedChannel); + + Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp); + if (!(response instanceof ServerCheckResponse)) { + shuntDownChannel(managedChannel); + return null; + } + // submit ability table as soon as possible + // ability table will be null if server doesn't support ability table + ServerCheckResponse serverCheckResponse = (ServerCheckResponse) response; + connectionId = serverCheckResponse.getConnectionId(); + + BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc.newStub( + newChannelStubTemp.getChannel()); + GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor); + grpcConn.setConnectionId(connectionId); + // if not supported, it will be false + if (serverCheckResponse.isSupportAbilityNegotiation()) { + // mark + this.recAbilityContext.reset(grpcConn); + } + + //create stream request and bind connection event to this connection. + StreamObserver payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn); + + // stream observer to send response to server + grpcConn.setPayloadStreamObserver(payloadStreamObserver); + grpcConn.setGrpcFutureServiceStub(newChannelStubTemp); + grpcConn.setChannel(managedChannel); + //send a setup request. + ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest(); + conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion()); + conSetupRequest.setLabels(super.getLabels()); + // set ability table + conSetupRequest.setAbilityTable( + NacosAbilityManagerHolder.getInstance().getCurrentNodeAbilities(abilityMode())); + conSetupRequest.setTenant(super.getTenant()); + grpcConn.sendRequest(conSetupRequest); + // wait for response + if (recAbilityContext.isNeedToSync()) { + // try to wait for notify response + boolean waitForResponse = recAbilityContext.await(this.clientConfig.capabilityNegotiationTimeout(), + TimeUnit.MILLISECONDS); + if (!waitForResponse) { + // haven't received a response for registration; need to register again. return null; } - - // submit ability table as soon as possible - // ability table will be null if server doesn't support ability table - ServerCheckResponse serverCheckResponse = (ServerCheckResponse) response; - connectionId = serverCheckResponse.getConnectionId(); - - BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc.newStub( - newChannelStubTemp.getChannel()); - GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor); - grpcConn.setConnectionId(connectionId); - // if not supported, it will be false - if (serverCheckResponse.isSupportAbilityNegotiation()) { - // mark - this.recAbilityContext.reset(grpcConn); - } - - //create stream request and bind connection event to this connection. - StreamObserver payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn); - - // stream observer to send response to server - grpcConn.setPayloadStreamObserver(payloadStreamObserver); - grpcConn.setGrpcFutureServiceStub(newChannelStubTemp); - grpcConn.setChannel(managedChannel); - //send a setup request. - ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest(); - conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion()); - conSetupRequest.setLabels(super.getLabels()); - // set ability table - conSetupRequest.setAbilityTable(NacosAbilityManagerHolder.getInstance().getCurrentNodeAbilities(abilityMode())); - conSetupRequest.setTenant(super.getTenant()); - grpcConn.sendRequest(conSetupRequest); - // wait for response - if (recAbilityContext.isNeedToSync()) { - // try to wait for notify response - recAbilityContext.await(this.clientConfig.capabilityNegotiationTimeout(), TimeUnit.MILLISECONDS); - } else { - // leave for adapting old version server - // wait to register connection setup - Thread.sleep(100L); - } - return grpcConn; + } else { + // leave for adapting old version server + // registration is considered successful by default after 100ms + // wait to register connection setup + Thread.sleep(100L); } - return null; + return grpcConn; } catch (Exception e) { LOGGER.error("[{}]Fail to connect to server!,error={}", GrpcClient.this.getName(), e); // remove and notify @@ -425,17 +427,17 @@ public Connection connectToServer(ServerInfo serverInfo) { protected void afterReset(ConnectResetRequest request) { recAbilityContext.release(null); } - + /** * This is for receiving server abilities. */ - class RecAbilityContext { - + static class RecAbilityContext { + /** * connection waiting for server abilities. */ private volatile Connection connection; - + /** * way to block client. */ @@ -484,19 +486,24 @@ public void release(Map abilities) { } this.needToSync = false; } - + /** - * await for abilities. + * Wait for a specified duration for a condition to be met. * - * @param timeout timeout. - * @param unit unit. - * @throws InterruptedException by blocker. + * @param timeout The maximum time to wait. + * @param unit The time unit for the timeout. + * @return true if the condition was successfully awaited, false otherwise. + * @throws InterruptedException if the waiting thread is interrupted. */ - public void await(long timeout, TimeUnit unit) throws InterruptedException { - if (this.blocker != null) { - this.blocker.await(timeout, unit); + public boolean await(long timeout, TimeUnit unit) throws InterruptedException { + if (blocker != null) { + boolean waitForResponse = blocker.await(timeout, unit); + if (waitForResponse) { + needToSync = false; + } + return waitForResponse; } - this.needToSync = false; + return false; } }