Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 47 additions & 1 deletion providers/flagd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,46 @@ FlagdProvider flagdProvider = new FlagdProvider(

In the above example, in-process handlers attempt to connect to a sync service on address `localhost:8013` to obtain [flag definitions](https://github.com/open-feature/schemas/blob/main/json/flags.json).

#### Selector filtering (In-process mode only)

The `selector` option allows filtering flag configurations from flagd based on source identifiers when using the in-process resolver. This is useful when flagd is configured with multiple flag sources and you want to sync only a specific subset.

##### Usage

To use selector filtering, simply configure the `selector` option when creating the provider:

```java
FlagdProvider flagdProvider = new FlagdProvider(
FlagdOptions.builder()
.resolverType(Config.Resolver.IN_PROCESS)
.selector("source=my-app")
.build());
```

Or via environment variable:
```bash
export FLAGD_SOURCE_SELECTOR="source=my-app"
```

##### Implementation details

> [!IMPORTANT]
> **Selector normalization (flagd issue #1814)**
>
> As part of [flagd issue #1814](https://github.com/open-feature/flagd/issues/1814), the flagd project is normalizing selector handling across all services to use the `flagd-selector` gRPC metadata header.
>
> **Current implementation:**
> - The Java SDK **automatically passes the selector via the `flagd-selector` header** (preferred approach)
> - For backward compatibility with older flagd versions, the selector is **also sent in the request body**
> - Both methods work with current flagd versions
> - In a future major version of flagd, the request body selector field may be removed
>
> **No migration needed:**
>
> Users do not need to make any code changes. The SDK handles selector normalization automatically.

For more details on selector normalization, see the [flagd selector normalization issue](https://github.com/open-feature/flagd/issues/1814).

#### Sync-metadata

To support the injection of contextual data configured in flagd for in-process evaluation, the provider exposes a `getSyncMetadata` accessor which provides the most recent value returned by the [GetMetadata RPC](https://buf.build/open-feature/flagd/docs/main:flagd.sync.v1#flagd.sync.v1.FlagSyncService.GetMetadata).
Expand Down Expand Up @@ -119,7 +159,7 @@ Given below are the supported configurations:
| deadline | FLAGD_DEADLINE_MS | int | 500 | rpc & in-process & file |
| streamDeadlineMs | FLAGD_STREAM_DEADLINE_MS | int | 600000 | rpc & in-process |
| keepAliveTime | FLAGD_KEEP_ALIVE_TIME_MS | long | 0 | rpc & in-process |
| selector | FLAGD_SOURCE_SELECTOR | String | null | in-process |
| selector | FLAGD_SOURCE_SELECTOR | String | null | in-process (see [details](#selector-filtering-in-process-mode-only)) |
| providerId | FLAGD_SOURCE_PROVIDER_ID | String | null | in-process |
| cache | FLAGD_CACHE | String - lru, disabled | lru | rpc |
| maxCacheSize | FLAGD_MAX_CACHE_SIZE | int | 1000 | rpc |
Expand All @@ -130,6 +170,9 @@ Given below are the supported configurations:

> [!NOTE]
> Some configurations are only applicable for RPC resolver.

> [!NOTE]
> The `selector` option automatically uses the `flagd-selector` header (the preferred approach per [flagd issue #1814](https://github.com/open-feature/flagd/issues/1814)) while maintaining backward compatibility with older flagd versions. See [Selector filtering](#selector-filtering-in-process-mode-only) for details.
>

### Unix socket support
Expand Down Expand Up @@ -189,6 +232,9 @@ FlagdProvider flagdProvider = new FlagdProvider(

The `clientInterceptors` and `defaultAuthority` are meant for connection of the in-process resolver to a Sync API implementation on a host/port, that might require special credentials or headers.

> [!NOTE]
> The SDK automatically handles the `flagd-selector` header when the `selector` option is configured. Custom interceptors are not needed for selector filtering. See [Selector filtering](#selector-filtering-in-process-mode-only) for details.

```java
private static ClientInterceptor createHeaderInterceptor() {
return new ClientInterceptor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,14 @@ public class FlagdOptions {
fallBackToEnvOrDefault(Config.STREAM_RETRY_GRACE_PERIOD, Config.DEFAULT_STREAM_RETRY_GRACE_PERIOD);
/**
* Selector to be used with flag sync gRPC contract.
*
* <p>The SDK automatically passes the selector via the {@code flagd-selector} gRPC metadata header
* (the preferred approach per <a href="https://github.com/open-feature/flagd/issues/1814">flagd issue #1814</a>).
* For backward compatibility with older flagd versions, the selector is also sent in the request body.
*
* <p>Only applicable for in-process resolver mode.
*
* @see <a href="https://github.com/open-feature/java-sdk-contrib/tree/main/providers/flagd#selector-filtering-in-process-mode-only">Selector filtering documentation</a>
**/
@Builder.Default
private String selector = fallBackToEnvOrDefault(Config.SOURCE_SELECTOR_ENV_VAR_NAME, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@
import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsResponse;
import dev.openfeature.sdk.Awaitable;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
Expand All @@ -36,6 +43,8 @@
justification = "Random is used to generate a variation & flag configurations require exposing")
public class SyncStreamQueueSource implements QueueSource {
private static final int QUEUE_SIZE = 5;
private static final Metadata.Key<String> FLAGD_SELECTOR_KEY =
Metadata.Key.of("flagd-selector", Metadata.ASCII_STRING_MARSHALLER);

private final AtomicBoolean shutdown = new AtomicBoolean(false);
private final int streamDeadline;
Expand All @@ -60,10 +69,15 @@ public SyncStreamQueueSource(final FlagdOptions options, Consumer<FlagdProviderE
maxBackoffMs = options.getRetryBackoffMaxMs();
syncMetadataDisabled = options.isSyncMetadataDisabled();
channelConnector = new ChannelConnector(options, onConnectionEvent, ChannelBuilder.nettyChannel(options));
flagSyncStub =
FlagSyncServiceGrpc.newStub(channelConnector.getChannel()).withWaitForReady();
metadataStub = FlagSyncServiceGrpc.newBlockingStub(channelConnector.getChannel())
.withWaitForReady();

// Apply selector header interceptor if selector is configured
Channel channel = channelConnector.getChannel();
if (selector != null) {
channel = io.grpc.ClientInterceptors.intercept(channel, createSelectorInterceptor(selector));
}
Comment on lines +73 to +77

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This new logic for adding the selector interceptor is not covered by existing tests, as they mock the gRPC stubs directly. To improve testability, consider extracting the channel and stub creation logic into a separate, package-private method. This would allow you to write a unit test that verifies the interceptor is correctly added when a selector is present, without needing a full gRPC channel.


flagSyncStub = FlagSyncServiceGrpc.newStub(channel).withWaitForReady();
metadataStub = FlagSyncServiceGrpc.newBlockingStub(channel).withWaitForReady();
}

// internal use only
Expand Down Expand Up @@ -184,6 +198,8 @@ private void syncFlags(SyncStreamObserver streamObserver) {
}

final SyncFlagsRequest.Builder syncRequest = SyncFlagsRequest.newBuilder();
// Selector is now passed via header using ClientInterceptor (see constructor)
// Keeping this for backward compatibility with older flagd versions
if (this.selector != null) {
syncRequest.setSelector(this.selector);
}
Expand All @@ -197,6 +213,30 @@ private void syncFlags(SyncStreamObserver streamObserver) {
streamObserver.done.await();
}

/**
* Creates a ClientInterceptor that adds the flagd-selector header to gRPC requests.
* This is the preferred approach for passing selectors as per flagd issue #1814.
*
* @param selector the selector value to pass in the header
* @return a ClientInterceptor that adds the flagd-selector header
*/
private static ClientInterceptor createSelectorInterceptor(String selector) {
return new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
headers.put(FLAGD_SELECTOR_KEY, selector);
super.start(responseListener, headers);
}
};
}
};
}

private void enqueueError(String message) {
enqueueError(outgoingQueue, message);
}
Expand Down