Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: rpc caching not behaving as expected (cleared too often) #1115

Merged
merged 15 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
[submodule "providers/flagd/test-harness"]
path = providers/flagd/test-harness
url = https://github.com/open-feature/test-harness.git
branch = v0.5.21
branch = v1.1.1
[submodule "providers/flagd/spec"]
path = providers/flagd/spec
url = https://github.com/open-feature/spec.git
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.cucumber</groupId>
<artifactId>cucumber-picocontainer</artifactId>
<version>7.20.1</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
Expand Down
13 changes: 13 additions & 0 deletions providers/flagd/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,20 @@
<version>1.20.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>toxiproxy</artifactId>
<version>1.20.4</version>
<scope>test</scope>
</dependency>
<!-- uncomment for logoutput during test runs -->

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.16</version>
<scope>test</scope>
</dependency>
</dependencies>

<dependencyManagement>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package dev.openfeature.contrib.providers.flagd;

import dev.openfeature.contrib.providers.flagd.resolver.Resolver;
import dev.openfeature.contrib.providers.flagd.resolver.common.ConnectionEvent;
import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent;
import dev.openfeature.contrib.providers.flagd.resolver.common.Util;
import dev.openfeature.contrib.providers.flagd.resolver.grpc.GrpcResolver;
import dev.openfeature.contrib.providers.flagd.resolver.grpc.cache.Cache;
import dev.openfeature.contrib.providers.flagd.resolver.process.InProcessResolver;
Expand All @@ -12,12 +13,17 @@
import dev.openfeature.sdk.ImmutableStructure;
import dev.openfeature.sdk.Metadata;
import dev.openfeature.sdk.ProviderEvaluation;
import dev.openfeature.sdk.ProviderEvent;
import dev.openfeature.sdk.ProviderEventDetails;
import dev.openfeature.sdk.Structure;
import dev.openfeature.sdk.Value;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -30,11 +36,31 @@ public class FlagdProvider extends EventProvider {
private Function<Structure, EvaluationContext> contextEnricher;
private static final String FLAGD_PROVIDER = "flagd";
private final Resolver flagResolver;
private volatile boolean initialized = false;
private volatile boolean connected = false;
private volatile Structure syncMetadata = new ImmutableStructure();
private volatile EvaluationContext enrichedContext = new ImmutableContext();
private final List<Hook> hooks = new ArrayList<>();
private final EventsLock eventsLock = new EventsLock();

/**
* An executor service responsible for emitting
* {@link ProviderEvent#PROVIDER_ERROR} after the provider went
* {@link ProviderEvent#PROVIDER_STALE} for {@link #gracePeriod} seconds.
*/
private final ScheduledExecutorService errorExecutor;

/**
* A scheduled task for emitting {@link ProviderEvent#PROVIDER_ERROR}.
*/
private ScheduledFuture<?> errorTask;

/**
* The grace period in milliseconds to wait after
* {@link ProviderEvent#PROVIDER_STALE} before emitting a
* {@link ProviderEvent#PROVIDER_ERROR}.
*/
private final long gracePeriod;
/**
* The deadline in milliseconds for GRPC operations.
*/
private final long deadline;

protected final void finalize() {
// DO NOT REMOVE, spotbugs: CT_CONSTRUCTOR_THROW
Expand All @@ -55,18 +81,34 @@ public FlagdProvider() {
public FlagdProvider(final FlagdOptions options) {
switch (options.getResolverType().asString()) {
case Config.RESOLVER_IN_PROCESS:
this.flagResolver = new InProcessResolver(options, this::isConnected, this::onConnectionEvent);
this.flagResolver = new InProcessResolver(options, this::onProviderEvent);
break;
case Config.RESOLVER_RPC:
this.flagResolver = new GrpcResolver(
options, new Cache(options.getCacheType(), options.getMaxCacheSize()), this::onConnectionEvent);
options, new Cache(options.getCacheType(), options.getMaxCacheSize()), this::onProviderEvent);
break;
default:
throw new IllegalStateException(
String.format("Requested unsupported resolver type of %s", options.getResolverType()));
}
hooks.add(new SyncMetadataHook(this::getEnrichedContext));
contextEnricher = options.getContextEnricher();
errorExecutor = Executors.newSingleThreadScheduledExecutor();
gracePeriod = options.getRetryGracePeriod();
deadline = options.getDeadline();
}

/**
* Internal constructor for test cases.
* DO NOT MAKE PUBLIC
*/
FlagdProvider(Resolver resolver, boolean initialized) {
this.flagResolver = resolver;
deadline = Config.DEFAULT_DEADLINE;
gracePeriod = Config.DEFAULT_STREAM_RETRY_GRACE_PERIOD;
hooks.add(new SyncMetadataHook(this::getEnrichedContext));
errorExecutor = Executors.newSingleThreadScheduledExecutor();
this.eventsLock.initialized = initialized;
}

@Override
Expand All @@ -75,27 +117,39 @@ public List<Hook> getProviderHooks() {
}

@Override
public synchronized void initialize(EvaluationContext evaluationContext) throws Exception {
if (this.initialized) {
return;
}
public void initialize(EvaluationContext evaluationContext) throws Exception {
synchronized (eventsLock) {
if (eventsLock.initialized) {
return;
}

this.flagResolver.init();
this.initialized = this.connected = true;
flagResolver.init();
}
// block till ready - this works with deadline fine for rpc, but with in_process
// we also need to take parsing into the equation
// TODO: evaluate where we are losing time, so we can remove this magic number -
// follow up
// wait outside of the synchonrization or we'll deadlock
Util.busyWaitAndCheck(this.deadline * 2, () -> eventsLock.initialized);
}

@Override
public synchronized void shutdown() {
if (!this.initialized) {
return;
}

try {
this.flagResolver.shutdown();
} catch (Exception e) {
log.error("Error during shutdown {}", FLAGD_PROVIDER, e);
} finally {
this.initialized = false;
public void shutdown() {
synchronized (eventsLock) {
if (!eventsLock.initialized) {
return;
}
try {
this.flagResolver.shutdown();
if (errorExecutor != null) {
errorExecutor.shutdownNow();
errorExecutor.awaitTermination(deadline, TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
log.error("Error during shutdown {}", FLAGD_PROVIDER, e);
} finally {
eventsLock.initialized = false;
}
}
}

Expand All @@ -106,27 +160,27 @@ public Metadata getMetadata() {

@Override
public ProviderEvaluation<Boolean> getBooleanEvaluation(String key, Boolean defaultValue, EvaluationContext ctx) {
return this.flagResolver.booleanEvaluation(key, defaultValue, ctx);
return flagResolver.booleanEvaluation(key, defaultValue, ctx);
}

@Override
public ProviderEvaluation<String> getStringEvaluation(String key, String defaultValue, EvaluationContext ctx) {
return this.flagResolver.stringEvaluation(key, defaultValue, ctx);
return flagResolver.stringEvaluation(key, defaultValue, ctx);
}

@Override
public ProviderEvaluation<Double> getDoubleEvaluation(String key, Double defaultValue, EvaluationContext ctx) {
return this.flagResolver.doubleEvaluation(key, defaultValue, ctx);
return flagResolver.doubleEvaluation(key, defaultValue, ctx);
}

@Override
public ProviderEvaluation<Integer> getIntegerEvaluation(String key, Integer defaultValue, EvaluationContext ctx) {
return this.flagResolver.integerEvaluation(key, defaultValue, ctx);
return flagResolver.integerEvaluation(key, defaultValue, ctx);
}

@Override
public ProviderEvaluation<Value> getObjectEvaluation(String key, Value defaultValue, EvaluationContext ctx) {
return this.flagResolver.objectEvaluation(key, defaultValue, ctx);
return flagResolver.objectEvaluation(key, defaultValue, ctx);
}

/**
Expand All @@ -139,7 +193,7 @@ public ProviderEvaluation<Value> getObjectEvaluation(String key, Value defaultVa
* @return Object map representing sync metadata
*/
protected Structure getSyncMetadata() {
return new ImmutableStructure(syncMetadata.asMap());
return new ImmutableStructure(eventsLock.syncMetadata.asMap());
}

/**
Expand All @@ -148,50 +202,109 @@ protected Structure getSyncMetadata() {
* @return context
*/
EvaluationContext getEnrichedContext() {
return enrichedContext;
return eventsLock.enrichedContext;
}

private boolean isConnected() {
return this.connected;
}
@SuppressWarnings("checkstyle:fallthrough")
private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) {

private void onConnectionEvent(ConnectionEvent connectionEvent) {
final boolean wasConnected = connected;
final boolean isConnected = connected = connectionEvent.isConnected();
synchronized (eventsLock) {
log.info("FlagdProviderEvent: {}", flagdProviderEvent);
eventsLock.syncMetadata = flagdProviderEvent.getSyncMetadata();
if (flagdProviderEvent.getSyncMetadata() != null) {
eventsLock.enrichedContext = contextEnricher.apply(flagdProviderEvent.getSyncMetadata());
}

syncMetadata = connectionEvent.getSyncMetadata();
enrichedContext = contextEnricher.apply(connectionEvent.getSyncMetadata());
/*
* We only use Error and Ready as previous states.
* As error will first be emitted as Stale, and only turns after a while into an
* emitted Error.
* Ready is needed, as the InProcessResolver does not have a dedicated ready
* event, hence we need to
* forward a configuration changed to the ready, if we are not in the ready
* state.
*/
switch (flagdProviderEvent.getEvent()) {
case PROVIDER_CONFIGURATION_CHANGED:
if (eventsLock.previousEvent == ProviderEvent.PROVIDER_READY) {
onConfigurationChanged(flagdProviderEvent);
break;
}
// intentional fall through, a not-ready change will trigger a ready.
case PROVIDER_READY:
onReady();
eventsLock.previousEvent = ProviderEvent.PROVIDER_READY;
break;

if (!initialized) {
return;
case PROVIDER_ERROR:
if (eventsLock.previousEvent != ProviderEvent.PROVIDER_ERROR) {
onError();
}
eventsLock.previousEvent = ProviderEvent.PROVIDER_ERROR;
break;
default:
log.info("Unknown event {}", flagdProviderEvent.getEvent());
}
}
}

private void onConfigurationChanged(FlagdProviderEvent flagdProviderEvent) {
this.emitProviderConfigurationChanged(ProviderEventDetails.builder()
.flagsChanged(flagdProviderEvent.getFlagsChanged())
.message("configuration changed")
.build());
}

if (!wasConnected && isConnected) {
ProviderEventDetails details = ProviderEventDetails.builder()
.flagsChanged(connectionEvent.getFlagsChanged())
.message("connected to flagd")
.build();
this.emitProviderReady(details);
return;
private void onReady() {
if (!eventsLock.initialized) {
eventsLock.initialized = true;
log.info("initialized FlagdProvider");
}
if (errorTask != null && !errorTask.isCancelled()) {
errorTask.cancel(false);
log.debug("Reconnection task cancelled as connection became READY.");
}
this.emitProviderReady(
ProviderEventDetails.builder().message("connected to flagd").build());
}

if (wasConnected && isConnected) {
ProviderEventDetails details = ProviderEventDetails.builder()
.flagsChanged(connectionEvent.getFlagsChanged())
.message("configuration changed")
.build();
this.emitProviderConfigurationChanged(details);
return;
private void onError() {
log.info("Connection lost. Emit STALE event...");
log.debug("Waiting {}s for connection to become available...", gracePeriod);
this.emitProviderStale(ProviderEventDetails.builder()
.message("there has been an error")
.build());

if (errorTask != null && !errorTask.isCancelled()) {
errorTask.cancel(false);
}

if (connectionEvent.isStale()) {
this.emitProviderStale(ProviderEventDetails.builder()
.message("there has been an error")
.build());
} else {
this.emitProviderError(ProviderEventDetails.builder()
.message("there has been an error")
.build());
if (!errorExecutor.isShutdown()) {
errorTask = errorExecutor.schedule(
() -> {
if (eventsLock.previousEvent == ProviderEvent.PROVIDER_ERROR) {
log.debug(
"Provider did not reconnect successfully within {}s. Emit ERROR event...",
gracePeriod);
flagResolver.onError();
this.emitProviderError(ProviderEventDetails.builder()
.message("there has been an error")
.build());
}
},
gracePeriod,
TimeUnit.SECONDS);
}
}

/**
* Contains all fields we need to worry about locking, used as intrinsic lock
* for sync blocks.
*/
static class EventsLock {
volatile ProviderEvent previousEvent = null;
volatile Structure syncMetadata = new ImmutableStructure();
volatile boolean initialized = false;
volatile EvaluationContext enrichedContext = new ImmutableContext();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ public interface Resolver {

void shutdown() throws Exception;

default void onError() {}

ProviderEvaluation<Boolean> booleanEvaluation(String key, Boolean defaultValue, EvaluationContext ctx);

ProviderEvaluation<String> stringEvaluation(String key, String defaultValue, EvaluationContext ctx);
Expand Down
Loading
Loading