From a0ac6b3c193e08cba82cdd11a39fc074b63d0747 Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Tue, 16 Jan 2024 14:42:44 -0800 Subject: [PATCH] change controlPlaneClient and loadReportClient --- .../java/io/grpc/xds/ControlPlaneClient.java | 264 +++++++----------- .../io/grpc/xds/GrpcXdsTransportFactory.java | 16 +- .../java/io/grpc/xds/LoadReportClient.java | 127 ++++----- .../grpc/xds/SharedXdsClientPoolProvider.java | 4 +- .../main/java/io/grpc/xds/XdsClientImpl.java | 29 +- .../io/grpc/xds/LoadReportClientTest.java | 4 +- .../io/grpc/xds/XdsClientFederationTest.java | 4 +- .../io/grpc/xds/XdsClientImplTestBase.java | 18 +- 8 files changed, 198 insertions(+), 268 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/ControlPlaneClient.java b/xds/src/main/java/io/grpc/xds/ControlPlaneClient.java index 5a344336ba2..d733246cb5c 100644 --- a/xds/src/main/java/io/grpc/xds/ControlPlaneClient.java +++ b/xds/src/main/java/io/grpc/xds/ControlPlaneClient.java @@ -28,23 +28,22 @@ import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; -import io.grpc.Channel; import io.grpc.Context; import io.grpc.InternalLogId; -import io.grpc.ManagedChannel; +import io.grpc.MethodDescriptor; import io.grpc.Status; import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext.ScheduledHandle; import io.grpc.internal.BackoffPolicy; -import io.grpc.stub.ClientCallStreamObserver; -import io.grpc.stub.ClientResponseObserver; import io.grpc.xds.Bootstrapper.ServerInfo; import io.grpc.xds.EnvoyProtoData.Node; import io.grpc.xds.XdsClient.ProcessingTracker; import io.grpc.xds.XdsClient.ResourceStore; import io.grpc.xds.XdsClient.XdsResponseHandler; -import io.grpc.xds.XdsClientImpl.XdsChannelFactory; import io.grpc.xds.XdsLogger.XdsLogLevel; +import io.grpc.xds.XdsTransportFactory.EventHandler; +import io.grpc.xds.XdsTransportFactory.StreamingCall; +import io.grpc.xds.XdsTransportFactory.XdsTransport; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -67,7 +66,7 @@ final class ControlPlaneClient { private final InternalLogId logId; private final XdsLogger logger; private final ServerInfo serverInfo; - private final ManagedChannel channel; + private final XdsTransport xdsTransport; private final XdsResponseHandler xdsResponseHandler; private final ResourceStore resourceStore; private final Context context; @@ -84,7 +83,7 @@ final class ControlPlaneClient { private boolean shutdown; @Nullable - private AbstractAdsStream adsStream; + private AdsStreamState adsStreamState; @Nullable private BackoffPolicy retryBackoffPolicy; @Nullable @@ -93,7 +92,7 @@ final class ControlPlaneClient { /** An entity that manages ADS RPCs over a single channel. */ // TODO: rename to XdsChannel ControlPlaneClient( - XdsChannelFactory xdsChannelFactory, + XdsTransport xdsTransport, ServerInfo serverInfo, Node bootstrapNode, XdsResponseHandler xdsResponseHandler, @@ -106,7 +105,7 @@ final class ControlPlaneClient { Supplier stopwatchSupplier, XdsClient.TimerLaunch timerLaunch) { this.serverInfo = checkNotNull(serverInfo, "serverInfo"); - this.channel = checkNotNull(xdsChannelFactory, "xdsChannelFactory").create(serverInfo); + this.xdsTransport = checkNotNull(xdsTransport, "xdsTransport"); this.xdsResponseHandler = checkNotNull(xdsResponseHandler, "xdsResponseHandler"); this.resourceStore = checkNotNull(resourceStore, "resourcesSubscriber"); this.bootstrapNode = checkNotNull(bootstrapNode, "bootstrapNode"); @@ -121,25 +120,19 @@ final class ControlPlaneClient { logger.log(XdsLogLevel.INFO, "Created"); } - /** The underlying channel. */ - // Currently, only externally used for LrsClient. - Channel channel() { - return channel; - } - void shutdown() { syncContext.execute(new Runnable() { @Override public void run() { shutdown = true; logger.log(XdsLogLevel.INFO, "Shutting down"); - if (adsStream != null) { - adsStream.close(Status.CANCELLED.withDescription("shutdown").asException()); + if (adsStreamState != null) { + adsStreamState.close(Status.CANCELLED.withDescription("shutdown").asException()); } if (rpcRetryTimer != null && rpcRetryTimer.isPending()) { rpcRetryTimer.cancel(); } - channel.shutdown(); + xdsTransport.shutdown(); } }); } @@ -157,12 +150,12 @@ void adjustResourceSubscription(XdsResourceType resourceType) { if (isInBackoff()) { return; } - if (adsStream == null) { + if (adsStreamState == null) { startRpcStream(); } Collection resources = resourceStore.getSubscribedResources(serverInfo, resourceType); if (resources != null) { - adsStream.sendDiscoveryRequest(resourceType, resources); + adsStreamState.sendDiscoveryRequest(resourceType, resources); } } @@ -179,7 +172,7 @@ void ackResponse(XdsResourceType type, String versionInfo, String nonce) { if (resources == null) { resources = Collections.emptyList(); } - adsStream.sendDiscoveryRequest(type, versionInfo, resources, nonce, null); + adsStreamState.sendDiscoveryRequest(type, versionInfo, resources, nonce, null); } /** @@ -195,7 +188,7 @@ void nackResponse(XdsResourceType type, String nonce, String errorDetail) { if (resources == null) { resources = Collections.emptyList(); } - adsStream.sendDiscoveryRequest(type, versionInfo, resources, nonce, errorDetail); + adsStreamState.sendDiscoveryRequest(type, versionInfo, resources, nonce, errorDetail); } /** @@ -207,7 +200,7 @@ boolean isInBackoff() { } boolean isReady() { - return adsStream != null && adsStream.isReady(); + return adsStreamState != null && adsStreamState.call != null && adsStreamState.call.isReady(); } /** @@ -233,11 +226,10 @@ void readyHandler() { */ // Must be synchronized. private void startRpcStream() { - checkState(adsStream == null, "Previous adsStream has not been cleared yet"); - adsStream = new AdsStreamV3(); + checkState(adsStreamState == null, "Previous adsStream has not been cleared yet"); Context prevContext = context.attach(); try { - adsStream.start(); + adsStreamState = new AdsStreamState(); } finally { context.detach(prevContext); } @@ -258,7 +250,7 @@ public void run() { for (XdsResourceType type : subscribedResourceTypes) { Collection resources = resourceStore.getSubscribedResources(serverInfo, type); if (resources != null) { - adsStream.sendDiscoveryRequest(type, resources); + adsStreamState.sendDiscoveryRequest(type, resources); } } xdsResponseHandler.handleStreamRestarted(serverInfo); @@ -271,7 +263,7 @@ XdsResourceType fromTypeUrl(String typeUrl) { return resourceStore.getSubscribedResourceTypesWithTypeUrl().get(typeUrl); } - private abstract class AbstractAdsStream { + private class AdsStreamState { private boolean responseReceived; private boolean closed; // Response nonce for the most recently received discovery responses of each resource type. @@ -281,14 +273,16 @@ private abstract class AbstractAdsStream { // To avoid confusion, client-initiated requests will always use the nonce in // most recently received responses of each resource type. private final Map, String> respNonces = new HashMap<>(); - - abstract void start(); - - abstract void sendError(Exception error); - - abstract boolean isReady(); - - abstract void request(int count); + private final StreamingCall call; + private final EventHandler listener = new AdsStreamEventHandler(); + private final MethodDescriptor methodDescriptor = + AggregatedDiscoveryServiceGrpc.getStreamAggregatedResourcesMethod(); + + private AdsStreamState() { + this.call = xdsTransport.createStreamingCall(methodDescriptor.getFullMethodName(), + methodDescriptor.getRequestMarshaller(), methodDescriptor.getResponseMarshaller()); + call.start(listener); + } /** * Sends a discovery request with the given {@code versionInfo}, {@code nonce} and @@ -296,8 +290,30 @@ private abstract class AbstractAdsStream { * client-initiated discovery requests, use {@link * #sendDiscoveryRequest(XdsResourceType, Collection)}. */ - abstract void sendDiscoveryRequest(XdsResourceType type, String version, - Collection resources, String nonce, @Nullable String errorDetail); + void sendDiscoveryRequest(XdsResourceType type, String versionInfo, + Collection resources, String nonce, + @Nullable String errorDetail) { + DiscoveryRequest.Builder builder = + DiscoveryRequest.newBuilder() + .setVersionInfo(versionInfo) + .setNode(bootstrapNode.toEnvoyProtoNode()) + .addAllResourceNames(resources) + .setTypeUrl(type.typeUrl()) + .setResponseNonce(nonce); + if (errorDetail != null) { + com.google.rpc.Status error = + com.google.rpc.Status.newBuilder() + .setCode(Code.INVALID_ARGUMENT_VALUE) // FIXME(chengyuanzhang): use correct code + .setMessage(errorDetail) + .build(); + builder.setErrorDetail(error); + } + DiscoveryRequest request = builder.build(); + call.sendMessage(request); + if (logger.isLoggable(XdsLogLevel.DEBUG)) { + logger.log(XdsLogLevel.DEBUG, "Sent DiscoveryRequest\n{0}", MessagePrinter.print(request)); + } + } /** * Sends a client-initiated discovery request. @@ -308,6 +324,50 @@ final void sendDiscoveryRequest(XdsResourceType type, Collection reso respNonces.getOrDefault(type, ""), null); } + private final class AdsStreamEventHandler implements EventHandler { + @Override + public void onReady() { + readyHandler(); + } + + @Override + public void onRecvMessage(DiscoveryResponse response) { + syncContext.execute(new Runnable() { + @Override + public void run() { + XdsResourceType type = fromTypeUrl(response.getTypeUrl()); + if (logger.isLoggable(XdsLogLevel.DEBUG)) { + logger.log( + XdsLogLevel.DEBUG, "Received {0} response:\n{1}", type, + MessagePrinter.print(response)); + } + if (type == null) { + logger.log( + XdsLogLevel.WARNING, + "Ignore an unknown type of DiscoveryResponse: {0}", + response.getTypeUrl()); + + call.startRecvMessage(); + return; + } + handleRpcResponse(type, response.getVersionInfo(), response.getResourcesList(), + response.getNonce()); + } + }); + } + + @Override + public void onStatusReceived(final Status status) { + syncContext.execute(() -> { + if (status.isOk()) { + handleRpcStreamClosed(Status.UNAVAILABLE.withDescription(CLOSED_BY_SERVER)); + } else { + handleRpcStreamClosed(status); + } + }); + } + } + final void handleRpcResponse(XdsResourceType type, String versionInfo, List resources, String nonce) { checkNotNull(type, "type"); @@ -316,20 +376,13 @@ final void handleRpcResponse(XdsResourceType type, String versionInfo, List request(1), syncContext); + ProcessingTracker processingTracker = new ProcessingTracker( + () -> call.startRecvMessage(), syncContext); xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce, processingTracker); processingTracker.onComplete(); } - final void handleRpcError(Throwable t) { - handleRpcStreamClosed(Status.fromThrowable(t)); - } - - final void handleRpcCompleted() { - handleRpcStreamClosed(Status.UNAVAILABLE.withDescription(CLOSED_BY_SERVER)); - } - private void handleRpcStreamClosed(Status error) { if (closed) { return; @@ -366,124 +419,13 @@ private void close(Exception error) { } closed = true; cleanUp(); - sendError(error); + call.sendError(error); } private void cleanUp() { - if (adsStream == this) { - adsStream = null; + if (adsStreamState == this) { + adsStreamState = null; } } } - - private final class AdsStreamV3 extends AbstractAdsStream { - private ClientCallStreamObserver requestWriter; - - @Override - public boolean isReady() { - return requestWriter != null && ((ClientCallStreamObserver) requestWriter).isReady(); - } - - @Override - @SuppressWarnings("unchecked") - void start() { - AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub = - AggregatedDiscoveryServiceGrpc.newStub(channel); - - final class AdsClientResponseObserver - implements ClientResponseObserver { - - @Override - public void beforeStart(ClientCallStreamObserver requestStream) { - requestStream.disableAutoRequestWithInitial(1); - requestStream.setOnReadyHandler(ControlPlaneClient.this::readyHandler); - } - - @Override - public void onNext(final DiscoveryResponse response) { - syncContext.execute(new Runnable() { - @Override - public void run() { - XdsResourceType type = fromTypeUrl(response.getTypeUrl()); - if (logger.isLoggable(XdsLogLevel.DEBUG)) { - logger.log( - XdsLogLevel.DEBUG, "Received {0} response:\n{1}", type, - MessagePrinter.print(response)); - } - if (type == null) { - logger.log( - XdsLogLevel.WARNING, - "Ignore an unknown type of DiscoveryResponse: {0}", - response.getTypeUrl()); - request(1); - return; - } - handleRpcResponse(type, response.getVersionInfo(), response.getResourcesList(), - response.getNonce()); - } - }); - } - - @Override - public void onError(final Throwable t) { - syncContext.execute(new Runnable() { - @Override - public void run() { - handleRpcError(t); - } - }); - } - - @Override - public void onCompleted() { - syncContext.execute(new Runnable() { - @Override - public void run() { - handleRpcCompleted(); - } - }); - } - } - - requestWriter = (ClientCallStreamObserver) stub.streamAggregatedResources( - new AdsClientResponseObserver()); - } - - @Override - void sendDiscoveryRequest(XdsResourceType type, String versionInfo, - Collection resources, String nonce, - @Nullable String errorDetail) { - checkState(requestWriter != null, "ADS stream has not been started"); - DiscoveryRequest.Builder builder = - DiscoveryRequest.newBuilder() - .setVersionInfo(versionInfo) - .setNode(bootstrapNode.toEnvoyProtoNode()) - .addAllResourceNames(resources) - .setTypeUrl(type.typeUrl()) - .setResponseNonce(nonce); - if (errorDetail != null) { - com.google.rpc.Status error = - com.google.rpc.Status.newBuilder() - .setCode(Code.INVALID_ARGUMENT_VALUE) // FIXME(chengyuanzhang): use correct code - .setMessage(errorDetail) - .build(); - builder.setErrorDetail(error); - } - DiscoveryRequest request = builder.build(); - requestWriter.onNext(request); - if (logger.isLoggable(XdsLogLevel.DEBUG)) { - logger.log(XdsLogLevel.DEBUG, "Sent DiscoveryRequest\n{0}", MessagePrinter.print(request)); - } - } - - @Override - void request(int count) { - requestWriter.request(count); - } - - @Override - void sendError(Exception error) { - requestWriter.onError(error); - } - } } diff --git a/xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java b/xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java index b63b3cc0d88..66d7e1bd951 100644 --- a/xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java +++ b/xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java @@ -18,6 +18,7 @@ import static com.google.common.base.Preconditions.checkNotNull; +import com.google.common.annotations.VisibleForTesting; import io.grpc.CallOptions; import io.grpc.ChannelCredentials; import io.grpc.ClientCall; @@ -30,14 +31,20 @@ final class GrpcXdsTransportFactory implements XdsTransportFactory { - static final XdsTransportFactory DEFAULT_XDS_TRANSPORT_FACTORY = new GrpcXdsTransportFactory(); + static final GrpcXdsTransportFactory DEFAULT_XDS_TRANSPORT_FACTORY = + new GrpcXdsTransportFactory(); @Override public XdsTransport create(Bootstrapper.ServerInfo serverInfo) { return new GrpcXdsTransport(serverInfo); } - private class GrpcXdsTransport implements XdsTransport { + public XdsTransport createForTest(ManagedChannel channel) { + return new GrpcXdsTransport(channel); + } + + @VisibleForTesting + static class GrpcXdsTransport implements XdsTransport { private final ManagedChannel channel; @@ -49,6 +56,11 @@ public GrpcXdsTransport(Bootstrapper.ServerInfo serverInfo) { .build(); } + @VisibleForTesting + public GrpcXdsTransport(ManagedChannel channel) { + this.channel = checkNotNull(channel, "channel"); + } + @Override public StreamingCall createStreamingCall( String fullMethodName, diff --git a/xds/src/main/java/io/grpc/xds/LoadReportClient.java b/xds/src/main/java/io/grpc/xds/LoadReportClient.java index b86c8110f63..a7bead0ae2f 100644 --- a/xds/src/main/java/io/grpc/xds/LoadReportClient.java +++ b/xds/src/main/java/io/grpc/xds/LoadReportClient.java @@ -27,19 +27,21 @@ import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc; import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest; import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse; -import io.grpc.Channel; import io.grpc.Context; import io.grpc.InternalLogId; +import io.grpc.MethodDescriptor; import io.grpc.Status; import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext.ScheduledHandle; import io.grpc.internal.BackoffPolicy; -import io.grpc.stub.StreamObserver; import io.grpc.xds.EnvoyProtoData.Node; import io.grpc.xds.Stats.ClusterStats; import io.grpc.xds.Stats.DroppedRequests; import io.grpc.xds.Stats.UpstreamLocalityStats; import io.grpc.xds.XdsLogger.XdsLogLevel; +import io.grpc.xds.XdsTransportFactory.EventHandler; +import io.grpc.xds.XdsTransportFactory.StreamingCall; +import io.grpc.xds.XdsTransportFactory.XdsTransport; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -55,7 +57,7 @@ final class LoadReportClient { private final InternalLogId logId; private final XdsLogger logger; - private final Channel channel; + private final XdsTransport xdsTransport; private final Context context; private final Node node; private final SynchronizationContext syncContext; @@ -64,7 +66,6 @@ final class LoadReportClient { private final BackoffPolicy.Provider backoffPolicyProvider; @VisibleForTesting final LoadStatsManager2 loadStatsManager; - private boolean started; @Nullable private BackoffPolicy lrsRpcRetryPolicy; @@ -72,11 +73,13 @@ final class LoadReportClient { private ScheduledHandle lrsRpcRetryTimer; @Nullable @VisibleForTesting - LrsStream lrsStream; + LrsStreamStatus lrsStreamStatus; + private static final MethodDescriptor method = + LoadReportingServiceGrpc.getStreamLoadStatsMethod(); LoadReportClient( LoadStatsManager2 loadStatsManager, - Channel channel, + XdsTransport xdsTransport, Context context, Node node, SynchronizationContext syncContext, @@ -84,7 +87,7 @@ final class LoadReportClient { BackoffPolicy.Provider backoffPolicyProvider, Supplier stopwatchSupplier) { this.loadStatsManager = checkNotNull(loadStatsManager, "loadStatsManager"); - this.channel = checkNotNull(channel, "xdsChannel"); + this.xdsTransport = checkNotNull(xdsTransport, "xdsTransport"); this.context = checkNotNull(context, "context"); this.syncContext = checkNotNull(syncContext, "syncContext"); this.timerService = checkNotNull(scheduledExecutorService, "timeService"); @@ -126,17 +129,17 @@ void stopLoadReporting() { if (lrsRpcRetryTimer != null && lrsRpcRetryTimer.isPending()) { lrsRpcRetryTimer.cancel(); } - if (lrsStream != null) { - lrsStream.close(Status.CANCELLED.withDescription("stop load reporting").asException()); + if (lrsStreamStatus != null) { + lrsStreamStatus.close(Status.CANCELLED.withDescription("stop load reporting").asException()); } // Do not shutdown channel as it is not owned by LrsClient. } @VisibleForTesting static class LoadReportingTask implements Runnable { - private final LrsStream stream; + private final LrsStreamStatus stream; - LoadReportingTask(LrsStream stream) { + LoadReportingTask(LrsStreamStatus stream) { this.stream = stream; } @@ -159,65 +162,65 @@ private void startLrsRpc() { if (!started) { return; } - checkState(lrsStream == null, "previous lbStream has not been cleared yet"); - lrsStream = new LrsStream(); + checkState(lrsStreamStatus == null, "previous lbStream has not been cleared yet"); retryStopwatch.reset().start(); Context prevContext = context.attach(); try { - lrsStream.start(); + lrsStreamStatus = new LrsStreamStatus(); } finally { context.detach(prevContext); } } - private final class LrsStream { + private final class LrsStreamStatus { boolean initialResponseReceived; boolean closed; long intervalNano = -1; boolean reportAllClusters; List clusterNames; // clusters to report loads for, if not report all. ScheduledHandle loadReportTimer; - StreamObserver lrsRequestWriterV3; + private final StreamingCall call; + private final EventHandler eventHandler = new LrsEventHandler(); - void start() { - StreamObserver lrsResponseReaderV3 = - new StreamObserver() { - @Override - public void onNext(final LoadStatsResponse response) { - syncContext.execute(new Runnable() { - @Override - public void run() { - logger.log(XdsLogLevel.DEBUG, "Received LRS response:\n{0}", response); - handleRpcResponse(response.getClustersList(), response.getSendAllClusters(), - Durations.toNanos(response.getLoadReportingInterval())); - } - }); - } + LrsStreamStatus() { + this.call = xdsTransport.createStreamingCall(method.getFullMethodName(), + method.getRequestMarshaller(), method.getResponseMarshaller()); + call.start(eventHandler); + logger.log(XdsLogLevel.DEBUG, "Sending initial LRS request"); + sendLoadStatsRequest(Collections.emptyList()); + call.startRecvMessage(); + } - @Override - public void onError(final Throwable t) { - syncContext.execute(new Runnable() { - @Override - public void run() { - handleRpcError(t); - } - }); - } + private final class LrsEventHandler implements EventHandler { + @Override + public void onReady() {} + + @Override + public void onRecvMessage(LoadStatsResponse response) { + syncContext.execute(new Runnable() { + @Override + public void run() { + logger.log(XdsLogLevel.DEBUG, "Received LRS response:\n{0}", response); + handleRpcResponse(response.getClustersList(), response.getSendAllClusters(), + Durations.toNanos(response.getLoadReportingInterval())); + call.startRecvMessage(); + } + }); + } - @Override - public void onCompleted() { - syncContext.execute(new Runnable() { - @Override - public void run() { - handleRpcCompleted(); - } - }); + @Override + public void onStatusReceived(final Status status) { + syncContext.execute(new Runnable() { + @Override + public void run() { + if (status.isOk()) { + handleStreamClosed(Status.UNAVAILABLE.withDescription("Closed by server")); + } else { + handleStreamClosed(status); } - }; - lrsRequestWriterV3 = LoadReportingServiceGrpc.newStub(channel).withWaitForReady() - .streamLoadStats(lrsResponseReaderV3); - logger.log(XdsLogLevel.DEBUG, "Sending initial LRS request"); - sendLoadStatsRequest(Collections.emptyList()); + } + }); + } } void sendLoadStatsRequest(List clusterStatsList) { @@ -227,14 +230,10 @@ void sendLoadStatsRequest(List clusterStatsList) { requestBuilder.addClusterStats(buildClusterStats(stats)); } LoadStatsRequest request = requestBuilder.build(); - lrsRequestWriterV3.onNext(request); + call.sendMessage(request); logger.log(XdsLogLevel.DEBUG, "Sent LoadStatsRequest\n{0}", request); } - void sendError(Exception error) { - lrsRequestWriterV3.onError(error); - } - void handleRpcResponse(List clusters, boolean sendAllClusters, long loadReportIntervalNano) { if (closed) { @@ -256,14 +255,6 @@ void handleRpcResponse(List clusters, boolean sendAllClusters, scheduleNextLoadReport(); } - void handleRpcError(Throwable t) { - handleStreamClosed(Status.fromThrowable(t)); - } - - void handleRpcCompleted() { - handleStreamClosed(Status.UNAVAILABLE.withDescription("Closed by server")); - } - private void sendLoadReport() { if (closed) { return; @@ -330,7 +321,7 @@ private void close(Exception error) { } closed = true; cleanUp(); - sendError(error); + call.sendError(error); } private void cleanUp() { @@ -338,8 +329,8 @@ private void cleanUp() { loadReportTimer.cancel(); loadReportTimer = null; } - if (lrsStream == this) { - lrsStream = null; + if (lrsStreamStatus == this) { + lrsStreamStatus = null; } } diff --git a/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java b/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java index 5aabd976085..c4d5589e57c 100644 --- a/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java +++ b/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java @@ -17,6 +17,7 @@ package io.grpc.xds; import static com.google.common.base.Preconditions.checkNotNull; +import static io.grpc.xds.GrpcXdsTransportFactory.DEFAULT_XDS_TRANSPORT_FACTORY; import com.google.common.annotations.VisibleForTesting; import io.grpc.Context; @@ -26,7 +27,6 @@ import io.grpc.internal.SharedResourceHolder; import io.grpc.internal.TimeProvider; import io.grpc.xds.Bootstrapper.BootstrapInfo; -import io.grpc.xds.XdsClientImpl.XdsChannelFactory; import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory; import io.grpc.xds.internal.security.TlsContextManagerImpl; import java.util.Map; @@ -124,7 +124,7 @@ public XdsClient getObject() { if (refCount == 0) { scheduler = SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE); xdsClient = new XdsClientImpl( - XdsChannelFactory.DEFAULT_XDS_CHANNEL_FACTORY, + DEFAULT_XDS_TRANSPORT_FACTORY, bootstrapInfo, context, scheduler, diff --git a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java index 7389d7ebef1..8c1d56a7b79 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java @@ -31,12 +31,9 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import com.google.protobuf.Any; -import io.grpc.ChannelCredentials; import io.grpc.Context; -import io.grpc.Grpc; import io.grpc.InternalLogId; import io.grpc.LoadBalancerRegistry; -import io.grpc.ManagedChannel; import io.grpc.Status; import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext.ScheduledHandle; @@ -100,7 +97,7 @@ public void uncaughtException(Thread t, Throwable e) { private final Map> subscribedResourceTypeUrls = new HashMap<>(); private final Map loadStatsManagerMap = new HashMap<>(); private final Map serverLrsClientMap = new HashMap<>(); - private final XdsChannelFactory xdsChannelFactory; + private final XdsTransportFactory xdsTransportFactory; private final Bootstrapper.BootstrapInfo bootstrapInfo; private final Context context; private final ScheduledExecutorService timeService; @@ -113,7 +110,7 @@ public void uncaughtException(Thread t, Throwable e) { private volatile boolean isShutdown; XdsClientImpl( - XdsChannelFactory xdsChannelFactory, + XdsTransportFactory xdsTransportFactory, Bootstrapper.BootstrapInfo bootstrapInfo, Context context, ScheduledExecutorService timeService, @@ -121,7 +118,7 @@ public void uncaughtException(Thread t, Throwable e) { Supplier stopwatchSupplier, TimeProvider timeProvider, TlsContextManager tlsContextManager) { - this.xdsChannelFactory = xdsChannelFactory; + this.xdsTransportFactory = xdsTransportFactory; this.bootstrapInfo = bootstrapInfo; this.context = context; this.timeService = timeService; @@ -142,8 +139,9 @@ private void maybeCreateXdsChannelWithLrs(ServerInfo serverInfo) { if (serverChannelMap.containsKey(serverInfo)) { return; } + XdsTransportFactory.XdsTransport xdsTransport = xdsTransportFactory.create(serverInfo); ControlPlaneClient xdsChannel = new ControlPlaneClient( - xdsChannelFactory, + xdsTransport, serverInfo, bootstrapInfo.node(), this, @@ -157,7 +155,7 @@ private void maybeCreateXdsChannelWithLrs(ServerInfo serverInfo) { LoadStatsManager2 loadStatsManager = new LoadStatsManager2(stopwatchSupplier); loadStatsManagerMap.put(serverInfo, loadStatsManager); LoadReportClient lrsClient = new LoadReportClient( - loadStatsManager, xdsChannel.channel(), context, bootstrapInfo.node(), syncContext, + loadStatsManager, xdsTransport, context, bootstrapInfo.node(), syncContext, timeService, backoffPolicyProvider, stopwatchSupplier); serverChannelMap.put(serverInfo, xdsChannel); serverLrsClientMap.put(serverInfo, lrsClient); @@ -759,19 +757,4 @@ static final class ResourceInvalidException extends Exception { super(cause != null ? message + ": " + cause.getMessage() : message, cause, false, false); } } - - abstract static class XdsChannelFactory { - static final XdsChannelFactory DEFAULT_XDS_CHANNEL_FACTORY = new XdsChannelFactory() { - @Override - ManagedChannel create(ServerInfo serverInfo) { - String target = serverInfo.target(); - ChannelCredentials channelCredentials = serverInfo.channelCredentials(); - return Grpc.newChannelBuilder(target, channelCredentials) - .keepAliveTime(5, TimeUnit.MINUTES) - .build(); - } - }; - - abstract ManagedChannel create(ServerInfo serverInfo); - } } diff --git a/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java b/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java index 910a9fc3285..49325ea7674 100644 --- a/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java +++ b/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java @@ -174,7 +174,9 @@ public void cancelled(Context context) { when(backoffPolicy2.nextBackoffNanos()) .thenReturn(TimeUnit.SECONDS.toNanos(2L), TimeUnit.SECONDS.toNanos(20L)); addFakeStatsData(); - lrsClient = new LoadReportClient(loadStatsManager, channel, Context.ROOT, NODE, + lrsClient = new LoadReportClient(loadStatsManager, + GrpcXdsTransportFactory.DEFAULT_XDS_TRANSPORT_FACTORY.createForTest(channel), + Context.ROOT, NODE, syncContext, fakeClock.getScheduledExecutorService(), backoffPolicyProvider, fakeClock.getStopwatchSupplier()); syncContext.execute(new Runnable() { diff --git a/xds/src/test/java/io/grpc/xds/XdsClientFederationTest.java b/xds/src/test/java/io/grpc/xds/XdsClientFederationTest.java index ea31922608e..a03b740f231 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientFederationTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientFederationTest.java @@ -147,7 +147,7 @@ public void lrsClientsStartedForLocalityStats() throws InterruptedException, Exe xdsClient.addClusterLocalityStats(entry.getKey(), "clusterName", "edsServiceName", Locality.create("", "", "")); waitForSyncContext(xdsClient); - assertThat(entry.getValue().lrsStream).isNotNull(); + assertThat(entry.getValue().lrsStreamStatus).isNotNull(); } } @@ -176,7 +176,7 @@ public void lrsClientsStartedForDropStats() throws InterruptedException, Executi for (Entry entry : xdsClient.getServerLrsClientMap().entrySet()) { xdsClient.addClusterDropStats(entry.getKey(), "clusterName", "edsServiceName"); waitForSyncContext(xdsClient); - assertThat(entry.getValue().lrsStream).isNotNull(); + assertThat(entry.getValue().lrsStreamStatus).isNotNull(); } } diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java index 4c3e05d8f1b..8c727db5eda 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java @@ -18,7 +18,7 @@ import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertWithMessage; -import static io.grpc.xds.XdsClientImpl.XdsChannelFactory.DEFAULT_XDS_CHANNEL_FACTORY; +import static io.grpc.xds.GrpcXdsTransportFactory.DEFAULT_XDS_TRANSPORT_FACTORY; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; @@ -85,6 +85,7 @@ import io.grpc.xds.EnvoyServerProtoData.FilterChain; import io.grpc.xds.EnvoyServerProtoData.SuccessRateEjection; import io.grpc.xds.FaultConfig.FractionalPercent.DenominatorType; +import io.grpc.xds.GrpcXdsTransportFactory.GrpcXdsTransport; import io.grpc.xds.LoadStatsManager2.ClusterDropStats; import io.grpc.xds.XdsClient.ResourceMetadata; import io.grpc.xds.XdsClient.ResourceMetadata.ResourceMetadataStatus; @@ -92,7 +93,6 @@ import io.grpc.xds.XdsClient.ResourceUpdate; import io.grpc.xds.XdsClient.ResourceWatcher; import io.grpc.xds.XdsClientImpl.ResourceInvalidException; -import io.grpc.xds.XdsClientImpl.XdsChannelFactory; import io.grpc.xds.XdsClusterResource.CdsUpdate; import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType; import io.grpc.xds.XdsEndpointResource.EdsUpdate; @@ -322,25 +322,25 @@ public void setUp() throws IOException { .start()); channel = cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()); - XdsChannelFactory xdsChannelFactory = new XdsChannelFactory() { + XdsTransportFactory xdsTransportFactory = new XdsTransportFactory() { @Override - ManagedChannel create(ServerInfo serverInfo) { + public XdsTransport create(ServerInfo serverInfo) { if (serverInfo.target().equals(SERVER_URI)) { - return channel; + return new GrpcXdsTransport(channel); } if (serverInfo.target().equals(SERVER_URI_CUSTOME_AUTHORITY)) { if (channelForCustomAuthority == null) { channelForCustomAuthority = cleanupRule.register( InProcessChannelBuilder.forName(serverName).directExecutor().build()); } - return channelForCustomAuthority; + return new GrpcXdsTransport(channelForCustomAuthority); } if (serverInfo.target().equals(SERVER_URI_EMPTY_AUTHORITY)) { if (channelForEmptyAuthority == null) { channelForEmptyAuthority = cleanupRule.register( InProcessChannelBuilder.forName(serverName).directExecutor().build()); } - return channelForEmptyAuthority; + return new GrpcXdsTransport(channelForEmptyAuthority); } throw new IllegalArgumentException("Can not create channel for " + serverInfo); } @@ -368,7 +368,7 @@ ManagedChannel create(ServerInfo serverInfo) { .build(); xdsClient = new XdsClientImpl( - xdsChannelFactory, + xdsTransportFactory, bootstrapInfo, Context.ROOT, fakeClock.getScheduledExecutorService(), @@ -3743,7 +3743,7 @@ public void sendToNonexistentHost() throws Exception { private XdsClientImpl createXdsClient(String serverUri) { BootstrapInfo bootstrapInfo = buildBootStrap(serverUri); return new XdsClientImpl( - DEFAULT_XDS_CHANNEL_FACTORY, + DEFAULT_XDS_TRANSPORT_FACTORY, bootstrapInfo, Context.ROOT, fakeClock.getScheduledExecutorService(),