Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: Change how xDS filters are created by introducing Filter.Provider #11883

Merged
merged 1 commit into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 91 additions & 79 deletions xds/src/main/java/io/grpc/xds/FaultFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import io.grpc.internal.GrpcUtil;
import io.grpc.xds.FaultConfig.FaultAbort;
import io.grpc.xds.FaultConfig.FaultDelay;
import io.grpc.xds.Filter.ClientInterceptorBuilder;
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
import java.util.Locale;
import java.util.concurrent.Executor;
Expand All @@ -56,10 +55,11 @@
import javax.annotation.Nullable;

/** HttpFault filter implementation. */
final class FaultFilter implements Filter, ClientInterceptorBuilder {
final class FaultFilter implements Filter {

static final FaultFilter INSTANCE =
private static final FaultFilter INSTANCE =
new FaultFilter(ThreadSafeRandomImpl.instance, new AtomicLong());

@VisibleForTesting
static final Metadata.Key<String> HEADER_DELAY_KEY =
Metadata.Key.of("x-envoy-fault-delay-request", Metadata.ASCII_STRING_MARSHALLER);
Expand Down Expand Up @@ -87,96 +87,108 @@
this.activeFaultCounter = activeFaultCounter;
}

@Override
public String[] typeUrls() {
return new String[] { TYPE_URL };
}

@Override
public ConfigOrError<FaultConfig> parseFilterConfig(Message rawProtoMessage) {
HTTPFault httpFaultProto;
if (!(rawProtoMessage instanceof Any)) {
return ConfigOrError.fromError("Invalid config type: " + rawProtoMessage.getClass());
static final class Provider implements Filter.Provider {
@Override
public String[] typeUrls() {
return new String[]{TYPE_URL};
}
Any anyMessage = (Any) rawProtoMessage;
try {
httpFaultProto = anyMessage.unpack(HTTPFault.class);
} catch (InvalidProtocolBufferException e) {
return ConfigOrError.fromError("Invalid proto: " + e);

@Override
public boolean isClientFilter() {
return true;
}
return parseHttpFault(httpFaultProto);
}

private static ConfigOrError<FaultConfig> parseHttpFault(HTTPFault httpFault) {
FaultDelay faultDelay = null;
FaultAbort faultAbort = null;
if (httpFault.hasDelay()) {
faultDelay = parseFaultDelay(httpFault.getDelay());
@Override
public FaultFilter newInstance() {
return INSTANCE;

Check warning on line 103 in xds/src/main/java/io/grpc/xds/FaultFilter.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/FaultFilter.java#L103

Added line #L103 was not covered by tests
}
if (httpFault.hasAbort()) {
ConfigOrError<FaultAbort> faultAbortOrError = parseFaultAbort(httpFault.getAbort());
if (faultAbortOrError.errorDetail != null) {
return ConfigOrError.fromError(
"HttpFault contains invalid FaultAbort: " + faultAbortOrError.errorDetail);

@Override
public ConfigOrError<FaultConfig> parseFilterConfig(Message rawProtoMessage) {
HTTPFault httpFaultProto;
if (!(rawProtoMessage instanceof Any)) {
return ConfigOrError.fromError("Invalid config type: " + rawProtoMessage.getClass());

Check warning on line 110 in xds/src/main/java/io/grpc/xds/FaultFilter.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/FaultFilter.java#L110

Added line #L110 was not covered by tests
}
faultAbort = faultAbortOrError.config;
}
Integer maxActiveFaults = null;
if (httpFault.hasMaxActiveFaults()) {
maxActiveFaults = httpFault.getMaxActiveFaults().getValue();
if (maxActiveFaults < 0) {
maxActiveFaults = Integer.MAX_VALUE;
Any anyMessage = (Any) rawProtoMessage;
try {
httpFaultProto = anyMessage.unpack(HTTPFault.class);
} catch (InvalidProtocolBufferException e) {
return ConfigOrError.fromError("Invalid proto: " + e);

Check warning on line 116 in xds/src/main/java/io/grpc/xds/FaultFilter.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/FaultFilter.java#L115-L116

Added lines #L115 - L116 were not covered by tests
}
return parseHttpFault(httpFaultProto);
}
return ConfigOrError.fromConfig(FaultConfig.create(faultDelay, faultAbort, maxActiveFaults));
}

private static FaultDelay parseFaultDelay(
io.envoyproxy.envoy.extensions.filters.common.fault.v3.FaultDelay faultDelay) {
FaultConfig.FractionalPercent percent = parsePercent(faultDelay.getPercentage());
if (faultDelay.hasHeaderDelay()) {
return FaultDelay.forHeader(percent);
@Override
public ConfigOrError<FaultConfig> parseFilterConfigOverride(Message rawProtoMessage) {
return parseFilterConfig(rawProtoMessage);
}
return FaultDelay.forFixedDelay(Durations.toNanos(faultDelay.getFixedDelay()), percent);
}

@VisibleForTesting
static ConfigOrError<FaultAbort> parseFaultAbort(
io.envoyproxy.envoy.extensions.filters.http.fault.v3.FaultAbort faultAbort) {
FaultConfig.FractionalPercent percent = parsePercent(faultAbort.getPercentage());
switch (faultAbort.getErrorTypeCase()) {
case HEADER_ABORT:
return ConfigOrError.fromConfig(FaultAbort.forHeader(percent));
case HTTP_STATUS:
return ConfigOrError.fromConfig(FaultAbort.forStatus(
GrpcUtil.httpStatusToGrpcStatus(faultAbort.getHttpStatus()), percent));
case GRPC_STATUS:
return ConfigOrError.fromConfig(FaultAbort.forStatus(
Status.fromCodeValue(faultAbort.getGrpcStatus()), percent));
case ERRORTYPE_NOT_SET:
default:
return ConfigOrError.fromError(
"Unknown error type case: " + faultAbort.getErrorTypeCase());
private static ConfigOrError<FaultConfig> parseHttpFault(HTTPFault httpFault) {
FaultDelay faultDelay = null;
FaultAbort faultAbort = null;
if (httpFault.hasDelay()) {
faultDelay = parseFaultDelay(httpFault.getDelay());
}
if (httpFault.hasAbort()) {
ConfigOrError<FaultAbort> faultAbortOrError = parseFaultAbort(httpFault.getAbort());
if (faultAbortOrError.errorDetail != null) {
return ConfigOrError.fromError(

Check warning on line 135 in xds/src/main/java/io/grpc/xds/FaultFilter.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/FaultFilter.java#L135

Added line #L135 was not covered by tests
"HttpFault contains invalid FaultAbort: " + faultAbortOrError.errorDetail);
}
faultAbort = faultAbortOrError.config;
}
Integer maxActiveFaults = null;
if (httpFault.hasMaxActiveFaults()) {
maxActiveFaults = httpFault.getMaxActiveFaults().getValue();
if (maxActiveFaults < 0) {
maxActiveFaults = Integer.MAX_VALUE;

Check warning on line 144 in xds/src/main/java/io/grpc/xds/FaultFilter.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/FaultFilter.java#L144

Added line #L144 was not covered by tests
}
}
return ConfigOrError.fromConfig(FaultConfig.create(faultDelay, faultAbort, maxActiveFaults));
}
}

private static FaultConfig.FractionalPercent parsePercent(FractionalPercent proto) {
switch (proto.getDenominator()) {
case HUNDRED:
return FaultConfig.FractionalPercent.perHundred(proto.getNumerator());
case TEN_THOUSAND:
return FaultConfig.FractionalPercent.perTenThousand(proto.getNumerator());
case MILLION:
return FaultConfig.FractionalPercent.perMillion(proto.getNumerator());
case UNRECOGNIZED:
default:
throw new IllegalArgumentException("Unknown denominator type: " + proto.getDenominator());
private static FaultDelay parseFaultDelay(
io.envoyproxy.envoy.extensions.filters.common.fault.v3.FaultDelay faultDelay) {
FaultConfig.FractionalPercent percent = parsePercent(faultDelay.getPercentage());
if (faultDelay.hasHeaderDelay()) {
return FaultDelay.forHeader(percent);

Check warning on line 154 in xds/src/main/java/io/grpc/xds/FaultFilter.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/FaultFilter.java#L154

Added line #L154 was not covered by tests
}
return FaultDelay.forFixedDelay(Durations.toNanos(faultDelay.getFixedDelay()), percent);
}
}

@Override
public ConfigOrError<FaultConfig> parseFilterConfigOverride(Message rawProtoMessage) {
return parseFilterConfig(rawProtoMessage);
@VisibleForTesting
static ConfigOrError<FaultAbort> parseFaultAbort(
io.envoyproxy.envoy.extensions.filters.http.fault.v3.FaultAbort faultAbort) {
FaultConfig.FractionalPercent percent = parsePercent(faultAbort.getPercentage());
switch (faultAbort.getErrorTypeCase()) {
case HEADER_ABORT:
return ConfigOrError.fromConfig(FaultAbort.forHeader(percent));
case HTTP_STATUS:
return ConfigOrError.fromConfig(FaultAbort.forStatus(
GrpcUtil.httpStatusToGrpcStatus(faultAbort.getHttpStatus()), percent));
case GRPC_STATUS:
return ConfigOrError.fromConfig(FaultAbort.forStatus(
Status.fromCodeValue(faultAbort.getGrpcStatus()), percent));
case ERRORTYPE_NOT_SET:
default:
return ConfigOrError.fromError(
"Unknown error type case: " + faultAbort.getErrorTypeCase());

Check warning on line 175 in xds/src/main/java/io/grpc/xds/FaultFilter.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/FaultFilter.java#L174-L175

Added lines #L174 - L175 were not covered by tests
}
}

private static FaultConfig.FractionalPercent parsePercent(FractionalPercent proto) {
switch (proto.getDenominator()) {
case HUNDRED:
return FaultConfig.FractionalPercent.perHundred(proto.getNumerator());
case TEN_THOUSAND:
return FaultConfig.FractionalPercent.perTenThousand(proto.getNumerator());
case MILLION:
return FaultConfig.FractionalPercent.perMillion(proto.getNumerator());
case UNRECOGNIZED:
default:
throw new IllegalArgumentException("Unknown denominator type: " + proto.getDenominator());

Check warning on line 189 in xds/src/main/java/io/grpc/xds/FaultFilter.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/FaultFilter.java#L189

Added line #L189 was not covered by tests
}
}
}

@Nullable
Expand Down
90 changes: 62 additions & 28 deletions xds/src/main/java/io/grpc/xds/Filter.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,48 +25,82 @@
import javax.annotation.Nullable;

/**
* Defines the parsing functionality of an HTTP filter. A Filter may optionally implement either
* {@link ClientInterceptorBuilder} or {@link ServerInterceptorBuilder} or both, indicating it is
* capable of working on the client side or server side or both, respectively.
* Defines the parsing functionality of an HTTP filter.
*
* <p>A Filter may optionally implement either {@link Filter#buildClientInterceptor} or
* {@link Filter#buildServerInterceptor} or both, and return true from corresponding
* {@link Provider#isClientFilter()}, {@link Provider#isServerFilter()} to indicate that the filter
* is capable of working on the client side or server side or both, respectively.
*/
interface Filter {

/**
* The proto message types supported by this filter. A filter will be registered by each of its
* supported message types.
*/
String[] typeUrls();
/** Represents an opaque data structure holding configuration for a filter. */
interface FilterConfig {
String typeUrl();
}

/**
* Parses the top-level filter config from raw proto message. The message may be either a {@link
* com.google.protobuf.Any} or a {@link com.google.protobuf.Struct}.
* Common interface for filter providers.
*/
ConfigOrError<? extends FilterConfig> parseFilterConfig(Message rawProtoMessage);
interface Provider {
/**
* The proto message types supported by this filter. A filter will be registered by each of its
* supported message types.
*/
String[] typeUrls();

/**
* Parses the per-filter override filter config from raw proto message. The message may be either
* a {@link com.google.protobuf.Any} or a {@link com.google.protobuf.Struct}.
*/
ConfigOrError<? extends FilterConfig> parseFilterConfigOverride(Message rawProtoMessage);
/**
* Whether the filter can be installed on the client side.
*
* <p>Returns true if the filter implements {@link Filter#buildClientInterceptor}.
*/
default boolean isClientFilter() {
return false;
}

/** Represents an opaque data structure holding configuration for a filter. */
interface FilterConfig {
String typeUrl();
/**
* Whether the filter can be installed into xDS-enabled servers.
*
* <p>Returns true if the filter implements {@link Filter#buildServerInterceptor}.
*/
default boolean isServerFilter() {
return false;
}

/**
* Creates a new instance of the filter.
*
* <p>Returns a filter instance registered with the same typeUrls as the provider,
* capable of working with the same FilterConfig type returned by provider's parse functions.
*/
Filter newInstance();

/**
* Parses the top-level filter config from raw proto message. The message may be either a {@link
* com.google.protobuf.Any} or a {@link com.google.protobuf.Struct}.
*/
ConfigOrError<? extends FilterConfig> parseFilterConfig(Message rawProtoMessage);

/**
* Parses the per-filter override filter config from raw proto message. The message may be
* either a {@link com.google.protobuf.Any} or a {@link com.google.protobuf.Struct}.
*/
ConfigOrError<? extends FilterConfig> parseFilterConfigOverride(Message rawProtoMessage);
}

/** Uses the FilterConfigs produced above to produce an HTTP filter interceptor for clients. */
interface ClientInterceptorBuilder {
@Nullable
ClientInterceptor buildClientInterceptor(
FilterConfig config, @Nullable FilterConfig overrideConfig,
ScheduledExecutorService scheduler);
@Nullable
default ClientInterceptor buildClientInterceptor(
FilterConfig config, @Nullable FilterConfig overrideConfig,
ScheduledExecutorService scheduler) {
return null;
}

/** Uses the FilterConfigs produced above to produce an HTTP filter interceptor for the server. */
interface ServerInterceptorBuilder {
@Nullable
ServerInterceptor buildServerInterceptor(
FilterConfig config, @Nullable FilterConfig overrideConfig);
@Nullable
default ServerInterceptor buildServerInterceptor(
FilterConfig config, @Nullable FilterConfig overrideConfig) {
return null;
}

/** Filter config with instance name. */
Expand Down
16 changes: 8 additions & 8 deletions xds/src/main/java/io/grpc/xds/FilterRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,21 @@

/**
* A registry for all supported {@link Filter}s. Filters can be queried from the registry
* by any of the {@link Filter#typeUrls() type URLs}.
* by any of the {@link Filter.Provider#typeUrls() type URLs}.
*/
final class FilterRegistry {
private static FilterRegistry instance;

private final Map<String, Filter> supportedFilters = new HashMap<>();
private final Map<String, Filter.Provider> supportedFilters = new HashMap<>();

private FilterRegistry() {}

static synchronized FilterRegistry getDefaultRegistry() {
if (instance == null) {
instance = newRegistry().register(
FaultFilter.INSTANCE,
RouterFilter.INSTANCE,
RbacFilter.INSTANCE);
new FaultFilter.Provider(),
new RouterFilter.Provider(),
new RbacFilter.Provider());
}
return instance;
}
Expand All @@ -48,8 +48,8 @@ static FilterRegistry newRegistry() {
}

@VisibleForTesting
FilterRegistry register(Filter... filters) {
for (Filter filter : filters) {
FilterRegistry register(Filter.Provider... filters) {
for (Filter.Provider filter : filters) {
for (String typeUrl : filter.typeUrls()) {
supportedFilters.put(typeUrl, filter);
}
Expand All @@ -58,7 +58,7 @@ FilterRegistry register(Filter... filters) {
}

@Nullable
Filter get(String typeUrl) {
Filter.Provider get(String typeUrl) {
return supportedFilters.get(typeUrl);
}
}
Loading