Skip to content

Commit 73a471a

Browse files
committed
Revert "xds: refactor xds client to make it resource agnostic (grpc#9444)"
This reverts commit e1ad984.
1 parent 2289956 commit 73a471a

23 files changed

+3190
-3409
lines changed

xds/src/main/java/io/grpc/xds/AbstractXdsClient.java

Lines changed: 125 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,6 @@
1919
import static com.google.common.base.Preconditions.checkArgument;
2020
import static com.google.common.base.Preconditions.checkNotNull;
2121
import static com.google.common.base.Preconditions.checkState;
22-
import static io.grpc.xds.XdsClusterResource.ADS_TYPE_URL_CDS;
23-
import static io.grpc.xds.XdsClusterResource.ADS_TYPE_URL_CDS_V2;
24-
import static io.grpc.xds.XdsEndpointResource.ADS_TYPE_URL_EDS;
25-
import static io.grpc.xds.XdsEndpointResource.ADS_TYPE_URL_EDS_V2;
26-
import static io.grpc.xds.XdsListenerResource.ADS_TYPE_URL_LDS;
27-
import static io.grpc.xds.XdsListenerResource.ADS_TYPE_URL_LDS_V2;
28-
import static io.grpc.xds.XdsRouteConfigureResource.ADS_TYPE_URL_RDS;
29-
import static io.grpc.xds.XdsRouteConfigureResource.ADS_TYPE_URL_RDS_V2;
3022

