From 866809fe1d6c35e8e74b1c2df202b7310264560f Mon Sep 17 00:00:00 2001 From: Erdi Rowlands Date: Fri, 4 Oct 2024 15:05:22 +0100 Subject: [PATCH] FFM-12087 New configuration options + close related bug fixes (#203) --- README.md | 4 +- .../io/harness/ff/examples/ConfigExample.java | 3 + settings.gradle | 2 +- .../io/harness/cf/client/api/BaseConfig.java | 30 ++++++++ .../io/harness/cf/client/api/InnerClient.java | 15 +++- .../cf/client/api/MetricsProcessor.java | 28 ++++++-- .../cf/client/connector/Connector.java | 4 ++ .../cf/client/connector/EventSource.java | 12 +++- .../cf/client/connector/HarnessConfig.java | 69 +++++++++++++++++++ .../cf/client/connector/HarnessConnector.java | 56 +++++++++++++-- .../cf/client/connector/LocalConnector.java | 10 +++ .../client/connector/NewRetryInterceptor.java | 59 ++++++++++++---- .../api/MetricsProcessorStressTest.java | 3 +- .../cf/client/api/MetricsProcessorTest.java | 51 ++++++++++++-- .../client/api/testutils/DummyConnector.java | 8 +++ .../cf/client/connector/EventSourceTest.java | 25 ++++--- 16 files changed, 331 insertions(+), 48 deletions(-) diff --git a/README.md b/README.md index 4c6fcc51..8d094969 100644 --- a/README.md +++ b/README.md @@ -78,14 +78,14 @@ Add the following Maven dependency in your project's pom.xml file: io.harness ff-java-server-sdk - 1.7.0 + 1.8.0 ``` #### Gradle ``` -implementation 'io.harness:ff-java-server-sdk:1.7.0' +implementation 'io.harness:ff-java-server-sdk:1.8.0' ``` ### Code Sample diff --git a/examples/src/main/java/io/harness/ff/examples/ConfigExample.java b/examples/src/main/java/io/harness/ff/examples/ConfigExample.java index 9c4d7778..f3632a8f 100644 --- a/examples/src/main/java/io/harness/ff/examples/ConfigExample.java +++ b/examples/src/main/java/io/harness/ff/examples/ConfigExample.java @@ -34,6 +34,9 @@ public static void main(String... args) HarnessConfig.builder() .configUrl("http://localhost:3000/api/1.0") .eventUrl("http://localhost:3000/api/1.0") + .maxRequestRetry(20) + .flushAnalyticsOnClose(true) + .flushAnalyticsOnCloseTimeout(30000) .build()); client = new CfClient(hc); client.waitForInitialization(); diff --git a/settings.gradle b/settings.gradle index 80326964..04777dcf 100644 --- a/settings.gradle +++ b/settings.gradle @@ -4,7 +4,7 @@ dependencyResolutionManagement { versionCatalogs { libs { // main sdk version - version('sdk', '1.7.0'); + version('sdk', '1.8.0'); // sdk deps version('okhttp3', '4.12.0') diff --git a/src/main/java/io/harness/cf/client/api/BaseConfig.java b/src/main/java/io/harness/cf/client/api/BaseConfig.java index 7986a6ef..d64f0c5b 100644 --- a/src/main/java/io/harness/cf/client/api/BaseConfig.java +++ b/src/main/java/io/harness/cf/client/api/BaseConfig.java @@ -14,6 +14,7 @@ @Data public class BaseConfig { public static final int MIN_FREQUENCY = 60; + public static final long DEFAULT_REQUEST_RETRIES = 10; @Builder.Default private final boolean streamEnabled = true; @Builder.Default private final int pollIntervalInSeconds = 60; @@ -53,4 +54,33 @@ public int getFrequency() { @Builder.Default private final Cache cache = new CaffeineCache(10000); private final Storage store; + + /** + * Defines the maximum number of retry attempts for certain types of requests: + * authentication, polling, metrics, and reacting to stream events. If a request fails, + * the SDK will retry up to this number of times before giving up. + *

+ * - Authentication: Used for retrying authentication requests when the server is unreachable. + * - Polling: Applies to requests that fetch feature flags and target groups periodically. + * - Metrics: Applies to analytics requests for sending metrics data to the server. + * - Reacting to Stream Events: Applies to requests triggered by streamed flag or group changes, + * where the SDK needs to fetch updated flag or group data. + *

+ *

+ * The default value is {@code 10}. + *

+ * Note: This setting does not apply to streaming requests (either the initial connection or + * reconnecting after a disconnection). Streaming requests will always retry indefinitely + * (infinite retries). + *

+ * Example usage: + *

+   * {@code
+   * BaseConfig config = BaseConfig.builder()
+   *     .maxRequestRetry(20)
+   *     .build();
+   * }
+   * 
+ */ + @Builder.Default private final long maxRequestRetry = DEFAULT_REQUEST_RETRIES; } diff --git a/src/main/java/io/harness/cf/client/api/InnerClient.java b/src/main/java/io/harness/cf/client/api/InnerClient.java index bec1ed88..7ef4090d 100644 --- a/src/main/java/io/harness/cf/client/api/InnerClient.java +++ b/src/main/java/io/harness/cf/client/api/InnerClient.java @@ -87,7 +87,6 @@ protected void setUp(@NonNull final Connector connector, @NonNull final BaseConf log.info("Starting SDK client with configuration: {}", this.options); this.connector = connector; this.connector.setOnUnauthorized(this::onUnauthorized); - // initialization repository = new StorageRepository( @@ -96,7 +95,9 @@ protected void setUp(@NonNull final Connector connector, @NonNull final BaseConf authService = new AuthService(this.connector, options.getPollIntervalInSeconds(), this); pollProcessor = new PollingProcessor(this.connector, repository, options.getPollIntervalInSeconds(), this); - metricsProcessor = new MetricsProcessor(this.connector, this.options, this); + metricsProcessor = + new MetricsProcessor( + this.connector, this.options, this, connector.getShouldFlushAnalyticsOnClose()); updateProcessor = new UpdateProcessor(this.connector, this.repository, this); // start with authentication @@ -228,7 +229,9 @@ public void onDisconnected(String reason) { closing, options.getPollIntervalInSeconds()); log.debug("SSE disconnect detected - asking poller to refresh flags"); - pollProcessor.retrieveAll(); + if (!closing) { + pollProcessor.retrieveAll(); + } } } @@ -388,6 +391,12 @@ public void processEvaluation( public void close() { log.info("Closing the client"); closing = true; + + // Mark the connector as shutting down to stop request retries from taking place. The + // connections will eventually + // be evicted when the connector is closed, but this ensures that if metrics are flushed when + // closed then it won't attempt to retry if the first request fails. + connector.setIsShuttingDown(); off(); authService.close(); repository.close(); diff --git a/src/main/java/io/harness/cf/client/api/MetricsProcessor.java b/src/main/java/io/harness/cf/client/api/MetricsProcessor.java index e1e27590..2958dbb6 100644 --- a/src/main/java/io/harness/cf/client/api/MetricsProcessor.java +++ b/src/main/java/io/harness/cf/client/api/MetricsProcessor.java @@ -4,6 +4,7 @@ import static io.harness.cf.client.common.Utils.shutdownExecutorService; import static java.util.concurrent.TimeUnit.SECONDS; +import io.harness.cf.Version; import io.harness.cf.client.common.SdkCodes; import io.harness.cf.client.common.StringUtils; import io.harness.cf.client.connector.Connector; @@ -15,10 +16,7 @@ import io.harness.cf.model.TargetData; import io.harness.cf.model.Variation; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.*; import java.util.concurrent.atomic.LongAdder; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; @@ -113,13 +111,19 @@ public boolean containsKey(K key) { private final LongAdder metricsSent = new LongAdder(); private final int maxFreqMapSize; + private final boolean shouldFlushMetricsOnClose; + public MetricsProcessor( - @NonNull Connector connector, @NonNull BaseConfig config, @NonNull MetricsCallback callback) { + @NonNull Connector connector, + @NonNull BaseConfig config, + @NonNull MetricsCallback callback, + boolean shouldFlushMetricsOnClose) { this.connector = connector; this.config = config; this.frequencyMap = new FrequencyMap<>(); this.targetsSeen = ConcurrentHashMap.newKeySet(); this.maxFreqMapSize = clamp(config.getBufferSize(), 2048, MAX_FREQ_MAP_TO_RETAIN); + this.shouldFlushMetricsOnClose = shouldFlushMetricsOnClose; callback.onMetricsReady(); } @@ -218,7 +222,7 @@ protected Metrics prepareSummaryMetricsBody(Map data, Set log.warn("failed to stop metrics scheduler: {}", errMsg)); + errMsg -> { + if (shouldFlushMetricsOnClose) { + log.warn("Waited for flush to finish {}", errMsg); + } else { + log.warn("Failed to stop metrics scheduler: {}", errMsg); + } + }); log.debug("Closing MetricsProcessor"); } diff --git a/src/main/java/io/harness/cf/client/connector/Connector.java b/src/main/java/io/harness/cf/client/connector/Connector.java index 67444f6f..a2529aac 100644 --- a/src/main/java/io/harness/cf/client/connector/Connector.java +++ b/src/main/java/io/harness/cf/client/connector/Connector.java @@ -29,4 +29,8 @@ public interface Connector { Service stream(Updater updater) throws ConnectorException; void close(); + + boolean getShouldFlushAnalyticsOnClose(); + + void setIsShuttingDown(); } diff --git a/src/main/java/io/harness/cf/client/connector/EventSource.java b/src/main/java/io/harness/cf/client/connector/EventSource.java index 2520fdc9..4d985991 100644 --- a/src/main/java/io/harness/cf/client/connector/EventSource.java +++ b/src/main/java/io/harness/cf/client/connector/EventSource.java @@ -15,6 +15,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import javax.net.ssl.*; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; @@ -37,6 +38,7 @@ public class EventSource implements Callback, AutoCloseable, Service { private final Map headers; private final long sseReadTimeoutMins; private final List trustedCAs; + private final AtomicBoolean isShuttingDown; static { LogUtil.setSystemProps(); @@ -48,7 +50,7 @@ public EventSource( @NonNull Updater updater, long sseReadTimeoutMins) throws ConnectorException { - this(url, headers, updater, sseReadTimeoutMins, 2_000, null); + this(url, headers, updater, sseReadTimeoutMins, 2_000, null, new AtomicBoolean(false)); } EventSource( @@ -57,7 +59,8 @@ public EventSource( @NonNull Updater updater, long sseReadTimeoutMins, int retryBackoffDelay, - List trustedCAs) { + List trustedCAs, + AtomicBoolean isShuttingDown) { this.url = url; this.headers = headers; this.updater = updater; @@ -65,6 +68,7 @@ public EventSource( this.retryBackoffDelay = retryBackoffDelay; this.trustedCAs = trustedCAs; this.loggingInterceptor = new HttpLoggingInterceptor(); + this.isShuttingDown = isShuttingDown; } protected OkHttpClient makeStreamClient(long sseReadTimeoutMins, List trustedCAs) @@ -83,7 +87,8 @@ protected OkHttpClient makeStreamClient(long sseReadTimeoutMins, List tlsTrustedCAs = null; + + /** + * Defines the maximum number of retry attempts for certain types of requests: + * authentication, polling, metrics, and reacting to stream events. If a request fails, + * the SDK will retry up to this number of times before giving up. + *

+ * - Authentication: Used for retrying authentication requests when the server is unreachable. + * - Polling: Applies to requests that fetch feature flags and target groups periodically. + * - Metrics: Applies to analytics requests for sending metrics data to the server. + * - Reacting to Stream Events: Applies to requests triggered by streamed flag or group changes, + * where the SDK needs to fetch updated flag or group data. + *

+ * Note: This setting does not apply to streaming requests (either the initial connection or + * reconnecting after a disconnection). Streaming requests will always retry indefinitely + * (infinite retries). + */ + @Builder.Default private long maxRequestRetry = 10; + + /** + * Indicates whether to flush analytics data when the SDK is closed. + *

+ * When set to {@code true}, any remaining analytics data (such as metrics) + * will be sent to the server before the SDK is fully closed. If {@code false}, + * the data will not be flushed, and any unsent analytics data may be lost. + *

+ * The default value is {@code false}. + *

+ * Note: The flush will attempt to send the data in a single request. + * Any failures during this process will not be retried, and the analytics data + * may be lost. + * + *

Example usage: + *

+   * {@code
+   * HarnessConfig harnessConfig = HarnessConfig.builder()
+   *     .flushAnalyticsOnClose(true)
+   *     .build();
+   * }
+   * 
+ */ + @Builder.Default private final boolean flushAnalyticsOnClose = false; + + /** + * The timeout for flushing analytics on SDK close. + *

+ * This option sets the maximum duration, in milliseconds, the SDK will wait for the + * analytics data to be flushed after the SDK has been closed. If the flush process takes longer + * than this timeout, the request will be canceled, and any remaining data will + * not be sent. This ensures that the SDK does not hang indefinitely during shutdown. + *

+ * The default value is {@code 30000ms} which is the default read timeout for requests made by the SDK + *

+ * Note: This timeout only applies to the flush process that happens when + * {@code flushAnalyticsOnClose} is set to {@code true}. It does not affect other + * requests made by the SDK during normal operation. + * + *

Example usage: + *

+   * {@code
+   *
+   * HarnessConfig harnessConfig = HarnessConfig.builder()
+   *     .flushAnalyticsOnClose(true)
+   *      // Timeout the analytics flush request in 3000ms (3 seconds)
+   *     .flushAnalyticsOnCloseTimeout(3000).build();
+   *     .build();
+   * }
+   * 
+ */ + @Builder.Default private final int flushAnalyticsOnCloseTimeout = 30000; } diff --git a/src/main/java/io/harness/cf/client/connector/HarnessConnector.java b/src/main/java/io/harness/cf/client/connector/HarnessConnector.java index 2486b837..e869b66f 100644 --- a/src/main/java/io/harness/cf/client/connector/HarnessConnector.java +++ b/src/main/java/io/harness/cf/client/connector/HarnessConnector.java @@ -15,6 +15,8 @@ import java.security.cert.X509Certificate; import java.util.*; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import lombok.NonNull; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -33,6 +35,8 @@ public class HarnessConnector implements Connector, AutoCloseable { private final String apiKey; private final HarnessConfig options; + private final AtomicBoolean isShuttingDown = new AtomicBoolean(false); + private String token; private String environmentUuid; private String cluster; @@ -89,13 +93,19 @@ ApiClient makeApiClient(int retryBackOfDelay) { .getHttpClient() .newBuilder() .addInterceptor(this::reauthInterceptor) - .addInterceptor(new NewRetryInterceptor(3, retryBackOfDelay)) + .addInterceptor( + new NewRetryInterceptor( + options.getMaxRequestRetry(), retryBackOfDelay, isShuttingDown)) .build()); return apiClient; } private Response reauthInterceptor(Interceptor.Chain chain) throws IOException { + if (isShuttingDown.get()) { + return null; + } + final Request request = chain.request().newBuilder().addHeader("X-Request-ID", getRequestID()).build(); log.debug("Checking for 403 in interceptor: requesting url {}", request.url().url()); @@ -127,18 +137,39 @@ ApiClient makeMetricsApiClient(int retryBackoffDelay) { .getHttpClient() .newBuilder() .addInterceptor(this::metricsInterceptor) - .addInterceptor(new NewRetryInterceptor(3, retryBackoffDelay)) + .addInterceptor( + new NewRetryInterceptor( + options.getMaxRequestRetry(), retryBackoffDelay, isShuttingDown)) .build()); return apiClient; } private Response metricsInterceptor(Interceptor.Chain chain) throws IOException { - final Request request = - chain.request().newBuilder().addHeader("X-Request-ID", getRequestID()).build(); - log.debug("metrics interceptor: requesting url {}", request.url().url()); - return chain.proceed(request); + Request originalRequest = chain.request(); + + // If this is flush when the SDK has been closed, then apply a per request timeout instead + // of the okhttp client timeout + if (isShuttingDown.get()) { + log.debug("SDK is shutting down, applying custom call timeout for flush request"); + + Request shutdownRequest = + originalRequest.newBuilder().addHeader("X-Request-ID", getRequestID()).build(); + + // Apply custom timeouts (e.g., 5 seconds for each timeout type) + return chain + .withConnectTimeout(options.getFlushAnalyticsOnCloseTimeout(), TimeUnit.MILLISECONDS) + .withReadTimeout(options.getFlushAnalyticsOnCloseTimeout(), TimeUnit.MILLISECONDS) + .withWriteTimeout(options.getFlushAnalyticsOnCloseTimeout(), TimeUnit.MILLISECONDS) + .proceed(shutdownRequest); + } else { + final Request request = + originalRequest.newBuilder().addHeader("X-Request-ID", getRequestID()).build(); + log.debug("metrics interceptor: requesting url {}", request.url().url()); + + return chain.proceed(request); + } } protected String getRequestID() { @@ -405,13 +436,15 @@ public Service stream(@NonNull final Updater updater) throws ConnectorException updater, Math.max(options.getSseReadTimeout(), 1), ThreadLocalRandom.current().nextInt(5000, 10000), - options.getTlsTrustedCAs()); + options.getTlsTrustedCAs(), + isShuttingDown); return eventSource; } @Override public void close() { log.debug("closing connector"); + isShuttingDown.set(true); api.getApiClient().getHttpClient().connectionPool().evictAll(); log.debug("All apiClient connections evicted"); metricsApi.getApiClient().getHttpClient().connectionPool().evictAll(); @@ -437,6 +470,15 @@ private void setupTls(ApiClient apiClient) { } } + public void setIsShuttingDown() { + this.isShuttingDown.set(true); + } + + @Override + public boolean getShouldFlushAnalyticsOnClose() { + return options.isFlushAnalyticsOnClose(); + } + private static boolean isNullOrEmpty(String string) { return string == null || string.trim().isEmpty(); } diff --git a/src/main/java/io/harness/cf/client/connector/LocalConnector.java b/src/main/java/io/harness/cf/client/connector/LocalConnector.java index 6dd0aa2d..5ce77297 100644 --- a/src/main/java/io/harness/cf/client/connector/LocalConnector.java +++ b/src/main/java/io/harness/cf/client/connector/LocalConnector.java @@ -195,6 +195,16 @@ public void close() { log.debug("LocalConnector closed"); } + @Override + public boolean getShouldFlushAnalyticsOnClose() { + return false; + } + + @Override + public void setIsShuttingDown() { + // No need for local connector as no retries used + } + private class FileWatcherService implements Service, AutoCloseable { private final FileWatcher flagWatcher; private final FileWatcher segmentWatcher; diff --git a/src/main/java/io/harness/cf/client/connector/NewRetryInterceptor.java b/src/main/java/io/harness/cf/client/connector/NewRetryInterceptor.java index 0bafbb54..7df56229 100644 --- a/src/main/java/io/harness/cf/client/connector/NewRetryInterceptor.java +++ b/src/main/java/io/harness/cf/client/connector/NewRetryInterceptor.java @@ -1,5 +1,7 @@ package io.harness.cf.client.connector; +import static io.harness.cf.client.api.BaseConfig.DEFAULT_REQUEST_RETRIES; + import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; @@ -8,6 +10,7 @@ import java.util.Date; import java.util.Locale; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import okhttp3.*; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; @@ -19,16 +22,33 @@ public class NewRetryInterceptor implements Interceptor { private static final SimpleDateFormat imfDateFormat = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss zzz", Locale.US); private final long retryBackoffDelay; - private final long maxTryCount; + private final boolean retryForever; + + private final AtomicBoolean isShuttingDown; + + // Use SDK default is not specified + private long maxTryCount = DEFAULT_REQUEST_RETRIES; - public NewRetryInterceptor(long retryBackoffDelay) { + public NewRetryInterceptor(long retryBackoffDelay, AtomicBoolean isShuttingDown) { this.retryBackoffDelay = retryBackoffDelay; - this.maxTryCount = 5; + this.retryForever = false; + this.isShuttingDown = isShuttingDown; } - public NewRetryInterceptor(long maxTryCount, long retryBackoffDelay) { + public NewRetryInterceptor( + long maxTryCount, long retryBackoffDelay, AtomicBoolean isShuttingDown) { this.retryBackoffDelay = retryBackoffDelay; this.maxTryCount = maxTryCount; + this.retryForever = false; + this.isShuttingDown = isShuttingDown; + } + + // New constructor with retryForever flag + public NewRetryInterceptor( + long retryBackoffDelay, boolean retryForever, AtomicBoolean isShuttingDown) { + this.retryBackoffDelay = retryBackoffDelay; + this.retryForever = retryForever; + this.isShuttingDown = isShuttingDown; } @NotNull @@ -36,7 +56,7 @@ public NewRetryInterceptor(long maxTryCount, long retryBackoffDelay) { public Response intercept(@NotNull Chain chain) throws IOException { int tryCount = 1; boolean successful; - boolean limitReached = false; + boolean limitReached; Response response = null; String msg = ""; do { @@ -66,29 +86,44 @@ public Response intercept(@NotNull Chain chain) throws IOException { return response; } } + if (!successful) { int retryAfterHeaderValue = getRetryAfterHeaderInSeconds(response); long backOffDelayMs; + if (retryAfterHeaderValue > 0) { // Use Retry-After header if detected first log.trace("Retry-After header detected: {} seconds", retryAfterHeaderValue); backOffDelayMs = retryAfterHeaderValue * 1000L; } else { - // Else fallback to a randomized exponential backoff - backOffDelayMs = retryBackoffDelay * tryCount; + // Else fallback to a randomized exponential backoff with a max delay of 1 minute + // (60,000ms) + backOffDelayMs = Math.min(retryBackoffDelay * tryCount, 60000L); + } + + String retryLimitDisplay = retryForever ? "∞" : String.valueOf(maxTryCount); + limitReached = !retryForever && tryCount >= maxTryCount; + + if (isShuttingDown.get()) { + log.warn( + "Request attempt {} to {} was not successful, [{}], SDK is shutting down, no retries will be attempted", + tryCount, + chain.request().url(), + msg); + return response; // Exit without further retries } - limitReached = tryCount >= maxTryCount; log.warn( - "Request attempt {} to {} was not successful, [{}]{}", + "Request attempt {} of {} to {} was not successful, [{}]{}", tryCount, + retryLimitDisplay, chain.request().url(), msg, limitReached - ? ", retry limited reached" + ? ", retry limit reached" : String.format( Locale.getDefault(), - ", retrying in %dms (retry-after hdr: %b)", + ", retrying in %dms (retry-after hdr: %b)", backOffDelayMs, retryAfterHeaderValue > 0)); @@ -97,7 +132,7 @@ public Response intercept(@NotNull Chain chain) throws IOException { } } tryCount++; - } while (!successful && !limitReached); + } while (!successful && (retryForever || tryCount <= maxTryCount) && !isShuttingDown.get()); return response; } diff --git a/src/test/java/io/harness/cf/client/api/MetricsProcessorStressTest.java b/src/test/java/io/harness/cf/client/api/MetricsProcessorStressTest.java index abb1676d..e2b33e94 100644 --- a/src/test/java/io/harness/cf/client/api/MetricsProcessorStressTest.java +++ b/src/test/java/io/harness/cf/client/api/MetricsProcessorStressTest.java @@ -50,7 +50,8 @@ void testRegisterEvaluationContention() throws Exception { BaseConfig.builder() // .globalTargetEnabled(false) .build(), - new DummyMetricsCallback()); + new DummyMetricsCallback(), + false); metricsProcessor.start(); diff --git a/src/test/java/io/harness/cf/client/api/MetricsProcessorTest.java b/src/test/java/io/harness/cf/client/api/MetricsProcessorTest.java index 5e6b3efd..cda8b26a 100644 --- a/src/test/java/io/harness/cf/client/api/MetricsProcessorTest.java +++ b/src/test/java/io/harness/cf/client/api/MetricsProcessorTest.java @@ -1,8 +1,7 @@ package io.harness.cf.client.api; import static org.junit.jupiter.api.Assertions.*; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import io.harness.cf.client.connector.Connector; import io.harness.cf.client.connector.ConnectorException; @@ -34,11 +33,55 @@ public void setup() { MockitoAnnotations.openMocks(this); metricsProcessor = Mockito.spy( - new MetricsProcessor(connector, BaseConfig.builder().bufferSize(10_001).build(), this)); + new MetricsProcessor( + connector, BaseConfig.builder().bufferSize(10_001).build(), this, false)); metricsProcessor.reset(); } + @Test + public void testFlushAnalyticsOnCloseDisabled() throws ConnectorException, InterruptedException { + // Arrange + Metrics mockMetrics = mock(Metrics.class); + doNothing().when(connector).postMetrics(mockMetrics); + + // Act: Push some metrics data and call flush + Target target = Target.builder().identifier("target-1").build(); + Variation variation = Variation.builder().identifier("true").value("true").build(); + metricsProcessor.pushToQueue(target, "feature-1", variation); + + // Mimic shutdown behavior + metricsProcessor.close(); + + // Assert: Verify that postMetrics not called during shutdown + verify(connector, times(0)).postMetrics(any(Metrics.class)); + verifyNoMoreInteractions(connector); + } + + @Test + public void testFlushAnalyticsOnCloseEnabled() throws ConnectorException, InterruptedException { + // Arrange + metricsProcessor = + Mockito.spy( + new MetricsProcessor( + connector, BaseConfig.builder().bufferSize(10_001).build(), this, true)); + + Metrics mockMetrics = mock(Metrics.class); + doNothing().when(connector).postMetrics(mockMetrics); + + // Act: Push some metrics data and call flush + Target target = Target.builder().identifier("target-1").build(); + Variation variation = Variation.builder().identifier("true").value("true").build(); + metricsProcessor.pushToQueue(target, "feature-1", variation); + + // Mimic shutdown behavior + metricsProcessor.close(); + + // Assert: Verify that postMetrics is called during shutdown + verify(connector, times(1)).postMetrics(any(Metrics.class)); + metricsProcessor.reset(); + } + @Test public void testPushToQueue() throws InterruptedException { ExecutorService WORKER_THREAD_POOL = Executors.newFixedThreadPool(BUFFER_SIZE); @@ -183,7 +226,7 @@ void shouldPostCorrectMetrics_WhenGlobalTargetEnabledOrDisabled(boolean globalTa doNothing().when(mockConnector).postMetrics(metricsArgumentCaptor.capture()); final MetricsProcessor processor = - new MetricsProcessor(mockConnector, mockConfig, Mockito.mock(MetricsCallback.class)); + new MetricsProcessor(mockConnector, mockConfig, Mockito.mock(MetricsCallback.class), false); final Target target = Target.builder().identifier("target123").build(); final Variation variation = Variation.builder().identifier("true").value("true").build(); diff --git a/src/test/java/io/harness/cf/client/api/testutils/DummyConnector.java b/src/test/java/io/harness/cf/client/api/testutils/DummyConnector.java index 69101a7e..df8086e7 100644 --- a/src/test/java/io/harness/cf/client/api/testutils/DummyConnector.java +++ b/src/test/java/io/harness/cf/client/api/testutils/DummyConnector.java @@ -79,4 +79,12 @@ public int getTotalMetricEvaluations() { @Override public void close() {} + + @Override + public boolean getShouldFlushAnalyticsOnClose() { + return false; + } + + @Override + public void setIsShuttingDown() {} } diff --git a/src/test/java/io/harness/cf/client/connector/EventSourceTest.java b/src/test/java/io/harness/cf/client/connector/EventSourceTest.java index 9cf33132..8d0e5bd1 100644 --- a/src/test/java/io/harness/cf/client/connector/EventSourceTest.java +++ b/src/test/java/io/harness/cf/client/connector/EventSourceTest.java @@ -9,6 +9,7 @@ import java.util.HashMap; import java.util.Objects; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -27,8 +28,11 @@ static class StreamDispatcher extends Dispatcher { protected MockResponse makeStreamResponse() { int reqNo = request.getAndIncrement(); - if (reqNo <= 3) { - // Force a disconnect on the first few attempts + + if (reqNo <= 12) { + // Force a disconnect after the default SDK request retry limit of 10, which does not apply + // to stream requests which have + // no limit on retryable errors out.printf("ReqNo %d will be disconnected on purpose\n", reqNo); return new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AFTER_REQUEST); } else { @@ -61,7 +65,10 @@ protected MockResponse makeStreamResponse() { int reqNo = request.getAndIncrement(); // Force a disconnect on all requests out.printf("ReqNo %d will be disconnected on purpose\n", reqNo); - return new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AFTER_REQUEST); + // Set a 400 response so that the stream does not retry. This is because since 1.8.0 the + // stream + // retries forever on retryable errors. + return new MockResponse().setResponseCode(400).setBody("{\"status\":\"failed\"}"); } } @@ -78,7 +85,8 @@ void shouldNotCallErrorHandlerIfRetryEventuallyReconnectsToStreamEndpoint() updater, 1, 1, - null)) { + null, + new AtomicBoolean(false))) { eventSource.start(); TimeUnit.SECONDS.sleep(15); @@ -104,14 +112,15 @@ void shouldRestartPollerIfAllConnectionAttemptsToStreamEndpointFail() updater, 1, 1, - null)) { + null, + new AtomicBoolean(false))) { eventSource.start(); - TimeUnit.SECONDS.sleep(15); + TimeUnit.SECONDS.sleep(3); } - // for this test, connection to the /stream endpoint will never succeed. - // we expect the disconnect handler to be called, connect handler should not be called + // for this test, connection to the /stream endpoint will never because of an un-retryable + // error. We expect the disconnect handler to be called, connect handler should not be called assertEquals(0, updater.getConnectCount().get()); assertEquals(0, updater.getFailureCount().get());