Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelConnector;
import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent;
import dev.openfeature.contrib.providers.flagd.resolver.common.QueueingStreamObserver;
import dev.openfeature.contrib.providers.flagd.resolver.common.StreamResponseModel;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueueSource;
Expand All @@ -16,9 +14,9 @@
import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataResponse;
import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsRequest;
import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsResponse;
import dev.openfeature.sdk.Awaitable;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.grpc.Context;
import io.grpc.Context.CancellableContext;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
Expand All @@ -43,8 +41,6 @@ public class SyncStreamQueueSource implements QueueSource {
private final String providerId;
private final boolean syncMetadataDisabled;
private final ChannelConnector<FlagSyncServiceStub, FlagSyncServiceBlockingStub> channelConnector;
private final LinkedBlockingQueue<StreamResponseModel<SyncFlagsResponse>> incomingQueue =
new LinkedBlockingQueue<>(QUEUE_SIZE);
private final BlockingQueue<QueuePayload> outgoingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
private final FlagSyncServiceStub stub;
private final FlagSyncServiceBlockingStub blockingStub;
Expand Down Expand Up @@ -83,15 +79,7 @@ protected SyncStreamQueueSource(
/** Initialize sync stream connector. */
public void init() throws Exception {
channelConnector.initialize();
Thread listener = new Thread(() -> {
try {
observeSyncStream();
} catch (InterruptedException e) {
log.warn("gRPC event stream interrupted, flag configurations are stale", e);
Thread.currentThread().interrupt();
}
});

Thread listener = new Thread(this::observeSyncStream);
listener.setDaemon(true);
listener.start();
}
Expand All @@ -114,83 +102,58 @@ public void shutdown() throws InterruptedException {
}

/** Contains blocking calls, to be used concurrently. */
private void observeSyncStream() throws InterruptedException {

private void observeSyncStream() {
log.info("Initializing sync stream observer");

// outer loop for re-issuing the stream request
// "waitForReady" on the channel, plus our retry policy slow this loop down in error conditions
while (!shutdown.get()) {

log.debug("Initializing sync stream request");
final GetMetadataRequest.Builder metadataRequest = GetMetadataRequest.newBuilder();
GetMetadataResponse metadataResponse = null;

// create a context which exists to track and cancel the stream
try (CancellableContext context = Context.current().withCancellation()) {

restart(); // start the stream with the context

// TODO: remove the metadata call entirely after https://github.com/open-feature/flagd/issues/1584
if (!syncMetadataDisabled) {
try {
FlagSyncServiceBlockingStub localStub = blockingStub;

if (deadline > 0) {
localStub = localStub.withDeadlineAfter(deadline, TimeUnit.MILLISECONDS);
}

metadataResponse = localStub.getMetadata(metadataRequest.build());
} catch (Exception metaEx) {
// cancel the stream if the getMetadata fails
// we can keep this log quiet since the stream is cancelled/restarted with this exception
log.debug("Metadata exception: {}, cancelling stream", metaEx.getMessage(), metaEx);
context.cancel(metaEx);
}
}

// inner loop for handling messages
while (!shutdown.get() && !Context.current().isCancelled()) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this call to the current context Context.current().isCancelled() intentional or should it have been a call to the context context.isCancelled(), created before and used in the metadata block above to also skip flag sync when?

Context.current() != context, because the call to Context.current().withCancellation() only creates a new context, but does not set it as current context.

In the previous version, flags are always synced, even if getMetadata fails. This behavior is different in my implementation. In the new version, when getMatadata fails, we continue the loop and try again.

@toddbaert I saw this change was introduced by you here. Any chance you can recall the intention? Based on the log message in the metadata catch block, I'd say we want the syncFlags stream to start over again.

final StreamResponseModel<SyncFlagsResponse> taken = incomingQueue.take();
if (taken.isComplete()) {
log.debug("Sync stream completed, will restart");
// The stream is complete, we still try to reconnect
break;
}

Throwable streamException = taken.getError();
if (streamException != null) {
log.debug("Exception in stream RPC, streamException {}, will restart", streamException);
if (!outgoingQueue.offer(new QueuePayload(
QueuePayloadType.ERROR,
String.format("Error from stream: %s", streamException.getMessage())))) {
log.error("Failed to convey ERROR status, queue is full");
}
break;
}

final SyncFlagsResponse flagsResponse = taken.getResponse();
final String data = flagsResponse.getFlagConfiguration();
log.debug("Got stream response: {}", data);

Struct syncContext = null;
if (flagsResponse.hasSyncContext()) {
syncContext = flagsResponse.getSyncContext();
} else if (metadataResponse != null) {
syncContext = metadataResponse.getMetadata();
}

if (!outgoingQueue.offer(new QueuePayload(QueuePayloadType.DATA, data, syncContext))) {
log.error("Stream writing failed");
}
}
SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue);

try {
observer.metadata = getMetadata();
} catch (Exception metaEx) {
// retry if getMetadata fails
String message = metaEx.getMessage();
log.debug("Metadata request error: {}, will restart", message, metaEx);
enqueueError(String.format("Error in getMetadata request: %s", message));
continue;
}

try {
syncFlags(observer);
} catch (Exception ex) {
log.error("Unexpected sync stream exception, will restart.", ex);
enqueueError(String.format("Error in getMetadata request: %s", ex.getMessage()));
}
}

log.info("Shutdown invoked, exiting event stream listener");
}

private void restart() {
// TODO: remove the metadata call entirely after https://github.com/open-feature/flagd/issues/1584
private Struct getMetadata() {
if (syncMetadataDisabled) {
return null;
}

FlagSyncServiceBlockingStub localStub = blockingStub;

if (deadline > 0) {
localStub = localStub.withDeadlineAfter(deadline, TimeUnit.MILLISECONDS);
}

GetMetadataResponse metadataResponse = localStub.getMetadata(GetMetadataRequest.getDefaultInstance());

if (metadataResponse != null) {
return metadataResponse.getMetadata();
}

return null;
}

private void syncFlags(SyncStreamObserver streamObserver) {
FlagSyncServiceStub localStub = stub; // don't mutate the stub
if (streamDeadline > 0) {
localStub = localStub.withDeadlineAfter(streamDeadline, TimeUnit.MILLISECONDS);
Expand All @@ -205,6 +168,59 @@ private void restart() {
syncRequest.setProviderId(this.providerId);
}

localStub.syncFlags(syncRequest.build(), new QueueingStreamObserver<SyncFlagsResponse>(incomingQueue));
localStub.syncFlags(syncRequest.build(), streamObserver);

streamObserver.done.await();
}

private void enqueueError(String message) {
enqueueError(outgoingQueue, message);
}

private static void enqueueError(BlockingQueue<QueuePayload> queue, String message) {
if (!queue.offer(new QueuePayload(
QueuePayloadType.ERROR, message, null))) {
log.error("Failed to convey ERROR status, queue is full");
}
}

private static class SyncStreamObserver implements StreamObserver<SyncFlagsResponse> {
private final BlockingQueue<QueuePayload> outgoingQueue;
private final Awaitable done = new Awaitable();

private Struct metadata;

public SyncStreamObserver(BlockingQueue<QueuePayload> outgoingQueue) {
this.outgoingQueue = outgoingQueue;
}

@Override
public void onNext(SyncFlagsResponse syncFlagsResponse) {
final String data = syncFlagsResponse.getFlagConfiguration();
log.debug("Got stream response: {}", data);

Struct syncContext = syncFlagsResponse.hasSyncContext() ? syncFlagsResponse.getSyncContext() : metadata;

if (!outgoingQueue.offer(new QueuePayload(QueuePayloadType.DATA, data, syncContext))) {
log.error("Stream writing failed");
}
}

@Override
public void onError(Throwable throwable) {
try {
String message = throwable != null ? throwable.getMessage() : "unknown";
log.debug("Stream error: {}, will restart", message, throwable);
enqueueError(outgoingQueue, String.format("Error from stream: %s", message));
} finally {
done.wakeup();
}
}

@Override
public void onCompleted() {
log.debug("Sync stream completed, will restart");
done.wakeup();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
import com.google.protobuf.Struct;
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelConnector;
import dev.openfeature.contrib.providers.flagd.resolver.common.QueueingStreamObserver;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType;
import dev.openfeature.flagd.grpc.sync.FlagSyncServiceGrpc.FlagSyncServiceBlockingStub;
import dev.openfeature.flagd.grpc.sync.FlagSyncServiceGrpc.FlagSyncServiceStub;
import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataResponse;
import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsRequest;
import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsResponse;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand All @@ -35,7 +35,7 @@ class SyncStreamQueueSourceTest {
private ChannelConnector<FlagSyncServiceStub, FlagSyncServiceBlockingStub> mockConnector;
private FlagSyncServiceBlockingStub blockingStub;
private FlagSyncServiceStub stub;
private QueueingStreamObserver<SyncFlagsResponse> observer;
private StreamObserver<SyncFlagsResponse> observer;
private CountDownLatch latch; // used to wait for observer to be initialized

@BeforeEach
Expand All @@ -51,12 +51,12 @@ public void init() throws Exception {
when(stub.withDeadlineAfter(anyLong(), any())).thenReturn(stub);
doAnswer((Answer<Void>) invocation -> {
Object[] args = invocation.getArguments();
observer = (QueueingStreamObserver<SyncFlagsResponse>) args[1];
observer = (StreamObserver<SyncFlagsResponse>) args[1];
latch.countDown();
return null;
})
.when(stub)
.syncFlags(any(SyncFlagsRequest.class), any(QueueingStreamObserver.class)); // Mock the initialize
.syncFlags(any(SyncFlagsRequest.class), any(StreamObserver.class)); // Mock the initialize
// method
}

Expand Down
Loading