Skip to content

Commit d034a56

Browse files
authored
Xds client split (#11484)
1 parent ee3ffef commit d034a56

15 files changed

+284
-93
lines changed

xds/src/main/java/io/grpc/xds/CsdsService.java

+53-27
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
import io.grpc.xds.client.XdsClient.ResourceMetadata.ResourceMetadataStatus;
4040
import io.grpc.xds.client.XdsClient.ResourceMetadata.UpdateFailureState;
4141
import io.grpc.xds.client.XdsResourceType;
42+
import java.util.ArrayList;
43+
import java.util.List;
4244
import java.util.Map;
4345
import java.util.concurrent.ExecutionException;
4446
import java.util.concurrent.TimeUnit;
@@ -117,53 +119,77 @@ public void onCompleted() {
117119

118120
private boolean handleRequest(
119121
ClientStatusRequest request, StreamObserver<ClientStatusResponse> responseObserver) {
120-
StatusException error;
121-
try {
122-
responseObserver.onNext(getConfigDumpForRequest(request));
123-
return true;
124-
} catch (StatusException e) {
125-
error = e;
126-
} catch (InterruptedException e) {
127-
Thread.currentThread().interrupt();
128-
logger.log(Level.FINE, "Server interrupted while building CSDS config dump", e);
129-
error = Status.ABORTED.withDescription("Thread interrupted").withCause(e).asException();
130-
} catch (RuntimeException e) {
131-
logger.log(Level.WARNING, "Unexpected error while building CSDS config dump", e);
132-
error =
133-
Status.INTERNAL.withDescription("Unexpected internal error").withCause(e).asException();
134-
}
135-
responseObserver.onError(error);
136-
return false;
137-
}
122+
StatusException error = null;
138123

139-
private ClientStatusResponse getConfigDumpForRequest(ClientStatusRequest request)
140-
throws StatusException, InterruptedException {
141124
if (request.getNodeMatchersCount() > 0) {
142-
throw new StatusException(
125+
error = new StatusException(
143126
Status.INVALID_ARGUMENT.withDescription("node_matchers not supported"));
127+
} else {
128+
List<String> targets = xdsClientPoolFactory.getTargets();
129+
List<ClientConfig> clientConfigs = new ArrayList<>(targets.size());
130+
131+
for (int i = 0; i < targets.size() && error == null; i++) {
132+
try {
133+
ClientConfig clientConfig = getConfigForRequest(targets.get(i));
134+
if (clientConfig != null) {
135+
clientConfigs.add(clientConfig);
136+
}
137+
} catch (InterruptedException e) {
138+
Thread.currentThread().interrupt();
139+
logger.log(Level.FINE, "Server interrupted while building CSDS config dump", e);
140+
error = Status.ABORTED.withDescription("Thread interrupted").withCause(e).asException();
141+
} catch (RuntimeException e) {
142+
logger.log(Level.WARNING, "Unexpected error while building CSDS config dump", e);
143+
error = Status.INTERNAL.withDescription("Unexpected internal error").withCause(e)
144+
.asException();
145+
}
146+
}
147+
148+
try {
149+
responseObserver.onNext(getStatusResponse(clientConfigs));
150+
} catch (RuntimeException e) {
151+
logger.log(Level.WARNING, "Unexpected error while processing CSDS config dump", e);
152+
error = Status.INTERNAL.withDescription("Unexpected internal error").withCause(e)
153+
.asException();
154+
}
144155
}
145156

146-
ObjectPool<XdsClient> xdsClientPool = xdsClientPoolFactory.get();
157+
if (error == null) {
158+
return true; // All clients reported without error
159+
}
160+
responseObserver.onError(error);
161+
return false;
162+
}
163+
164+
private ClientConfig getConfigForRequest(String target) throws InterruptedException {
165+
ObjectPool<XdsClient> xdsClientPool = xdsClientPoolFactory.get(target);
147166
if (xdsClientPool == null) {
148-
return ClientStatusResponse.getDefaultInstance();
167+
return null;
149168
}
150169

151170
XdsClient xdsClient = null;
152171
try {
153172
xdsClient = xdsClientPool.getObject();
154-
return ClientStatusResponse.newBuilder()
155-
.addConfig(getClientConfigForXdsClient(xdsClient))
156-
.build();
173+
return getClientConfigForXdsClient(xdsClient, target);
157174
} finally {
158175
if (xdsClient != null) {
159176
xdsClientPool.returnObject(xdsClient);
160177
}
161178
}
162179
}
163180

181+
private ClientStatusResponse getStatusResponse(List<ClientConfig> clientConfigs) {
182+
if (clientConfigs.isEmpty()) {
183+
return ClientStatusResponse.getDefaultInstance();
184+
}
185+
return ClientStatusResponse.newBuilder().addAllConfig(clientConfigs).build();
186+
}
187+
164188
@VisibleForTesting
165-
static ClientConfig getClientConfigForXdsClient(XdsClient xdsClient) throws InterruptedException {
189+
static ClientConfig getClientConfigForXdsClient(XdsClient xdsClient, String target)
190+
throws InterruptedException {
166191
ClientConfig.Builder builder = ClientConfig.newBuilder()
192+
.setClientScope(target)
167193
.setNode(xdsClient.getBootstrapInfo().node().toEnvoyProtoNode());
168194

169195
Map<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByType =

xds/src/main/java/io/grpc/xds/InternalSharedXdsClientPoolProvider.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,6 @@ public static void setDefaultProviderBootstrapOverride(Map<String, ?> bootstrap)
3636

3737
public static ObjectPool<XdsClient> getOrCreate(String target)
3838
throws XdsInitializationException {
39-
return SharedXdsClientPoolProvider.getDefaultProvider().getOrCreate();
39+
return SharedXdsClientPoolProvider.getDefaultProvider().getOrCreate(target);
4040
}
4141
}

xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java

+28-9
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static io.grpc.xds.GrpcXdsTransportFactory.DEFAULT_XDS_TRANSPORT_FACTORY;
2121

2222
import com.google.common.annotations.VisibleForTesting;
23+
import com.google.common.collect.ImmutableList;
2324
import io.grpc.internal.ExponentialBackoffPolicy;
2425
import io.grpc.internal.GrpcUtil;
2526
import io.grpc.internal.ObjectPool;
@@ -32,6 +33,7 @@
3233
import io.grpc.xds.client.XdsInitializationException;
3334
import io.grpc.xds.internal.security.TlsContextManagerImpl;
3435
import java.util.Map;
36+
import java.util.concurrent.ConcurrentHashMap;
3537
import java.util.concurrent.ScheduledExecutorService;
3638
import java.util.concurrent.atomic.AtomicReference;
3739
import java.util.logging.Level;
@@ -53,7 +55,7 @@ final class SharedXdsClientPoolProvider implements XdsClientPoolFactory {
5355
private final Bootstrapper bootstrapper;
5456
private final Object lock = new Object();
5557
private final AtomicReference<Map<String, ?>> bootstrapOverride = new AtomicReference<>();
56-
private volatile ObjectPool<XdsClient> xdsClientPool;
58+
private final Map<String, ObjectPool<XdsClient>> targetToXdsClientMap = new ConcurrentHashMap<>();
5759

5860
SharedXdsClientPoolProvider() {
5961
this(new GrpcBootstrapperImpl());
@@ -75,16 +77,16 @@ public void setBootstrapOverride(Map<String, ?> bootstrap) {
7577

7678
@Override
7779
@Nullable
78-
public ObjectPool<XdsClient> get() {
79-
return xdsClientPool;
80+
public ObjectPool<XdsClient> get(String target) {
81+
return targetToXdsClientMap.get(target);
8082
}
8183

8284
@Override
83-
public ObjectPool<XdsClient> getOrCreate() throws XdsInitializationException {
84-
ObjectPool<XdsClient> ref = xdsClientPool;
85+
public ObjectPool<XdsClient> getOrCreate(String target) throws XdsInitializationException {
86+
ObjectPool<XdsClient> ref = targetToXdsClientMap.get(target);
8587
if (ref == null) {
8688
synchronized (lock) {
87-
ref = xdsClientPool;
89+
ref = targetToXdsClientMap.get(target);
8890
if (ref == null) {
8991
BootstrapInfo bootstrapInfo;
9092
Map<String, ?> rawBootstrap = bootstrapOverride.get();
@@ -96,21 +98,32 @@ public ObjectPool<XdsClient> getOrCreate() throws XdsInitializationException {
9698
if (bootstrapInfo.servers().isEmpty()) {
9799
throw new XdsInitializationException("No xDS server provided");
98100
}
99-
ref = xdsClientPool = new RefCountedXdsClientObjectPool(bootstrapInfo);
101+
ref = new RefCountedXdsClientObjectPool(bootstrapInfo, target);
102+
targetToXdsClientMap.put(target, ref);
100103
}
101104
}
102105
}
103106
return ref;
104107
}
105108

109+
@Override
110+
public ImmutableList<String> getTargets() {
111+
return ImmutableList.copyOf(targetToXdsClientMap.keySet());
112+
}
113+
114+
106115
private static class SharedXdsClientPoolProviderHolder {
107116
private static final SharedXdsClientPoolProvider instance = new SharedXdsClientPoolProvider();
108117
}
109118

110119
@ThreadSafe
111120
@VisibleForTesting
112121
static class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {
122+
123+
private static final ExponentialBackoffPolicy.Provider BACKOFF_POLICY_PROVIDER =
124+
new ExponentialBackoffPolicy.Provider();
113125
private final BootstrapInfo bootstrapInfo;
126+
private final String target; // The target associated with the xDS client.
114127
private final Object lock = new Object();
115128
@GuardedBy("lock")
116129
private ScheduledExecutorService scheduler;
@@ -120,8 +133,9 @@ static class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {
120133
private int refCount;
121134

122135
@VisibleForTesting
123-
RefCountedXdsClientObjectPool(BootstrapInfo bootstrapInfo) {
136+
RefCountedXdsClientObjectPool(BootstrapInfo bootstrapInfo, String target) {
124137
this.bootstrapInfo = checkNotNull(bootstrapInfo);
138+
this.target = target;
125139
}
126140

127141
@Override
@@ -136,7 +150,7 @@ public XdsClient getObject() {
136150
DEFAULT_XDS_TRANSPORT_FACTORY,
137151
bootstrapInfo,
138152
scheduler,
139-
new ExponentialBackoffPolicy.Provider(),
153+
BACKOFF_POLICY_PROVIDER,
140154
GrpcUtil.STOPWATCH_SUPPLIER,
141155
TimeProvider.SYSTEM_TIME_PROVIDER,
142156
MessagePrinter.INSTANCE,
@@ -167,5 +181,10 @@ XdsClient getXdsClientForTest() {
167181
return xdsClient;
168182
}
169183
}
184+
185+
public String getTarget() {
186+
return target;
187+
}
170188
}
189+
171190
}

xds/src/main/java/io/grpc/xds/XdsClientPoolFactory.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,17 @@
1919
import io.grpc.internal.ObjectPool;
2020
import io.grpc.xds.client.XdsClient;
2121
import io.grpc.xds.client.XdsInitializationException;
22+
import java.util.List;
2223
import java.util.Map;
2324
import javax.annotation.Nullable;
2425

2526
interface XdsClientPoolFactory {
2627
void setBootstrapOverride(Map<String, ?> bootstrap);
2728

2829
@Nullable
29-
ObjectPool<XdsClient> get();
30+
ObjectPool<XdsClient> get(String target);
3031

31-
ObjectPool<XdsClient> getOrCreate() throws XdsInitializationException;
32+
ObjectPool<XdsClient> getOrCreate(String target) throws XdsInitializationException;
33+
34+
List<String> getTargets();
3235
}

xds/src/main/java/io/grpc/xds/XdsNameResolver.java

+10-7
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import io.grpc.xds.client.XdsClient.ResourceWatcher;
6767
import io.grpc.xds.client.XdsLogger;
6868
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
69+
import java.net.URI;
6970
import java.util.ArrayList;
7071
import java.util.Collections;
7172
import java.util.HashMap;
@@ -104,6 +105,7 @@ final class XdsNameResolver extends NameResolver {
104105
private final XdsLogger logger;
105106
@Nullable
106107
private final String targetAuthority;
108+
private final String target;
107109
private final String serviceAuthority;
108110
// Encoded version of the service authority as per
109111
// https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.
@@ -133,23 +135,24 @@ final class XdsNameResolver extends NameResolver {
133135
private boolean receivedConfig;
134136

135137
XdsNameResolver(
136-
@Nullable String targetAuthority, String name, @Nullable String overrideAuthority,
138+
URI targetUri, String name, @Nullable String overrideAuthority,
137139
ServiceConfigParser serviceConfigParser,
138140
SynchronizationContext syncContext, ScheduledExecutorService scheduler,
139141
@Nullable Map<String, ?> bootstrapOverride) {
140-
this(targetAuthority, name, overrideAuthority, serviceConfigParser, syncContext, scheduler,
141-
SharedXdsClientPoolProvider.getDefaultProvider(), ThreadSafeRandomImpl.instance,
142-
FilterRegistry.getDefaultRegistry(), bootstrapOverride);
142+
this(targetUri, targetUri.getAuthority(), name, overrideAuthority, serviceConfigParser,
143+
syncContext, scheduler, SharedXdsClientPoolProvider.getDefaultProvider(),
144+
ThreadSafeRandomImpl.instance, FilterRegistry.getDefaultRegistry(), bootstrapOverride);
143145
}
144146

145147
@VisibleForTesting
146148
XdsNameResolver(
147-
@Nullable String targetAuthority, String name, @Nullable String overrideAuthority,
148-
ServiceConfigParser serviceConfigParser,
149+
URI targetUri, @Nullable String targetAuthority, String name,
150+
@Nullable String overrideAuthority, ServiceConfigParser serviceConfigParser,
149151
SynchronizationContext syncContext, ScheduledExecutorService scheduler,
150152
XdsClientPoolFactory xdsClientPoolFactory, ThreadSafeRandom random,
151153
FilterRegistry filterRegistry, @Nullable Map<String, ?> bootstrapOverride) {
152154
this.targetAuthority = targetAuthority;
155+
target = targetUri.toString();
153156

154157
// The name might have multiple slashes so encode it before verifying.
155158
serviceAuthority = checkNotNull(name, "name");
@@ -180,7 +183,7 @@ public String getServiceAuthority() {
180183
public void start(Listener2 listener) {
181184
this.listener = checkNotNull(listener, "listener");
182185
try {
183-
xdsClientPool = xdsClientPoolFactory.getOrCreate();
186+
xdsClientPool = xdsClientPoolFactory.getOrCreate(target);
184187
} catch (Exception e) {
185188
listener.onError(
186189
Status.UNAVAILABLE.withDescription("Failed to initialize xDS").withCause(e));

xds/src/main/java/io/grpc/xds/XdsNameResolverProvider.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public XdsNameResolver newNameResolver(URI targetUri, Args args) {
7878
targetUri);
7979
String name = targetPath.substring(1);
8080
return new XdsNameResolver(
81-
targetUri.getAuthority(), name, args.getOverrideAuthority(),
81+
targetUri, name, args.getOverrideAuthority(),
8282
args.getServiceConfigParser(), args.getSynchronizationContext(),
8383
args.getScheduledExecutorService(),
8484
bootstrapOverride);

xds/src/main/java/io/grpc/xds/XdsServerWrapper.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ public void run() {
171171

172172
private void internalStart() {
173173
try {
174-
xdsClientPool = xdsClientPoolFactory.getOrCreate();
174+
xdsClientPool = xdsClientPoolFactory.getOrCreate("");
175175
} catch (Exception e) {
176176
StatusException statusException = Status.UNAVAILABLE.withDescription(
177177
"Failed to initialize xDS").withCause(e).asException();

0 commit comments

Comments
 (0)