1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -12,6 +12,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)

### Bug Fixes
- Make frequency optional; fix STOPPED state; add ecommerce tests ([#1565](https://github.com/opensearch-project/anomaly-detection/pull/1565))
- Fix flaky ITs ([#1571](https://github.com/opensearch-project/anomaly-detection/pull/1571))


### Infrastructure
87 changes: 50 additions & 37 deletions src/test/java/org/opensearch/ad/e2e/RealTimeFrequencySmokeIT.java
@@ -5,11 +5,13 @@

package org.opensearch.ad.e2e;

import java.io.IOException;
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.Logger;
@@ -51,6 +53,10 @@ public class RealTimeFrequencySmokeIT extends AbstractADSyntheticDataTest {
public void testSimpleRealTimeBatchWithStats() throws Exception {
RestClient client = client();

LOG.info("Capturing baseline AD execute request count...");
int initialExecuteCount = getLocalAdExecuteRequestCount(client);
LOG.info("Initial AD execute request count: {}", initialExecuteCount);

// 1) Generate and ingest synthetic multi-entity data at 1-minute interval.
List<JsonObject> data = genUniformSingleFeatureData(
INTERVAL_MINUTES,
@@ -133,8 +139,37 @@ public void testSimpleRealTimeBatchWithStats() throws Exception {
LOG.info("Results validation passed: {} results (expected range: {}-{})", rtTotal, minExpected, maxExpected);
}

// 7) Call stats API and check ad_execute_request_count
LOG.info("Checking stats API for execution request count...");
// 7) Call stats API and check ad_execute_request_count delta
int expectedExecutions = 2;
LOG.info("Checking stats API for execution request count delta (expecting >= {})...", expectedExecutions);

// Use assertBusy to poll _local/stats until the cluster-level ad_execute_request_count has grown by at least the
// expected two executions, tolerating other ITs running in parallel while still guaranteeing we observe our detector's increments.
assertBusy(() -> {
int currentExecuteCount = getLocalAdExecuteRequestCount(client);
int executeCountDelta = currentExecuteCount - initialExecuteCount;
LOG.info("Current AD execute request count: {}, delta since baseline: {}", currentExecuteCount, executeCountDelta);
assertTrue(
"Expected at least " + expectedExecutions + " ad_execute_request_count increase, but delta was " + executeCountDelta,
executeCountDelta >= expectedExecutions
);
}, 2, TimeUnit.MINUTES);
}

/**
* Parses a timestamp string into an Instant object.
* Supports epoch milliseconds format.
*
* @param timestampStr the timestamp string to parse
* @return the parsed Instant
* @throws NumberFormatException if the timestamp string is not a parseable epoch-millisecond value
*/
private Instant parseMilliseconds(String timestampStr) {
return Instant.ofEpochMilli(Long.parseLong(timestampStr));
}

@SuppressWarnings("unchecked")
private int getLocalAdExecuteRequestCount(RestClient client) throws IOException {
Response statsResponse = TestHelpers
.makeRequest(
client,
@@ -150,45 +185,23 @@ public void testSimpleRealTimeBatchWithStats() throws Exception {
Map<String, Object> statsMap = entityAsMap(statsResponse);
LOG.info("Stats response: {}", statsMap);

// Parse the ad_execute_request_count from the nested structure
Map<String, Object> nodes = (Map<String, Object>) statsMap.get("nodes");
if (nodes != null && !nodes.isEmpty()) {
// Get the first node's stats
Map.Entry<String, Object> firstNode = nodes.entrySet().iterator().next();
@SuppressWarnings("unchecked")
Map<String, Object> nodeStats = (Map<String, Object>) firstNode.getValue();

Number adExecuteRequestCount = (Number) nodeStats.get("ad_execute_request_count");
if (adExecuteRequestCount != null) {
int executeCount = adExecuteRequestCount.intValue();
LOG.info("AD execute request count: {}", executeCount);

// Check that execute count is 2 (twice in 4 minutes)
if (executeCount != 2) {
LOG.error("AD execute request count {} is not 2", executeCount);
fail("Expected ad_execute_request_count to be 2, but got " + executeCount);
} else {
LOG.info("Stats validation passed: ad_execute_request_count is {}", executeCount);
}
} else {
LOG.error("ad_execute_request_count not found in node stats");
fail("ad_execute_request_count not found in stats response");
}
} else {
if (nodes == null || nodes.isEmpty()) {
LOG.error("No nodes found in stats response");
fail("No nodes found in stats response");
}
}

/**
* Parses a timestamp string into an Instant object.
* Supports epoch milliseconds format.
*
* @param timestampStr the timestamp string to parse
* @return the parsed Instant
* @throws NumberFormatException if the timestamp string is not a parseable epoch-millisecond value
*/
private Instant parseMilliseconds(String timestampStr) {
return Instant.ofEpochMilli(Long.parseLong(timestampStr));
int totalCount = 0;
for (Object nodeValue : nodes.values()) {
Map<String, Object> nodeStats = (Map<String, Object>) nodeValue;
Number nodeCount = (Number) nodeStats.get("ad_execute_request_count");
if (nodeCount == null) {
LOG.error("ad_execute_request_count not found in node stats: {}", nodeStats);
fail("ad_execute_request_count not found in stats response");
}
totalCount += nodeCount.intValue();
}

return totalCount;
}
}
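Editorial note: for readers unfamiliar with the `_local/stats` payload, the helper above assumes a `nodes` -> node-id -> `ad_execute_request_count` nesting and sums the counter across nodes. The standalone sketch below mirrors that assumed shape and the same summation; the node ids and counts are invented purely for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal sketch of the nested stats shape the helper assumes:
// { "nodes": { "<node-id>": { "ad_execute_request_count": <n>, ... }, ... } }
// Node ids and counts below are made up for illustration.
public class StatsShapeSketch {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        Map<String, Object> nodeA = new LinkedHashMap<>();
        nodeA.put("ad_execute_request_count", 1);
        Map<String, Object> nodeB = new LinkedHashMap<>();
        nodeB.put("ad_execute_request_count", 1);

        Map<String, Object> nodes = new LinkedHashMap<>();
        nodes.put("node-a", nodeA);
        nodes.put("node-b", nodeB);

        Map<String, Object> statsMap = new LinkedHashMap<>();
        statsMap.put("nodes", nodes);

        // Same summation as getLocalAdExecuteRequestCount: add up the per-node
        // counters so the total is stable no matter which node served each request.
        int total = 0;
        for (Object nodeValue : ((Map<String, Object>) statsMap.get("nodes")).values()) {
            Number count = (Number) ((Map<String, Object>) nodeValue).get("ad_execute_request_count");
            total += count.intValue();
        }
        System.out.println("cluster-wide ad_execute_request_count = " + total); // prints 2
    }
}
```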
@@ -756,7 +756,20 @@ public void testUpdateAnomalyDetector() throws Exception {
assertEquals("Update anomaly detector failed", RestStatus.OK, TestHelpers.restStatus(updateResponse));
Map<String, Object> responseBody = entityAsMap(updateResponse);
assertEquals("Updated anomaly detector id doesn't match", detector.getId(), responseBody.get("_id"));
assertEquals("Version not incremented", (detector.getVersion().intValue() + 1), (int) responseBody.get("_version"));

if (isResourceSharingFeatureEnabled()) {
// The extra write comes from the Security plugin's resource-sharing handler, ResourceSharingIndexHandler.
// It happens on every index/update when the resource-sharing feature flag is enabled:
// the Security plugin registers a post-index listener (ResourceIndexListener) that triggers an UpdateRequest to set
// `all_shared_principals` on the detector document in .opendistro-anomaly-detectors after each index/update.
// ResourceIndexListener (postIndex listener): https://tinyurl.com/4mk73vzm
// ResourceSharingIndexHandler (issues UpdateRequest for `all_shared_principals`): https://tinyurl.com/yujaez4d
assertTrue("Version not incremented", (detector.getVersion().intValue() + 1) <= (int) responseBody.get("_version"));
} else {
assertEquals("Version not incremented", (detector.getVersion().intValue() + 1), (int) responseBody.get("_version"));
}
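Editorial note: conceptually, the extra write described in the comment above is a partial update that adds `all_shared_principals` to the detector document, which bumps `_version` once more. The sketch below expresses that idea with the low-level REST client these ITs already use; the endpoint path, request body, and principal value are assumptions for illustration, not the Security plugin's actual request (see the linked sources for the real logic).

```java
import java.io.IOException;

import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.client.RestClient;

// Illustrative only: roughly the kind of follow-up partial update that explains
// the extra version bump when resource sharing is enabled. Endpoint, body, and
// principal value are assumed for the sketch.
public class ResourceSharingWriteSketch {
    Response addSharedPrincipals(RestClient client, String detectorId) throws IOException {
        Request update = new Request("POST", "/.opendistro-anomaly-detectors/_update/" + detectorId);
        update.setJsonEntity("{\"doc\":{\"all_shared_principals\":[\"user:admin\"]}}");
        // Each such update increments the document's _version, which is why the
        // test accepts any version >= original + 1 when the feature flag is enabled.
        return client.performRequest(update);
    }
}
```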

AnomalyDetector updatedDetector = getConfig(detector.getId(), client());
assertNotEquals("Anomaly detector last update time not changed", updatedDetector.getLastUpdateTime(), detector.getLastUpdateTime());