Skip to content

Commit 1219706

Browse files
authored
xds: xDS-based HTTP CONNECT configuration (#11861)
1 parent c340f4a commit 1219706

16 files changed

+665
-220
lines changed

xds/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ java_proto_library(
8585
"@envoy_api//envoy/extensions/load_balancing_policies/ring_hash/v3:pkg",
8686
"@envoy_api//envoy/extensions/load_balancing_policies/round_robin/v3:pkg",
8787
"@envoy_api//envoy/extensions/load_balancing_policies/wrr_locality/v3:pkg",
88+
"@envoy_api//envoy/extensions/transport_sockets/http_11_proxy/v3:pkg",
8889
"@envoy_api//envoy/extensions/transport_sockets/tls/v3:pkg",
8990
"@envoy_api//envoy/service/discovery/v3:pkg",
9091
"@envoy_api//envoy/service/load_stats/v3:pkg",

xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,9 @@ private void handleClusterDiscovered() {
243243
}
244244

245245
ClusterResolverConfig config = new ClusterResolverConfig(
246-
Collections.unmodifiableList(instances), configOrError.getConfig());
246+
Collections.unmodifiableList(instances),
247+
configOrError.getConfig(),
248+
root.result.isHttp11ProxyAvailable());
247249
if (childLb == null) {
248250
childLb = lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME).newLoadBalancer(helper);
249251
}

xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.google.protobuf.Struct;
2626
import io.grpc.Attributes;
2727
import io.grpc.EquivalentAddressGroup;
28+
import io.grpc.HttpConnectProxiedSocketAddress;
2829
import io.grpc.InternalLogId;
2930
import io.grpc.LoadBalancer;
3031
import io.grpc.LoadBalancerProvider;
@@ -59,6 +60,8 @@
5960
import io.grpc.xds.client.XdsClient.ResourceWatcher;
6061
import io.grpc.xds.client.XdsLogger;
6162
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
63+
import java.net.InetSocketAddress;
64+
import java.net.SocketAddress;
6265
import java.net.URI;
6366
import java.net.URISyntaxException;
6467
import java.util.ArrayList;
@@ -430,8 +433,18 @@ public void run() {
430433
.set(XdsAttributes.ATTR_SERVER_WEIGHT, weight)
431434
.set(XdsAttributes.ATTR_ADDRESS_NAME, endpoint.hostname())
432435
.build();
433-
EquivalentAddressGroup eag = new EquivalentAddressGroup(
434-
endpoint.eag().getAddresses(), attr);
436+
437+
EquivalentAddressGroup eag;
438+
if (config.isHttp11ProxyAvailable()) {
439+
List<SocketAddress> rewrittenAddresses = new ArrayList<>();
440+
for (SocketAddress addr : endpoint.eag().getAddresses()) {
441+
rewrittenAddresses.add(rewriteAddress(
442+
addr, endpoint.endpointMetadata(), localityLbInfo.localityMetadata()));
443+
}
444+
eag = new EquivalentAddressGroup(rewrittenAddresses, attr);
445+
} else {
446+
eag = new EquivalentAddressGroup(endpoint.eag().getAddresses(), attr);
447+
}
435448
eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName));
436449
addresses.add(eag);
437450
}
@@ -469,6 +482,35 @@ public void run() {
469482
new EndpointsUpdated().run();
470483
}
471484