3123
import com.google.common.annotations.VisibleForTesting;
3224
import com.google.common.base.Stopwatch;
@@ -49,14 +41,11 @@
4941
import io.grpc.xds.ClientXdsClient.XdsChannelFactory;
5042
import io.grpc.xds.EnvoyProtoData.Node;
5143
import io.grpc.xds.XdsClient.ResourceStore;
52-
import io.grpc.xds.XdsClient.ResourceUpdate;
5344
import io.grpc.xds.XdsClient.XdsResponseHandler;
5445
import io.grpc.xds.XdsLogger.XdsLogLevel;
5546
import java.util.Collection;
5647
import java.util.Collections;
57-
import java.util.HashMap;
5848
import java.util.List;
59-
import java.util.Map;
6049
import java.util.concurrent.ScheduledExecutorService;
6150
import java.util.concurrent.TimeUnit;
6251
import javax.annotation.Nullable;
@@ -66,6 +55,23 @@
6655
* the xDS RPC stream.
6756
*/
6857
final class AbstractXdsClient {
58+
59+
private static final String ADS_TYPE_URL_LDS_V2 = "type.googleapis.com/envoy.api.v2.Listener";
60+
private static final String ADS_TYPE_URL_LDS =
61+
"type.googleapis.com/envoy.config.listener.v3.Listener";
62+
private static final String ADS_TYPE_URL_RDS_V2 =
63+
"type.googleapis.com/envoy.api.v2.RouteConfiguration";
64+
private static final String ADS_TYPE_URL_RDS =
65+
"type.googleapis.com/envoy.config.route.v3.RouteConfiguration";
66+
@VisibleForTesting
67+
static final String ADS_TYPE_URL_CDS_V2 = "type.googleapis.com/envoy.api.v2.Cluster";
68+
private static final String ADS_TYPE_URL_CDS =
69+
"type.googleapis.com/envoy.config.cluster.v3.Cluster";
70+
private static final String ADS_TYPE_URL_EDS_V2 =
71+
"type.googleapis.com/envoy.api.v2.ClusterLoadAssignment";
72+
private static final String ADS_TYPE_URL_EDS =
73+
"type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment";
74+
6975
private final SynchronizationContext syncContext;
7076
private final InternalLogId logId;
7177
private final XdsLogger logger;
@@ -82,7 +88,10 @@ final class AbstractXdsClient {
8288
// Last successfully applied version_info for each resource type. Starts with empty string.
8389
// A version_info is used to update management server with client's most recent knowledge of
8490
// resources.
85-
private final Map<ResourceType, String> versions = new HashMap<>();
91+
private String ldsVersion = "";
92+
private String rdsVersion = "";
93+
private String cdsVersion = "";
94+
private String edsVersion = "";
8695

8796
private boolean shutdown;
8897
@Nullable
@@ -153,17 +162,16 @@ public String toString() {
153162
* Updates the resource subscription for the given resource type.
154163
*/
155164
// Must be synchronized.
156-
void adjustResourceSubscription(XdsResourceType<?> resourceType) {
165+
void adjustResourceSubscription(ResourceType type) {
157166
if (isInBackoff()) {
158167
return;
159168
}
160169
if (adsStream == null) {
161170
startRpcStream();
162171
}
163-
Collection<String> resources = resourceStore.getSubscribedResources(serverInfo,
164-
resourceType.typeName());
172+
Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
165173
if (resources != null) {
166-
adsStream.sendDiscoveryRequest(resourceType, resources);
174+
adsStream.sendDiscoveryRequest(type, resources);
167175
}
168176
}
169177

@@ -172,9 +180,24 @@ void adjustResourceSubscription(XdsResourceType<?> resourceType) {
172180
* and sends an ACK request to the management server.
173181
*/
174182
// Must be synchronized.
175-
void ackResponse(XdsResourceType<?> xdsResourceType, String versionInfo, String nonce) {
176-
ResourceType type = xdsResourceType.typeName();
177-
versions.put(type, versionInfo);
183+
void ackResponse(ResourceType type, String versionInfo, String nonce) {
184+
switch (type) {
185+
case LDS:
186+
ldsVersion = versionInfo;
187+
break;
188+
case RDS:
189+
rdsVersion = versionInfo;
190+
break;
191+
case CDS:
192+
cdsVersion = versionInfo;
193+
break;
194+
case EDS:
195+
edsVersion = versionInfo;
196+
break;
197+
case UNKNOWN:
198+
default:
199+
throw new AssertionError("Unknown resource type: " + type);
200+
}
178201
logger.log(XdsLogLevel.INFO, "Sending ACK for {0} update, nonce: {1}, current version: {2}",
179202
type, nonce, versionInfo);
180203
Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
@@ -189,9 +212,8 @@ void ackResponse(XdsResourceType<?> xdsResourceType, String versionInfo, String
189212
* accepted version) to the management server.
190213
*/
191214
// Must be synchronized.
192-
void nackResponse(XdsResourceType<?> xdsResourceType, String nonce, String errorDetail) {
193-
ResourceType type = xdsResourceType.typeName();
194-
String versionInfo = versions.getOrDefault(type, "");
215+
void nackResponse(ResourceType type, String nonce, String errorDetail) {
216+
String versionInfo = getCurrentVersion(type);
195217
logger.log(XdsLogLevel.INFO, "Sending NACK for {0} update, nonce: {1}, current version: {2}",
196218
type, nonce, versionInfo);
197219
Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
@@ -231,6 +253,30 @@ private void startRpcStream() {
231253
stopwatch.reset().start();
232254
}
233255

256+
/** Returns the latest accepted version of the given resource type. */
257+
// Must be synchronized.
258+
String getCurrentVersion(ResourceType type) {
259+
String version;
260+
switch (type) {
261+
case LDS:
262+
version = ldsVersion;
263+
break;
264+
case RDS:
265+
version = rdsVersion;
266+
break;
267+
case CDS:
268+
version = cdsVersion;
269+
break;
270+
case EDS:
271+
version = edsVersion;
272+
break;
273+
case UNKNOWN:
274+
default:
275+
throw new AssertionError("Unknown resource type: " + type);
276+
}
277+
return version;
278+
}
279+
234280
@VisibleForTesting
235281
final class RpcRetryTask implements Runnable {
236282
@Override
@@ -245,14 +291,13 @@ public void run() {
245291
}
246292
Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
247293
if (resources != null) {
248-
adsStream.sendDiscoveryRequest(resourceStore.getXdsResourceType(type), resources);
294+
adsStream.sendDiscoveryRequest(type, resources);
249295
}
250296
}
251297
xdsResponseHandler.handleStreamRestarted(serverInfo);
252298
}
253299
}
254300

255-
// TODO(zivy) : remove and replace with XdsResourceType
256301
enum ResourceType {
257302
UNKNOWN, LDS, RDS, CDS, EDS;
258303

@@ -316,13 +361,17 @@ static ResourceType fromTypeUrl(String typeUrl) {
316361
private abstract class AbstractAdsStream {
317362
private boolean responseReceived;
318363
private boolean closed;
364+
319365
// Response nonce for the most recently received discovery responses of each resource type.
320366
// Client initiated requests start response nonce with empty string.
321-
// Nonce in each response is echoed back in the following ACK/NACK request. It is
322-
// used for management server to identify which response the client is ACKing/NACking.
323-
// To avoid confusion, client-initiated requests will always use the nonce in
324-
// most recently received responses of each resource type.
325-
private final Map<ResourceType, String> respNonces = new HashMap<>();
367+
// A nonce is used to indicate the specific DiscoveryResponse each DiscoveryRequest
368+
// corresponds to.
369+
// A nonce becomes stale following a newer nonce being presented to the client in a
370+
// DiscoveryResponse.
371+
private String ldsRespNonce = "";
372+
private String rdsRespNonce = "";
373+
private String cdsRespNonce = "";
374+
private String edsRespNonce = "";
326375

327376
abstract void start();
328377

@@ -332,20 +381,35 @@ private abstract class AbstractAdsStream {
332381
* Sends a discovery request with the given {@code versionInfo}, {@code nonce} and
333382
* {@code errorDetail}. Used for reacting to a specific discovery response. For
334383
* client-initiated discovery requests, use {@link
335-
* #sendDiscoveryRequest(XdsResourceType, Collection)}.
384+
* #sendDiscoveryRequest(ResourceType, Collection)}.
336385
*/
337-
abstract void sendDiscoveryRequest(ResourceType type, String version,
386+
abstract void sendDiscoveryRequest(ResourceType type, String versionInfo,
338387
Collection<String> resources, String nonce, @Nullable String errorDetail);
339388

340389
/**
341390
* Sends a client-initiated discovery request.
342391
*/
343-
final void sendDiscoveryRequest(XdsResourceType<? extends ResourceUpdate> xdsResourceType,
344-
Collection<String> resources) {
345-
ResourceType type = xdsResourceType.typeName();
392+
final void sendDiscoveryRequest(ResourceType type, Collection<String> resources) {
393+
String nonce;
394+
switch (type) {
395+
case LDS:
396+
nonce = ldsRespNonce;
397+
break;
398+
case RDS:
399+
nonce = rdsRespNonce;
400+
break;
401+
case CDS:
402+
nonce = cdsRespNonce;
403+
break;
404+
case EDS:
405+
nonce = edsRespNonce;
406+
break;
407+
case UNKNOWN:
408+
default:
409+
throw new AssertionError("Unknown resource type: " + type);
410+
}
346411
logger.log(XdsLogLevel.INFO, "Sending {0} request for resources: {1}", type, resources);
347-
sendDiscoveryRequest(type, versions.getOrDefault(type, ""), resources,
348-
respNonces.getOrDefault(type, ""), null);
412+
sendDiscoveryRequest(type, getCurrentVersion(type), resources, nonce, null);
349413
}
350414

351415
final void handleRpcResponse(
@@ -354,8 +418,31 @@ final void handleRpcResponse(
354418
return;
355419
}
356420
responseReceived = true;
357-
respNonces.put(type, nonce);
358-
xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce);
421+
// Nonce in each response is echoed back in the following ACK/NACK request. It is
422+
// used for management server to identify which response the client is ACKing/NACking.
423+
// To avoid confusion, client-initiated requests will always use the nonce in
424+
// most recently received responses of each resource type.
425+
switch (type) {
426+
case LDS:
427+
ldsRespNonce = nonce;
428+
xdsResponseHandler.handleLdsResponse(serverInfo, versionInfo, resources, nonce);
429+
break;
430+
case RDS:
431+
rdsRespNonce = nonce;
432+
xdsResponseHandler.handleRdsResponse(serverInfo, versionInfo, resources, nonce);
433+
break;
434+
case CDS:
435+
cdsRespNonce = nonce;
436+
xdsResponseHandler.handleCdsResponse(serverInfo, versionInfo, resources, nonce);
437+
break;
438+
case EDS:
439+
edsRespNonce = nonce;
440+
xdsResponseHandler.handleEdsResponse(serverInfo, versionInfo, resources, nonce);
441+
break;
442+
case UNKNOWN:
443+
default:
444+
logger.log(XdsLogLevel.WARNING, "Ignore an unknown type of DiscoveryResponse");
445+
}
359446
}
360447

361448
final void handleRpcError(Throwable t) {

xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@
3535
import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig;
3636
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig;
3737
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism;
38-
import io.grpc.xds.XdsClient.ResourceWatcher;
39-
import io.grpc.xds.XdsClusterResource.CdsUpdate;
40-
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
38+
import io.grpc.xds.XdsClient.CdsResourceWatcher;
39+
import io.grpc.xds.XdsClient.CdsUpdate;
40+
import io.grpc.xds.XdsClient.CdsUpdate.ClusterType;
4141
import io.grpc.xds.XdsLogger.XdsLogLevel;
4242
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
4343
import java.util.ArrayDeque;
@@ -221,7 +221,7 @@ private void handleClusterDiscoveryError(Status error) {
221221
}
222222
}
223223

224-
private final class ClusterState implements ResourceWatcher<CdsUpdate> {
224+
private final class ClusterState implements CdsResourceWatcher {
225225
private final String name;
226226
@Nullable
227227
private Map<String, ClusterState> childClusterStates;
@@ -237,12 +237,12 @@ private ClusterState(String name) {
237237
}
238238

239239
private void start() {
240-
xdsClient.watchXdsResource(XdsClusterResource.getInstance(), name, this);
240+
xdsClient.watchCdsResource(name, this);
241241
}
242242

243243
void shutdown() {
244244
shutdown = true;
245-
xdsClient.cancelXdsResourceWatch(XdsClusterResource.getInstance(), name, this);
245+
xdsClient.cancelCdsResourceWatch(name, this);
246246
if (childClusterStates != null) { // recursively shut down all descendants
247247
for (ClusterState state : childClusterStates.values()) {
248248
state.shutdown();
@@ -300,7 +300,6 @@ public void run() {
300300
if (shutdown) {
301301
return;
302302
}
303-
304303
logger.log(XdsLogLevel.DEBUG, "Received cluster update {0}", update);
305304
discovered = true;
306305
result = update;

0 commit comments

Comments
 (0)