Skip to content

Commit 48cb95b

Browse files
Copilotaepfli
andcommitted
Implement header-based selector for sync stream connection
Co-authored-by: aepfli <9987394+aepfli@users.noreply.github.com>
1 parent 2287985 commit 48cb95b

File tree

1 file changed

+45
-2
lines changed
  • providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync

1 file changed

+45
-2
lines changed

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,13 @@
1717
import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsResponse;
1818
import dev.openfeature.sdk.Awaitable;
1919
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
20+
import io.grpc.CallOptions;
21+
import io.grpc.Channel;
22+
import io.grpc.ClientCall;
23+
import io.grpc.ClientInterceptor;
24+
import io.grpc.ForwardingClientCall;
25+
import io.grpc.Metadata;
26+
import io.grpc.MethodDescriptor;
2027
import io.grpc.Status;
2128
import io.grpc.StatusRuntimeException;
2229
import io.grpc.stub.StreamObserver;
@@ -60,8 +67,14 @@ public SyncStreamQueueSource(final FlagdOptions options, Consumer<FlagdProviderE
6067
maxBackoffMs = options.getRetryBackoffMaxMs();
6168
syncMetadataDisabled = options.isSyncMetadataDisabled();
6269
channelConnector = new ChannelConnector(options, onConnectionEvent, ChannelBuilder.nettyChannel(options));
63-
flagSyncStub =
64-
FlagSyncServiceGrpc.newStub(channelConnector.getChannel()).withWaitForReady();
70+
71+
// Apply selector header interceptor if selector is configured
72+
Channel channel = channelConnector.getChannel();
73+
if (selector != null) {
74+
channel = io.grpc.ClientInterceptors.intercept(channel, createSelectorInterceptor(selector));
75+
}
76+
77+
flagSyncStub = FlagSyncServiceGrpc.newStub(channel).withWaitForReady();
6578
metadataStub = FlagSyncServiceGrpc.newBlockingStub(channelConnector.getChannel())
6679
.withWaitForReady();
6780
}
@@ -184,6 +197,8 @@ private void syncFlags(SyncStreamObserver streamObserver) {
184197
}
185198

186199
final SyncFlagsRequest.Builder syncRequest = SyncFlagsRequest.newBuilder();
200+
// Selector is now passed via header using ClientInterceptor (see constructor)
201+
// Keeping this for backward compatibility with older flagd versions
187202
if (this.selector != null) {
188203
syncRequest.setSelector(this.selector);
189204
}
@@ -196,6 +211,34 @@ private void syncFlags(SyncStreamObserver streamObserver) {
196211

197212
streamObserver.done.await();
198213
}
214+
215+
/**
216+
* Creates a ClientInterceptor that adds the flagd-selector header to gRPC requests.
217+
* This is the preferred approach for passing selectors as per flagd issue #1814.
218+
*
219+
* @param selector the selector value to pass in the header
220+
* @return a ClientInterceptor that adds the flagd-selector header
221+
*/
222+
private static ClientInterceptor createSelectorInterceptor(String selector) {
223+
return new ClientInterceptor() {
224+
@Override
225+
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
226+
MethodDescriptor<ReqT, RespT> method,
227+
CallOptions callOptions,
228+
Channel next) {
229+
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
230+
next.newCall(method, callOptions)) {
231+
@Override
232+
public void start(Listener<RespT> responseListener, Metadata headers) {
233+
headers.put(
234+
Metadata.Key.of("flagd-selector", Metadata.ASCII_STRING_MARSHALLER),
235+
selector);
236+
super.start(responseListener, headers);
237+
}
238+
};
239+
}
240+
};
241+
}
199242

200243
private void enqueueError(String message) {
201244
enqueueError(outgoingQueue, message);

0 commit comments

Comments
 (0)