Skip to content

Commit 3c66c83

Browse files
authored
Stabilize RealTimeFrequencySmokeIT; adjust version assert with resource (#1571)
sharing - RealTimeFrequencySmokeIT: capture baseline ad_execute_request_count and use assertBusy to wait for a delta ≥ 2 via _local/stats; aggregate counts across nodes to tolerate parallel ITs on a shared cluster (ran via ./gradlew ':integTestRemote'). Added getLocalAdExecuteRequestCount helper and logging to aid diagnosis. - AnomalyDetectorRestApiIT: when resource sharing is enabled, allow _version ≥ previous+1 since Security’s ResourceSharingIndexHandler triggers an extra update to set all_shared_principals. These changes reduce IT flakiness without altering production behavior. Signed-off-by: kaituo <kaituo@amazon.com>
1 parent 4a34838 commit 3c66c83

File tree

3 files changed

+65
-38
lines changed

3 files changed

+65
-38
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
1212

1313
### Bug Fixes
1414
- Make frequency optional; fix STOPPED state; add ecommerce tests ([#1565](https://github.com/opensearch-project/anomaly-detection/pull/1565))
15+
- Fix flaky ITs ([#1571](https://github.com/opensearch-project/anomaly-detection/pull/1571))
1516

1617

1718
### Infrastructure

src/test/java/org/opensearch/ad/e2e/RealTimeFrequencySmokeIT.java

Lines changed: 50 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@
55

66
package org.opensearch.ad.e2e;
77

8+
import java.io.IOException;
89
import java.time.Instant;
910
import java.time.format.DateTimeParseException;
1011
import java.util.List;
1112
import java.util.Locale;
1213
import java.util.Map;
14+
import java.util.concurrent.TimeUnit;
1315

1416
import org.apache.logging.log4j.LogManager;
1517
import org.apache.logging.log4j.core.Logger;
@@ -51,6 +53,10 @@ public class RealTimeFrequencySmokeIT extends AbstractADSyntheticDataTest {
5153
public void testSimpleRealTimeBatchWithStats() throws Exception {
5254
RestClient client = client();
5355

56+
LOG.info("Capturing baseline AD execute request count...");
57+
int initialExecuteCount = getLocalAdExecuteRequestCount(client);
58+
LOG.info("Initial AD execute request count: {}", initialExecuteCount);
59+
5460
// 1) Generate and ingest synthetic multi-entity data at 1-minute interval.
5561
List<JsonObject> data = genUniformSingleFeatureData(
5662
INTERVAL_MINUTES,
@@ -133,8 +139,37 @@ public void testSimpleRealTimeBatchWithStats() throws Exception {
133139
LOG.info("Results validation passed: {} results (expected range: {}-{})", rtTotal, minExpected, maxExpected);
134140
}
135141

136-
// 7) Call stats API and check ad_execute_request_count
137-
LOG.info("Checking stats API for execution request count...");
142+
// 7) Call stats API and check ad_execute_request_count delta
143+
int expectedExecutions = 2;
144+
LOG.info("Checking stats API for execution request count delta (expecting >= {})...", expectedExecutions);
145+
146+
// uses assertBusy to poll _local/stats until the cluster-level ad_execute_request_count has grown by at least the
147+
// expected two executions—tolerating other ITs running in parallel while still guaranteeing we observe our detector’s increments.
148+
assertBusy(() -> {
149+
int currentExecuteCount = getLocalAdExecuteRequestCount(client);
150+
int executeCountDelta = currentExecuteCount - initialExecuteCount;
151+
LOG.info("Current AD execute request count: {}, delta since baseline: {}", currentExecuteCount, executeCountDelta);
152+
assertTrue(
153+
"Expected at least " + expectedExecutions + " ad_execute_request_count increase, but delta was " + executeCountDelta,
154+
executeCountDelta >= expectedExecutions
155+
);
156+
}, 2, TimeUnit.MINUTES);
157+
}
158+
159+
/**
160+
* Parses a timestamp string into an Instant object.
161+
* Supports epoch milliseconds format.
162+
*
163+
* @param timestampStr the timestamp string to parse
164+
* @return the parsed Instant
165+
* @throws NumberFormatException if the timestamp string is not a parsable long (epoch milliseconds)
166+
*/
167+
private Instant parseMilliseconds(String timestampStr) {
168+
return Instant.ofEpochMilli(Long.parseLong(timestampStr));
169+
}
170+
171+
@SuppressWarnings("unchecked")
172+
private int getLocalAdExecuteRequestCount(RestClient client) throws IOException {
138173
Response statsResponse = TestHelpers
139174
.makeRequest(
140175
client,
@@ -150,45 +185,23 @@ public void testSimpleRealTimeBatchWithStats() throws Exception {
150185
Map<String, Object> statsMap = entityAsMap(statsResponse);
151186
LOG.info("Stats response: {}", statsMap);
152187

153-
// Parse the ad_execute_request_count from the nested structure
154188
Map<String, Object> nodes = (Map<String, Object>) statsMap.get("nodes");
155-
if (nodes != null && !nodes.isEmpty()) {
156-
// Get the first node's stats
157-
Map.Entry<String, Object> firstNode = nodes.entrySet().iterator().next();
158-
@SuppressWarnings("unchecked")
159-
Map<String, Object> nodeStats = (Map<String, Object>) firstNode.getValue();
160-
161-
Number adExecuteRequestCount = (Number) nodeStats.get("ad_execute_request_count");
162-
if (adExecuteRequestCount != null) {
163-
int executeCount = adExecuteRequestCount.intValue();
164-
LOG.info("AD execute request count: {}", executeCount);
165-
166-
// Check that execute count is 2 (twice in 4 minutes)
167-
if (executeCount != 2) {
168-
LOG.error("AD execute request count {} is not 2", executeCount);
169-
fail("Expected ad_execute_request_count to be 2, but got " + executeCount);
170-
} else {
171-
LOG.info("Stats validation passed: ad_execute_request_count is {}", executeCount);
172-
}
173-
} else {
174-
LOG.error("ad_execute_request_count not found in node stats");
175-
fail("ad_execute_request_count not found in stats response");
176-
}
177-
} else {
189+
if (nodes == null || nodes.isEmpty()) {
178190
LOG.error("No nodes found in stats response");
179191
fail("No nodes found in stats response");
180192
}
181-
}
182193

183-
/**
184-
* Parses a timestamp string into an Instant object.
185-
* Supports epoch milliseconds format.
186-
*
187-
* @param timestampStr the timestamp string to parse
188-
* @return the parsed Instant
189-
* @throws DateTimeParseException if the timestamp format is not recognized
190-
*/
191-
private Instant parseMilliseconds(String timestampStr) {
192-
return Instant.ofEpochMilli(Long.parseLong(timestampStr));
194+
int totalCount = 0;
195+
for (Object nodeValue : nodes.values()) {
196+
Map<String, Object> nodeStats = (Map<String, Object>) nodeValue;
197+
Number nodeCount = (Number) nodeStats.get("ad_execute_request_count");
198+
if (nodeCount == null) {
199+
LOG.error("ad_execute_request_count not found in node stats: {}", nodeStats);
200+
fail("ad_execute_request_count not found in stats response");
201+
}
202+
totalCount += nodeCount.intValue();
203+
}
204+
205+
return totalCount;
193206
}
194207
}

src/test/java/org/opensearch/ad/rest/AnomalyDetectorRestApiIT.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -756,7 +756,20 @@ public void testUpdateAnomalyDetector() throws Exception {
756756
assertEquals("Update anomaly detector failed", RestStatus.OK, TestHelpers.restStatus(updateResponse));
757757
Map<String, Object> responseBody = entityAsMap(updateResponse);
758758
assertEquals("Updated anomaly detector id doesn't match", detector.getId(), responseBody.get("_id"));
759-
assertEquals("Version not incremented", (detector.getVersion().intValue() + 1), (int) responseBody.get("_version"));
759+
760+
if (isResourceSharingFeatureEnabled()) {
761+
// The extra write is coming from the Security plugin’s resource‑sharing handler ResourceSharingIndexHandler.
762+
// It happens on every index/update when the resource-sharing feature flag is enabled.
763+
// ResourceSharingIndexHandler updates the detector document in .opendistro-anomaly-detectors to add the
764+
// all_shared_principals field.
765+
// The Security plugin registers a post-index listener (ResourceIndexListener) which triggers an UpdateRequest to set
766+
// `all_shared_principals` after each detector document index/update.
767+
// ResourceIndexListener (postIndex listener): https://tinyurl.com/4mk73vzm
768+
// ResourceSharingIndexHandler (issues UpdateRequest for `all_shared_principals`): https://tinyurl.com/yujaez4d
769+
assertTrue("Version not incremented", (detector.getVersion().intValue() + 1) <= (int) responseBody.get("_version"));
770+
} else {
771+
assertEquals("Version not incremented", (detector.getVersion().intValue() + 1), (int) responseBody.get("_version"));
772+
}
760773

761774
AnomalyDetector updatedDetector = getConfig(detector.getId(), client());
762775
assertNotEquals("Anomaly detector last update time not changed", updatedDetector.getLastUpdateTime(), detector.getLastUpdateTime());

0 commit comments

Comments
 (0)