Skip to content

Commit

Permalink
FFM-12087 New configuration options + close related bug fixes (#203)
Browse files Browse the repository at this point in the history
  • Loading branch information
erdirowlands authored Oct 4, 2024
1 parent c3429e1 commit 866809f
Show file tree
Hide file tree
Showing 16 changed files with 331 additions and 48 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,14 @@ Add the following Maven dependency in your project's pom.xml file:
<dependency>
<groupId>io.harness</groupId>
<artifactId>ff-java-server-sdk</artifactId>
<version>1.7.0</version>
<version>1.8.0</version>
</dependency>
```

#### Gradle

```
implementation 'io.harness:ff-java-server-sdk:1.7.0'
implementation 'io.harness:ff-java-server-sdk:1.8.0'
```

### Code Sample
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ public static void main(String... args)
HarnessConfig.builder()
.configUrl("http://localhost:3000/api/1.0")
.eventUrl("http://localhost:3000/api/1.0")
.maxRequestRetry(20)
.flushAnalyticsOnClose(true)
.flushAnalyticsOnCloseTimeout(30000)
.build());
client = new CfClient(hc);
client.waitForInitialization();
Expand Down
2 changes: 1 addition & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ dependencyResolutionManagement {
versionCatalogs {
libs {
// main sdk version
version('sdk', '1.7.0');
version('sdk', '1.8.0');

// sdk deps
version('okhttp3', '4.12.0')
Expand Down
30 changes: 30 additions & 0 deletions src/main/java/io/harness/cf/client/api/BaseConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
@Data
public class BaseConfig {
public static final int MIN_FREQUENCY = 60;
public static final long DEFAULT_REQUEST_RETRIES = 10;

@Builder.Default private final boolean streamEnabled = true;
@Builder.Default private final int pollIntervalInSeconds = 60;
Expand Down Expand Up @@ -53,4 +54,33 @@ public int getFrequency() {
@Builder.Default private final Cache cache = new CaffeineCache(10000);

private final Storage store;

/**
* Defines the maximum number of retry attempts for certain types of requests:
* authentication, polling, metrics, and reacting to stream events. If a request fails,
* the SDK will retry up to this number of times before giving up.
* <p>
* - Authentication: Used for retrying authentication requests when the server is unreachable.
* - Polling: Applies to requests that fetch feature flags and target groups periodically.
* - Metrics: Applies to analytics requests for sending metrics data to the server.
* - Reacting to Stream Events: Applies to requests triggered by streamed flag or group changes,
* where the SDK needs to fetch updated flag or group data.
* <p>
* <p>
* The default value is {@code 10}.
* <p>
* <b>Note:</b> This setting does not apply to streaming requests (either the initial connection or
* reconnecting after a disconnection). Streaming requests will always retry indefinitely
* (infinite retries).
* <p>
* Example usage:
* <pre>
* {@code
* BaseConfig config = BaseConfig.builder()
* .maxRequestRetry(20)
* .build();
* }
* </pre>
*/
@Builder.Default private final long maxRequestRetry = DEFAULT_REQUEST_RETRIES;
}
15 changes: 12 additions & 3 deletions src/main/java/io/harness/cf/client/api/InnerClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ protected void setUp(@NonNull final Connector connector, @NonNull final BaseConf
log.info("Starting SDK client with configuration: {}", this.options);
this.connector = connector;
this.connector.setOnUnauthorized(this::onUnauthorized);

// initialization
repository =
new StorageRepository(
Expand All @@ -96,7 +95,9 @@ protected void setUp(@NonNull final Connector connector, @NonNull final BaseConf
authService = new AuthService(this.connector, options.getPollIntervalInSeconds(), this);
pollProcessor =
new PollingProcessor(this.connector, repository, options.getPollIntervalInSeconds(), this);
metricsProcessor = new MetricsProcessor(this.connector, this.options, this);
metricsProcessor =
new MetricsProcessor(
this.connector, this.options, this, connector.getShouldFlushAnalyticsOnClose());
updateProcessor = new UpdateProcessor(this.connector, this.repository, this);

// start with authentication
Expand Down Expand Up @@ -228,7 +229,9 @@ public void onDisconnected(String reason) {
closing,
options.getPollIntervalInSeconds());
log.debug("SSE disconnect detected - asking poller to refresh flags");
pollProcessor.retrieveAll();
if (!closing) {
pollProcessor.retrieveAll();
}
}
}

Expand Down Expand Up @@ -388,6 +391,12 @@ public void processEvaluation(
public void close() {
log.info("Closing the client");
closing = true;

// Mark the connector as shutting down to stop request retries from taking place. The
// connections will eventually
// be evicted when the connector is closed, but this ensures that if metrics are flushed when
// closed then it won't attempt to retry if the first request fails.
connector.setIsShuttingDown();
off();
authService.close();
repository.close();
Expand Down
28 changes: 21 additions & 7 deletions src/main/java/io/harness/cf/client/api/MetricsProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static io.harness.cf.client.common.Utils.shutdownExecutorService;
import static java.util.concurrent.TimeUnit.SECONDS;

import io.harness.cf.Version;
import io.harness.cf.client.common.SdkCodes;
import io.harness.cf.client.common.StringUtils;
import io.harness.cf.client.connector.Connector;
Expand All @@ -15,10 +16,7 @@
import io.harness.cf.model.TargetData;
import io.harness.cf.model.Variation;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.*;
import java.util.concurrent.atomic.LongAdder;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -113,13 +111,19 @@ public boolean containsKey(K key) {
private final LongAdder metricsSent = new LongAdder();
private final int maxFreqMapSize;

private final boolean shouldFlushMetricsOnClose;

public MetricsProcessor(
@NonNull Connector connector, @NonNull BaseConfig config, @NonNull MetricsCallback callback) {
@NonNull Connector connector,
@NonNull BaseConfig config,
@NonNull MetricsCallback callback,
boolean shouldFlushMetricsOnClose) {
this.connector = connector;
this.config = config;
this.frequencyMap = new FrequencyMap<>();
this.targetsSeen = ConcurrentHashMap.newKeySet();
this.maxFreqMapSize = clamp(config.getBufferSize(), 2048, MAX_FREQ_MAP_TO_RETAIN);
this.shouldFlushMetricsOnClose = shouldFlushMetricsOnClose;
callback.onMetricsReady();
}

Expand Down Expand Up @@ -218,7 +222,7 @@ protected Metrics prepareSummaryMetricsBody(Map<MetricEvent, Long> data, Set<Tar
new KeyValue(TARGET_ATTRIBUTE, summary.getTargetIdentifier()),
new KeyValue(SDK_TYPE, SERVER),
new KeyValue(SDK_LANGUAGE, "java"),
new KeyValue(SDK_VERSION, io.harness.cf.Version.VERSION)));
new KeyValue(SDK_VERSION, Version.VERSION)));
if (metrics.getMetricsData() != null) {
metrics.getMetricsData().add(metricsData);
}
Expand Down Expand Up @@ -305,6 +309,10 @@ public void start() {
}

public void stop() {
if (shouldFlushMetricsOnClose && config.isAnalyticsEnabled()) {
flushQueue();
}

log.debug("Stopping MetricsProcessor");
if (scheduler.isShutdown()) {
return;
Expand All @@ -324,7 +332,13 @@ public void close() {
shutdownExecutorService(
scheduler,
SdkCodes::infoMetricsThreadExited,
errMsg -> log.warn("failed to stop metrics scheduler: {}", errMsg));
errMsg -> {
if (shouldFlushMetricsOnClose) {
log.warn("Waited for flush to finish {}", errMsg);
} else {
log.warn("Failed to stop metrics scheduler: {}", errMsg);
}
});

log.debug("Closing MetricsProcessor");
}
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/io/harness/cf/client/connector/Connector.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,8 @@ public interface Connector {
Service stream(Updater updater) throws ConnectorException;

void close();

boolean getShouldFlushAnalyticsOnClose();

void setIsShuttingDown();
}
12 changes: 9 additions & 3 deletions src/main/java/io/harness/cf/client/connector/EventSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.*;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -37,6 +38,7 @@ public class EventSource implements Callback, AutoCloseable, Service {
private final Map<String, String> headers;
private final long sseReadTimeoutMins;
private final List<X509Certificate> trustedCAs;
private final AtomicBoolean isShuttingDown;

static {
LogUtil.setSystemProps();
Expand All @@ -48,7 +50,7 @@ public EventSource(
@NonNull Updater updater,
long sseReadTimeoutMins)
throws ConnectorException {
this(url, headers, updater, sseReadTimeoutMins, 2_000, null);
this(url, headers, updater, sseReadTimeoutMins, 2_000, null, new AtomicBoolean(false));
}

EventSource(
Expand All @@ -57,14 +59,16 @@ public EventSource(
@NonNull Updater updater,
long sseReadTimeoutMins,
int retryBackoffDelay,
List<X509Certificate> trustedCAs) {
List<X509Certificate> trustedCAs,
AtomicBoolean isShuttingDown) {
this.url = url;
this.headers = headers;
this.updater = updater;
this.sseReadTimeoutMins = sseReadTimeoutMins;
this.retryBackoffDelay = retryBackoffDelay;
this.trustedCAs = trustedCAs;
this.loggingInterceptor = new HttpLoggingInterceptor();
this.isShuttingDown = isShuttingDown;
}

protected OkHttpClient makeStreamClient(long sseReadTimeoutMins, List<X509Certificate> trustedCAs)
Expand All @@ -83,7 +87,8 @@ protected OkHttpClient makeStreamClient(long sseReadTimeoutMins, List<X509Certif
httpClientBuilder.interceptors().remove(loggingInterceptor);
}

httpClientBuilder.addInterceptor(new NewRetryInterceptor(retryBackoffDelay));
httpClientBuilder.addInterceptor(
new NewRetryInterceptor(retryBackoffDelay, true, isShuttingDown));
return httpClientBuilder.build();
}

Expand Down Expand Up @@ -149,6 +154,7 @@ public void stop() {
public void close() {
stop();
if (this.streamClient != null) {
this.streamClient.dispatcher().executorService().shutdown();
this.streamClient.connectionPool().evictAll();
}
log.debug("EventSource closed");
Expand Down
69 changes: 69 additions & 0 deletions src/main/java/io/harness/cf/client/connector/HarnessConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,73 @@ public class HarnessConfig {
* should include intermediate CAs too to allow the HTTP client to build a full trust chain.
*/
@Builder.Default List<X509Certificate> tlsTrustedCAs = null;

/**
* Defines the maximum number of retry attempts for certain types of requests:
* authentication, polling, metrics, and reacting to stream events. If a request fails,
* the SDK will retry up to this number of times before giving up.
* <p>
* - Authentication: Used for retrying authentication requests when the server is unreachable.
* - Polling: Applies to requests that fetch feature flags and target groups periodically.
* - Metrics: Applies to analytics requests for sending metrics data to the server.
* - Reacting to Stream Events: Applies to requests triggered by streamed flag or group changes,
* where the SDK needs to fetch updated flag or group data.
* <p>
* Note: This setting does not apply to streaming requests (either the initial connection or
* reconnecting after a disconnection). Streaming requests will always retry indefinitely
* (infinite retries).
*/
@Builder.Default private long maxRequestRetry = 10;

/**
* Indicates whether to flush analytics data when the SDK is closed.
* <p>
* When set to {@code true}, any remaining analytics data (such as metrics)
* will be sent to the server before the SDK is fully closed. If {@code false},
* the data will not be flushed, and any unsent analytics data may be lost.
* <p>
* The default value is {@code false}.
* <p>
* <b>Note:</b> The flush will attempt to send the data in a single request.
* Any failures during this process will not be retried, and the analytics data
* may be lost.
*
* <p>Example usage:
* <pre>
* {@code
* HarnessConfig harnessConfig = HarnessConfig.builder()
* .flushAnalyticsOnClose(true)
* .build();
* }
* </pre>
*/
@Builder.Default private final boolean flushAnalyticsOnClose = false;

/**
* The timeout for flushing analytics on SDK close.
* <p>
* This option sets the maximum duration, in milliseconds, the SDK will wait for the
* analytics data to be flushed after the SDK has been closed. If the flush process takes longer
* than this timeout, the request will be canceled, and any remaining data will
* not be sent. This ensures that the SDK does not hang indefinitely during shutdown.
* <p>
* The default value is {@code 30000ms} which is the default read timeout for requests made by the SDK
* <p>
* <b>Note:</b> This timeout only applies to the flush process that happens when
* {@code flushAnalyticsOnClose} is set to {@code true}. It does not affect other
* requests made by the SDK during normal operation.
*
* <p>Example usage:
* <pre>
* {@code
*
* HarnessConfig harnessConfig = HarnessConfig.builder()
* .flushAnalyticsOnClose(true)
* // Timeout the analytics flush request in 3000ms (3 seconds)
* .flushAnalyticsOnCloseTimeout(3000).build();
* .build();
* }
* </pre>
*/
@Builder.Default private final int flushAnalyticsOnCloseTimeout = 30000;
}
Loading

0 comments on commit 866809f

Please sign in to comment.