485+
private SocketAddress rewriteAddress(SocketAddress addr,
486+
ImmutableMap<String, Object> endpointMetadata,
487+
ImmutableMap<String, Object> localityMetadata) {
488+
if (!(addr instanceof InetSocketAddress)) {
489+
return addr;
490+
}
491+
492+
SocketAddress proxyAddress;
493+
try {
494+
proxyAddress = (SocketAddress) endpointMetadata.get(
495+
"envoy.http11_proxy_transport_socket.proxy_address");
496+
if (proxyAddress == null) {
497+
proxyAddress = (SocketAddress) localityMetadata.get(
498+
"envoy.http11_proxy_transport_socket.proxy_address");
499+
}
500+
} catch (ClassCastException e) {
501+
return addr;
502+
}
503+
504+
if (proxyAddress == null) {
505+
return addr;
506+
}
507+
508+
return HttpConnectProxiedSocketAddress.newBuilder()
509+
.setTargetAddress((InetSocketAddress) addr)
510+
.setProxyAddress(proxyAddress)
511+
.build();
512+
}
513+
472514
private List<String> generatePriorityNames(String name,
473515
Map<Locality, LocalityLbEndpoints> localityLbEndpoints) {
474516
TreeMap<Integer, List<Locality>> todo = new TreeMap<>();

xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,17 @@ static final class ClusterResolverConfig {
7474
final List<DiscoveryMechanism> discoveryMechanisms;
7575
// GracefulSwitch configuration
7676
final Object lbConfig;
77+
private final boolean isHttp11ProxyAvailable;
7778

78-
ClusterResolverConfig(List<DiscoveryMechanism> discoveryMechanisms, Object lbConfig) {
79+
ClusterResolverConfig(List<DiscoveryMechanism> discoveryMechanisms, Object lbConfig,
80+
boolean isHttp11ProxyAvailable) {
7981
this.discoveryMechanisms = checkNotNull(discoveryMechanisms, "discoveryMechanisms");
8082
this.lbConfig = checkNotNull(lbConfig, "lbConfig");
83+
this.isHttp11ProxyAvailable = isHttp11ProxyAvailable;
84+
}
85+
86+
boolean isHttp11ProxyAvailable() {
87+
return isHttp11ProxyAvailable;
8188
}
8289

8390
@Override

xds/src/main/java/io/grpc/xds/Endpoints.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.google.auto.value.AutoValue;
2222
import com.google.common.annotations.VisibleForTesting;
2323
import com.google.common.collect.ImmutableList;
24+
import com.google.common.collect.ImmutableMap;
2425
import io.grpc.EquivalentAddressGroup;
2526
import java.net.InetSocketAddress;
2627
import java.util.List;
@@ -41,11 +42,13 @@ abstract static class LocalityLbEndpoints {
4142
// Locality's priority level.
4243
abstract int priority();
4344

45+
abstract ImmutableMap<String, Object> localityMetadata();
46+
4447
static LocalityLbEndpoints create(List<LbEndpoint> endpoints, int localityWeight,
45-
int priority) {
48+
int priority, ImmutableMap<String, Object> localityMetadata) {
4649
checkArgument(localityWeight > 0, "localityWeight must be greater than 0");
4750
return new AutoValue_Endpoints_LocalityLbEndpoints(
48-
ImmutableList.copyOf(endpoints), localityWeight, priority);
51+
ImmutableList.copyOf(endpoints), localityWeight, priority, localityMetadata);
4952
}
5053
}
5154

@@ -63,17 +66,20 @@ abstract static class LbEndpoint {
6366

6467
abstract String hostname();
6568

69+
abstract ImmutableMap<String, Object> endpointMetadata();
70+
6671
static LbEndpoint create(EquivalentAddressGroup eag, int loadBalancingWeight,
67-
boolean isHealthy, String hostname) {
68-
return new AutoValue_Endpoints_LbEndpoint(eag, loadBalancingWeight, isHealthy, hostname);
72+
boolean isHealthy, String hostname, ImmutableMap<String, Object> endpointMetadata) {
73+
return new AutoValue_Endpoints_LbEndpoint(
74+
eag, loadBalancingWeight, isHealthy, hostname, endpointMetadata);
6975
}
7076

7177
// Only for testing.
7278
@VisibleForTesting
73-
static LbEndpoint create(
74-
String address, int port, int loadBalancingWeight, boolean isHealthy, String hostname) {
79+
static LbEndpoint create(String address, int port, int loadBalancingWeight, boolean isHealthy,
80+
String hostname, ImmutableMap<String, Object> endpointMetadata) {
7581
return LbEndpoint.create(new EquivalentAddressGroup(new InetSocketAddress(address, port)),
76-
loadBalancingWeight, isHealthy, hostname);
82+
loadBalancingWeight, isHealthy, hostname, endpointMetadata);
7783
}
7884
}
7985

xds/src/main/java/io/grpc/xds/GcpAuthenticationFilter.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import io.grpc.Status;
3737
import io.grpc.auth.MoreCallCredentials;
3838
import io.grpc.xds.MetadataRegistry.MetadataValueParser;
39+
import io.grpc.xds.client.XdsResourceType.ResourceInvalidException;
3940
import java.util.LinkedHashMap;
4041
import java.util.Map;
4142
import java.util.concurrent.ScheduledExecutorService;
@@ -240,11 +241,16 @@ public String getTypeUrl() {
240241
}
241242

242243
@Override
243-
public String parse(Any any) throws InvalidProtocolBufferException {
244-
Audience audience = any.unpack(Audience.class);
244+
public String parse(Any any) throws ResourceInvalidException {
245+
Audience audience;
246+
try {
247+
audience = any.unpack(Audience.class);
248+
} catch (InvalidProtocolBufferException ex) {
249+
throw new ResourceInvalidException("Invalid Resource in address proto", ex);
250+
}
245251
String url = audience.getUrl();
246252
if (url.isEmpty()) {
247-
throw new InvalidProtocolBufferException(
253+
throw new ResourceInvalidException(
248254
"Audience URL is empty. Metadata value must contain a valid URL.");
249255
}
250256
return url;

xds/src/main/java/io/grpc/xds/MetadataRegistry.java

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,14 @@
1717
package io.grpc.xds;
1818

1919
import com.google.common.annotations.VisibleForTesting;
20+
import com.google.common.collect.ImmutableMap;
2021
import com.google.protobuf.Any;
21-
import com.google.protobuf.InvalidProtocolBufferException;
22+
import com.google.protobuf.Struct;
23+
import io.envoyproxy.envoy.config.core.v3.Metadata;
2224
import io.grpc.xds.GcpAuthenticationFilter.AudienceMetadataParser;
25+
import io.grpc.xds.XdsEndpointResource.AddressMetadataParser;
26+
import io.grpc.xds.client.XdsResourceType.ResourceInvalidException;
27+
import io.grpc.xds.internal.ProtobufJsonConverter;
2328
import java.util.HashMap;
2429
import java.util.Map;
2530

@@ -36,6 +41,7 @@ final class MetadataRegistry {
3641

3742
private MetadataRegistry() {
3843
registerParser(new AudienceMetadataParser());
44+
registerParser(new AddressMetadataParser());
3945
}
4046

4147
static MetadataRegistry getInstance() {
@@ -55,6 +61,54 @@ void removeParser(MetadataValueParser parser) {
5561
supportedParsers.remove(parser.getTypeUrl());
5662
}
5763

64+
/**
65+
* Parses cluster metadata into a structured map.
66+
*
67+
* <p>Values in {@code typed_filter_metadata} take precedence over
68+
* {@code filter_metadata} when keys overlap, following Envoy API behavior. See
69+
* <a href="https://github.com/envoyproxy/envoy/blob/main/api/envoy/config/core/v3/base.proto#L217-L259">
70+
* Envoy metadata documentation </a> for details.
71+
*
72+
* @param metadata the {@link Metadata} containing the fields to parse.
73+
* @return an immutable map of parsed metadata.
74+
* @throws ResourceInvalidException if parsing {@code typed_filter_metadata} fails.
75+
*/
76+
public ImmutableMap<String, Object> parseMetadata(Metadata metadata)
77+
throws ResourceInvalidException {
78+
ImmutableMap.Builder<String, Object> parsedMetadata = ImmutableMap.builder();
79+
80+
// Process typed_filter_metadata
81+
for (Map.Entry<String, Any> entry : metadata.getTypedFilterMetadataMap().entrySet()) {
82+
String key = entry.getKey();
83+
Any value = entry.getValue();
84+
MetadataValueParser parser = findParser(value.getTypeUrl());
85+
if (parser != null) {
86+
try {
87+
Object parsedValue = parser.parse(value);
88+
parsedMetadata.put(key, parsedValue);
89+
} catch (ResourceInvalidException e) {
90+
throw new ResourceInvalidException(
91+
String.format("Failed to parse metadata key: %s, type: %s. Error: %s",
92+
key, value.getTypeUrl(), e.getMessage()), e);
93+
}
94+
}
95+
}
96+
// building once to reuse in the next loop
97+
ImmutableMap<String, Object> intermediateParsedMetadata = parsedMetadata.build();
98+
99+
// Process filter_metadata for remaining keys
100+
for (Map.Entry<String, Struct> entry : metadata.getFilterMetadataMap().entrySet()) {
101+
String key = entry.getKey();
102+
if (!intermediateParsedMetadata.containsKey(key)) {
103+
Struct structValue = entry.getValue();
104+
Object jsonValue = ProtobufJsonConverter.convertToJson(structValue);
105+
parsedMetadata.put(key, jsonValue);
106+
}
107+
}
108+
109+
return parsedMetadata.build();
110+
}
111+
58112
interface MetadataValueParser {
59113

60114
String getTypeUrl();
@@ -64,8 +118,8 @@ interface MetadataValueParser {
64118
*
65119
* @param any the {@link Any} object to parse.
66120
* @return the parsed metadata value.
67-
* @throws InvalidProtocolBufferException if the parsing fails.
121+
* @throws ResourceInvalidException if the parsing fails.
68122
*/
69-
Object parse(Any any) throws InvalidProtocolBufferException;
123+
Object parse(Any any) throws ResourceInvalidException;
70124
}
71125
}

0 commit comments

Comments
 (0)