Skip to content

Commit

Permalink
change controlPlaneClient and loadReportClient
Browse files Browse the repository at this point in the history
  • Loading branch information
YifeiZhuang committed Jan 17, 2024
1 parent 005f8c0 commit a0ac6b3
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 268 deletions.
264 changes: 103 additions & 161 deletions xds/src/main/java/io/grpc/xds/ControlPlaneClient.java

Large diffs are not rendered by default.

16 changes: 14 additions & 2 deletions xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.annotations.VisibleForTesting;
import io.grpc.CallOptions;
import io.grpc.ChannelCredentials;
import io.grpc.ClientCall;
Expand All @@ -30,14 +31,20 @@

final class GrpcXdsTransportFactory implements XdsTransportFactory {

static final XdsTransportFactory DEFAULT_XDS_TRANSPORT_FACTORY = new GrpcXdsTransportFactory();
static final GrpcXdsTransportFactory DEFAULT_XDS_TRANSPORT_FACTORY =
new GrpcXdsTransportFactory();

@Override
public XdsTransport create(Bootstrapper.ServerInfo serverInfo) {
return new GrpcXdsTransport(serverInfo);
}

private class GrpcXdsTransport implements XdsTransport {
public XdsTransport createForTest(ManagedChannel channel) {
return new GrpcXdsTransport(channel);
}

@VisibleForTesting
static class GrpcXdsTransport implements XdsTransport {

private final ManagedChannel channel;

Expand All @@ -49,6 +56,11 @@ public GrpcXdsTransport(Bootstrapper.ServerInfo serverInfo) {
.build();
}

@VisibleForTesting
public GrpcXdsTransport(ManagedChannel channel) {
this.channel = checkNotNull(channel, "channel");
}

@Override
public <ReqT, RespT> StreamingCall<ReqT, RespT> createStreamingCall(
String fullMethodName,
Expand Down
127 changes: 59 additions & 68 deletions xds/src/main/java/io/grpc/xds/LoadReportClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,21 @@
import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc;
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest;
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse;
import io.grpc.Channel;
import io.grpc.Context;
import io.grpc.InternalLogId;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.BackoffPolicy;
import io.grpc.stub.StreamObserver;
import io.grpc.xds.EnvoyProtoData.Node;
import io.grpc.xds.Stats.ClusterStats;
import io.grpc.xds.Stats.DroppedRequests;
import io.grpc.xds.Stats.UpstreamLocalityStats;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.XdsTransportFactory.EventHandler;
import io.grpc.xds.XdsTransportFactory.StreamingCall;
import io.grpc.xds.XdsTransportFactory.XdsTransport;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -55,7 +57,7 @@
final class LoadReportClient {
private final InternalLogId logId;
private final XdsLogger logger;
private final Channel channel;
private final XdsTransport xdsTransport;
private final Context context;
private final Node node;
private final SynchronizationContext syncContext;
Expand All @@ -64,27 +66,28 @@ final class LoadReportClient {
private final BackoffPolicy.Provider backoffPolicyProvider;
@VisibleForTesting
final LoadStatsManager2 loadStatsManager;

private boolean started;
@Nullable
private BackoffPolicy lrsRpcRetryPolicy;
@Nullable
private ScheduledHandle lrsRpcRetryTimer;
@Nullable
@VisibleForTesting
LrsStream lrsStream;
LrsStreamStatus lrsStreamStatus;
private static final MethodDescriptor<LoadStatsRequest, LoadStatsResponse> method =
LoadReportingServiceGrpc.getStreamLoadStatsMethod();

LoadReportClient(
LoadStatsManager2 loadStatsManager,
Channel channel,
XdsTransport xdsTransport,
Context context,
Node node,
SynchronizationContext syncContext,
ScheduledExecutorService scheduledExecutorService,
BackoffPolicy.Provider backoffPolicyProvider,
Supplier<Stopwatch> stopwatchSupplier) {
this.loadStatsManager = checkNotNull(loadStatsManager, "loadStatsManager");
this.channel = checkNotNull(channel, "xdsChannel");
this.xdsTransport = checkNotNull(xdsTransport, "xdsTransport");
this.context = checkNotNull(context, "context");
this.syncContext = checkNotNull(syncContext, "syncContext");
this.timerService = checkNotNull(scheduledExecutorService, "timeService");
Expand Down Expand Up @@ -126,17 +129,17 @@ void stopLoadReporting() {
if (lrsRpcRetryTimer != null && lrsRpcRetryTimer.isPending()) {
lrsRpcRetryTimer.cancel();
}
if (lrsStream != null) {
lrsStream.close(Status.CANCELLED.withDescription("stop load reporting").asException());
if (lrsStreamStatus != null) {
lrsStreamStatus.close(Status.CANCELLED.withDescription("stop load reporting").asException());
}
// Do not shutdown channel as it is not owned by LrsClient.
}

@VisibleForTesting
static class LoadReportingTask implements Runnable {
private final LrsStream stream;
private final LrsStreamStatus stream;

LoadReportingTask(LrsStream stream) {
LoadReportingTask(LrsStreamStatus stream) {
this.stream = stream;
}

Expand All @@ -159,65 +162,65 @@ private void startLrsRpc() {
if (!started) {
return;
}
checkState(lrsStream == null, "previous lbStream has not been cleared yet");
lrsStream = new LrsStream();
checkState(lrsStreamStatus == null, "previous lbStream has not been cleared yet");
retryStopwatch.reset().start();
Context prevContext = context.attach();
try {
lrsStream.start();
lrsStreamStatus = new LrsStreamStatus();
} finally {
context.detach(prevContext);
}
}

private final class LrsStream {
private final class LrsStreamStatus {
boolean initialResponseReceived;
boolean closed;
long intervalNano = -1;
boolean reportAllClusters;
List<String> clusterNames; // clusters to report loads for, if not report all.
ScheduledHandle loadReportTimer;
StreamObserver<LoadStatsRequest> lrsRequestWriterV3;
private final StreamingCall<LoadStatsRequest, LoadStatsResponse> call;
private final EventHandler<LoadStatsResponse> eventHandler = new LrsEventHandler();

void start() {
StreamObserver<LoadStatsResponse> lrsResponseReaderV3 =
new StreamObserver<LoadStatsResponse>() {
@Override
public void onNext(final LoadStatsResponse response) {
syncContext.execute(new Runnable() {
@Override
public void run() {
logger.log(XdsLogLevel.DEBUG, "Received LRS response:\n{0}", response);
handleRpcResponse(response.getClustersList(), response.getSendAllClusters(),
Durations.toNanos(response.getLoadReportingInterval()));
}
});
}
LrsStreamStatus() {
this.call = xdsTransport.createStreamingCall(method.getFullMethodName(),
method.getRequestMarshaller(), method.getResponseMarshaller());
call.start(eventHandler);
logger.log(XdsLogLevel.DEBUG, "Sending initial LRS request");
sendLoadStatsRequest(Collections.<ClusterStats>emptyList());
call.startRecvMessage();
}

@Override
public void onError(final Throwable t) {
syncContext.execute(new Runnable() {
@Override
public void run() {
handleRpcError(t);
}
});
}
private final class LrsEventHandler implements EventHandler<LoadStatsResponse> {
@Override
public void onReady() {}

@Override
public void onRecvMessage(LoadStatsResponse response) {
syncContext.execute(new Runnable() {
@Override
public void run() {
logger.log(XdsLogLevel.DEBUG, "Received LRS response:\n{0}", response);
handleRpcResponse(response.getClustersList(), response.getSendAllClusters(),
Durations.toNanos(response.getLoadReportingInterval()));
call.startRecvMessage();
}
});
}

@Override
public void onCompleted() {
syncContext.execute(new Runnable() {
@Override
public void run() {
handleRpcCompleted();
}
});
@Override
public void onStatusReceived(final Status status) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (status.isOk()) {
handleStreamClosed(Status.UNAVAILABLE.withDescription("Closed by server"));
} else {
handleStreamClosed(status);
}
};
lrsRequestWriterV3 = LoadReportingServiceGrpc.newStub(channel).withWaitForReady()
.streamLoadStats(lrsResponseReaderV3);
logger.log(XdsLogLevel.DEBUG, "Sending initial LRS request");
sendLoadStatsRequest(Collections.<ClusterStats>emptyList());
}
});
}
}

void sendLoadStatsRequest(List<ClusterStats> clusterStatsList) {
Expand All @@ -227,14 +230,10 @@ void sendLoadStatsRequest(List<ClusterStats> clusterStatsList) {
requestBuilder.addClusterStats(buildClusterStats(stats));
}
LoadStatsRequest request = requestBuilder.build();
lrsRequestWriterV3.onNext(request);
call.sendMessage(request);
logger.log(XdsLogLevel.DEBUG, "Sent LoadStatsRequest\n{0}", request);
}

void sendError(Exception error) {
lrsRequestWriterV3.onError(error);
}

void handleRpcResponse(List<String> clusters, boolean sendAllClusters,
long loadReportIntervalNano) {
if (closed) {
Expand All @@ -256,14 +255,6 @@ void handleRpcResponse(List<String> clusters, boolean sendAllClusters,
scheduleNextLoadReport();
}

void handleRpcError(Throwable t) {
handleStreamClosed(Status.fromThrowable(t));
}

void handleRpcCompleted() {
handleStreamClosed(Status.UNAVAILABLE.withDescription("Closed by server"));
}

private void sendLoadReport() {
if (closed) {
return;
Expand Down Expand Up @@ -330,16 +321,16 @@ private void close(Exception error) {
}
closed = true;
cleanUp();
sendError(error);
call.sendError(error);
}

private void cleanUp() {
if (loadReportTimer != null && loadReportTimer.isPending()) {
loadReportTimer.cancel();
loadReportTimer = null;
}
if (lrsStream == this) {
lrsStream = null;
if (lrsStreamStatus == this) {
lrsStreamStatus = null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.grpc.xds;

import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.xds.GrpcXdsTransportFactory.DEFAULT_XDS_TRANSPORT_FACTORY;

import com.google.common.annotations.VisibleForTesting;
import io.grpc.Context;
Expand All @@ -26,7 +27,6 @@
import io.grpc.internal.SharedResourceHolder;
import io.grpc.internal.TimeProvider;
import io.grpc.xds.Bootstrapper.BootstrapInfo;
import io.grpc.xds.XdsClientImpl.XdsChannelFactory;
import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory;
import io.grpc.xds.internal.security.TlsContextManagerImpl;
import java.util.Map;
Expand Down Expand Up @@ -124,7 +124,7 @@ public XdsClient getObject() {
if (refCount == 0) {
scheduler = SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE);
xdsClient = new XdsClientImpl(
XdsChannelFactory.DEFAULT_XDS_CHANNEL_FACTORY,
DEFAULT_XDS_TRANSPORT_FACTORY,
bootstrapInfo,
context,
scheduler,
Expand Down
29 changes: 6 additions & 23 deletions xds/src/main/java/io/grpc/xds/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,9 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.Any;
import io.grpc.ChannelCredentials;
import io.grpc.Context;
import io.grpc.Grpc;
import io.grpc.InternalLogId;
import io.grpc.LoadBalancerRegistry;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
Expand Down Expand Up @@ -100,7 +97,7 @@ public void uncaughtException(Thread t, Throwable e) {
private final Map<String, XdsResourceType<?>> subscribedResourceTypeUrls = new HashMap<>();
private final Map<ServerInfo, LoadStatsManager2> loadStatsManagerMap = new HashMap<>();
private final Map<ServerInfo, LoadReportClient> serverLrsClientMap = new HashMap<>();
private final XdsChannelFactory xdsChannelFactory;
private final XdsTransportFactory xdsTransportFactory;
private final Bootstrapper.BootstrapInfo bootstrapInfo;
private final Context context;
private final ScheduledExecutorService timeService;
Expand All @@ -113,15 +110,15 @@ public void uncaughtException(Thread t, Throwable e) {
private volatile boolean isShutdown;

XdsClientImpl(
XdsChannelFactory xdsChannelFactory,
XdsTransportFactory xdsTransportFactory,
Bootstrapper.BootstrapInfo bootstrapInfo,
Context context,
ScheduledExecutorService timeService,
BackoffPolicy.Provider backoffPolicyProvider,
Supplier<Stopwatch> stopwatchSupplier,
TimeProvider timeProvider,
TlsContextManager tlsContextManager) {
this.xdsChannelFactory = xdsChannelFactory;
this.xdsTransportFactory = xdsTransportFactory;
this.bootstrapInfo = bootstrapInfo;
this.context = context;
this.timeService = timeService;
Expand All @@ -142,8 +139,9 @@ private void maybeCreateXdsChannelWithLrs(ServerInfo serverInfo) {
if (serverChannelMap.containsKey(serverInfo)) {
return;
}
XdsTransportFactory.XdsTransport xdsTransport = xdsTransportFactory.create(serverInfo);
ControlPlaneClient xdsChannel = new ControlPlaneClient(
xdsChannelFactory,
xdsTransport,
serverInfo,
bootstrapInfo.node(),
this,
Expand All @@ -157,7 +155,7 @@ private void maybeCreateXdsChannelWithLrs(ServerInfo serverInfo) {
LoadStatsManager2 loadStatsManager = new LoadStatsManager2(stopwatchSupplier);
loadStatsManagerMap.put(serverInfo, loadStatsManager);
LoadReportClient lrsClient = new LoadReportClient(
loadStatsManager, xdsChannel.channel(), context, bootstrapInfo.node(), syncContext,
loadStatsManager, xdsTransport, context, bootstrapInfo.node(), syncContext,
timeService, backoffPolicyProvider, stopwatchSupplier);
serverChannelMap.put(serverInfo, xdsChannel);
serverLrsClientMap.put(serverInfo, lrsClient);
Expand Down Expand Up @@ -759,19 +757,4 @@ static final class ResourceInvalidException extends Exception {
super(cause != null ? message + ": " + cause.getMessage() : message, cause, false, false);
}
}

abstract static class XdsChannelFactory {
static final XdsChannelFactory DEFAULT_XDS_CHANNEL_FACTORY = new XdsChannelFactory() {
@Override
ManagedChannel create(ServerInfo serverInfo) {
String target = serverInfo.target();
ChannelCredentials channelCredentials = serverInfo.channelCredentials();
return Grpc.newChannelBuilder(target, channelCredentials)
.keepAliveTime(5, TimeUnit.MINUTES)
.build();
}
};

abstract ManagedChannel create(ServerInfo serverInfo);
}
}
Loading

0 comments on commit a0ac6b3

Please sign in to comment.