Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)

### Bug Fixes
- Fixing concurrency bug on writer ([#1508](https://github.com/opensearch-project/anomaly-detection/pull/1508))
- fix(forecast): advance past current interval & anchor on now ([#1528](https://github.com/opensearch-project/anomaly-detection/pull/1528))

### Infrastructure
### Documentation
Expand Down
1 change: 0 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -1003,7 +1003,6 @@ List<String> jacocoExclusions = [

// TODO: add test coverage (kaituo)
'org.opensearch.forecast.*',
'org.opensearch.timeseries.transport.ValidateConfigRequest',
'org.opensearch.timeseries.transport.ResultProcessor.PageListener.1',
'org.opensearch.ad.transport.ADHCImputeRequest',
'org.opensearch.timeseries.transport.BaseDeleteConfigTransportAction.1',
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/org/opensearch/timeseries/AnalysisType.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@

public enum AnalysisType {
AD,
FORECAST;
FORECAST,
// for test
UNKNOWN;

public boolean isForecast() {
return this == FORECAST;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,41 @@ public AggregationPrep(SearchFeatureDao searchFeatureDao, TimeValue requestTimeo
this.config = config;
}

/**
* Returns the time‑range bounds using this detector’s **default history length**
* (i.e., the value provided by {@link #getNumberOfSamples()}).
*
* <p>The method delegates to
* {@link #getTimeRangeBounds(IntervalTimeConfiguration, long, int)} and is a
* convenience overload for callers that do not need to specify a custom
* sample count.</p>
*
* @param interval sampling interval configuration (e.g., “5m”, “1h”)
* @param endMillis exclusive upper bound of the time range, expressed in epoch ms
* @return {@code LongBounds} where {@code getMin()} is the computed
* start time and {@code getMax()} equals {@code endMillis}
*/
public LongBounds getTimeRangeBounds(IntervalTimeConfiguration interval, long endMillis) {
return getTimeRangeBounds(interval, endMillis, getNumberOfSamples());
}

/**
* Returns the time‑range bounds using an **explicitly supplied history length**.
*
* <p>The start time is computed as {@code endMillis − (numberOfSamples × interval)}.
* Use this overload when the caller wants full control over how many historical
* samples are considered in the query window.</p>
*
* @param interval sampling interval configuration (e.g., “5m”, “1h”)
* @param endMillis exclusive upper bound of the time range, expressed in epoch ms
* @param numberOfSamples number of historical samples to include; must be &gt; 0
* @return {@code LongBounds} with {@code getMin()} equal to the
* calculated start time and {@code getMax()} equal to {@code endMillis}
* @throws IllegalArgumentException if {@code numberOfSamples} is non‑positive
*/
public LongBounds getTimeRangeBounds(IntervalTimeConfiguration interval, long endMillis, int numberOfSamples) {
long intervalInMillis = IntervalTimeConfiguration.getIntervalInMinute(interval) * 60000;
Long startMillis = endMillis - (getNumberOfSamples() * intervalInMillis);
Long startMillis = endMillis - (numberOfSamples * intervalInMillis);
return new LongBounds(startMillis, endMillis);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public class IntervalCalculation {
private final Map<String, Object> topEntity;
private final long endMillis;
private final Config config;
// how many intervals to look back when exploring intervals
private final int lookBackWindows;

public IntervalCalculation(
Config config,
Expand All @@ -75,7 +77,8 @@ public IntervalCalculation(
Clock clock,
SearchFeatureDao searchFeatureDao,
long latestTime,
Map<String, Object> topEntity
Map<String, Object> topEntity,
boolean validate
) {
this.aggregationPrep = new AggregationPrep(searchFeatureDao, requestTimeout, config);
this.client = client;
Expand All @@ -86,12 +89,19 @@ public IntervalCalculation(
this.topEntity = topEntity;
this.endMillis = latestTime;
this.config = config;
if (validate) {
lookBackWindows = config.getHistoryIntervals();
} else {
// look back more when history is not finalized yet yet
lookBackWindows = config.getDefaultHistory() * 2;
}
}

public void findInterval(ActionListener<IntervalTimeConfiguration> listener) {
ActionListener<IntervalTimeConfiguration> minimumIntervalListener = ActionListener.wrap(minInterval -> {
logger.debug("minimum interval found: {}", minInterval);
if (minInterval == null) {
logger.info("Fail to find minimum interval");
logger.debug("Fail to find minimum interval");
listener.onResponse(null);
} else {
// starting exploring whether minimum or larger interval satisfy density requirement
Expand All @@ -105,7 +115,7 @@ private void getBucketAggregates(IntervalTimeConfiguration minimumInterval, Acti
throws IOException {

try {
LongBounds timeStampBounds = aggregationPrep.getTimeRangeBounds(minimumInterval, endMillis);
LongBounds timeStampBounds = aggregationPrep.getTimeRangeBounds(minimumInterval, endMillis, lookBackWindows);
SearchRequest searchRequest = aggregationPrep.createSearchRequest(minimumInterval, timeStampBounds, topEntity, 0);
ActionListener<IntervalTimeConfiguration> intervalListener = ActionListener
.wrap(interval -> listener.onResponse(interval), exception -> {
Expand All @@ -120,6 +130,7 @@ private void getBucketAggregates(IntervalTimeConfiguration minimumInterval, Acti
);
// using the original context in listener as user roles have no permissions for internal operations like fetching a
// checkpoint
logger.debug("Interval explore search request: {}", searchRequest);
clientUtil
.<SearchRequest, SearchResponse>asyncRequestWithInjectedSecurity(
searchRequest,
Expand Down Expand Up @@ -160,20 +171,25 @@ public IntervalRecommendationListener(

@Override
public void onResponse(SearchResponse resp) {
logger.debug("interval explorer response: {}", resp);
try {
long shingles = aggregationPrep.getShingleCount(resp);
logger.debug("number of shingles: {}", shingles);

if (shingles >= TimeSeriesSettings.NUM_MIN_SAMPLES) { // dense enough
intervalListener.onResponse(currentIntervalToTry);
return;
}

if (++attempts > 10) { // retry budget exhausted
logger.debug("number of attempts: {}", attempts);
intervalListener.onResponse(null);
return;
}

if (clock.millis() > expirationEpochMs) { // timeout
long nowMillis = clock.millis();
if (nowMillis > expirationEpochMs) { // timeout
logger.debug("Timed out: now={}, expires={}", nowMillis, expirationEpochMs);
intervalListener
.onFailure(
new ValidationException(
Expand All @@ -186,7 +202,8 @@ public void onResponse(SearchResponse resp) {
}

int nextMin = nextNiceInterval((int) currentIntervalToTry.getInterval());
if (nextMin == currentIntervalToTry.getInterval()) { // cannot grow further
if (nextMin <= currentIntervalToTry.getInterval()) { // cannot grow further
logger.debug("Cannot grow interval further: next={}, current={}", nextMin, currentIntervalToTry.getInterval());
intervalListener.onResponse(null);
return;
}
Expand All @@ -199,12 +216,14 @@ public void onResponse(SearchResponse resp) {

private void searchWithDifferentInterval(int newIntervalMinuteValue) {
this.currentIntervalToTry = new IntervalTimeConfiguration(newIntervalMinuteValue, ChronoUnit.MINUTES);
this.currentTimeStampBounds = aggregationPrep.getTimeRangeBounds(currentIntervalToTry, endMillis);
this.currentTimeStampBounds = aggregationPrep.getTimeRangeBounds(currentIntervalToTry, endMillis, lookBackWindows);
// using the original context in listener as user roles have no permissions for internal operations like fetching a
// checkpoint
SearchRequest searchRequest = aggregationPrep.createSearchRequest(currentIntervalToTry, currentTimeStampBounds, topEntity, 0);
logger.debug("next search request: {}", searchRequest);
clientUtil
.<SearchRequest, SearchResponse>asyncRequestWithInjectedSecurity(
aggregationPrep.createSearchRequest(currentIntervalToTry, currentTimeStampBounds, topEntity, 0),
searchRequest,
client::search,
user,
client,
Expand Down Expand Up @@ -339,7 +358,7 @@ public void findMedianIntervalAdaptive(ActionListener<IntervalTimeConfiguration>
long totalDocs = r.getHits().getTotalHits() == null ? 0L : r.getHits().getTotalHits().value();

if (totalDocs < 2) {
logger.info("Exit early due to few docs");
logger.debug("Exit early due to few docs");
listener.onResponse(null);
return;
}
Expand Down Expand Up @@ -501,8 +520,9 @@ public void refineGap(
src.aggregation(hist);

SearchRequest searchRequest = new SearchRequest(config.getIndices().toArray(new String[0])).source(src);
logger.debug("Minimum interval search request: {}", searchRequest);
client.search(searchRequest, ActionListener.wrap(resp -> {

logger.debug("Minimum interval search response: {}", resp);
double gap = Double.NaN;
boolean hasEmptyBuckets = false;
Histogram histogram = resp.getAggregations().get("dyn");
Expand Down Expand Up @@ -753,7 +773,7 @@ private static long nextPowerOfTwo(long n) {
/* ------------------------------------------------------------------ */
private static int nextNiceInterval(int currentMin) {
for (int step : INTERVAL_LADDER) {
if (step >= currentMin) {
if (step > currentMin) {
return step;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public class LatestTimeRetriever {
private final User user;
private final AnalysisType context;
private final SearchFeatureDao searchFeatureDao;
// whether we should convert future date to now if future data exists
private final boolean convertFutureDatetoNow;

public LatestTimeRetriever(
Config config,
Expand All @@ -67,7 +69,8 @@ public LatestTimeRetriever(
Client client,
User user,
AnalysisType context,
SearchFeatureDao searchFeatureDao
SearchFeatureDao searchFeatureDao,
boolean convertFutureDatetoNow
) {
this.config = config;
this.aggregationPrep = new AggregationPrep(searchFeatureDao, requestTimeout, config);
Expand All @@ -76,6 +79,7 @@ public LatestTimeRetriever(
this.user = user;
this.context = context;
this.searchFeatureDao = searchFeatureDao;
this.convertFutureDatetoNow = convertFutureDatetoNow;
}

/**
Expand All @@ -93,6 +97,10 @@ public void checkIfHC(ActionListener<Pair<Optional<Long>, Map<String, Object>>>
long timeRangeEnd = latestTime.get();
if (currentEpochMillis < timeRangeEnd) {
logger.info(new ParameterizedMessage("Future date is detected: [{}]", latestTime.get()));
if (convertFutureDatetoNow) {
logger.info("Convert future date to now");
timeRangeEnd = currentEpochMillis;
}
}

if (config.isHighCardinality()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,17 @@ public ModelValidationActionHandler(
this.context = context;
// calculate the bounds in a lazy manner
this.timeRangeToSearchForConfiguredInterval = null;
this.latestTimeRetriever = new LatestTimeRetriever(config, requestTimeout, clientUtil, client, user, context, searchFeatureDao);
// validate window delay depends on detection of future date (which will set window delay to 0)
this.latestTimeRetriever = new LatestTimeRetriever(
config,
requestTimeout,
clientUtil,
client,
user,
context,
searchFeatureDao,
false
);
this.intervalIssueType = intervalIssueType;
this.aggregationPrep = new AggregationPrep(searchFeatureDao, requestTimeout, config);
}
Expand Down Expand Up @@ -180,7 +190,8 @@ private void getSampleRangesForValidationChecks(
clock,
searchFeatureDao,
latestTime.get(),
topEntity
topEntity,
true
)
.findInterval(
ActionListener.wrap(interval -> processIntervalRecommendation(interval, latestTime.get(), topEntity), listener::onFailure)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,9 @@ protected void suggestInterval(
client,
user,
context,
searchFeatureDao
searchFeatureDao,
// simulate run once and real time scenario where we operate relative to now
true
);

ActionListener<Pair<Optional<Long>, Map<String, Object>>> latestTimeListener = ActionListener.wrap(latestEntityAttributes -> {
Expand All @@ -142,7 +144,8 @@ protected void suggestInterval(
clock,
searchFeatureDao,
latestTime.get(),
latestEntityAttributes.getRight()
latestEntityAttributes.getRight(),
false
);
intervalCalculation
.findInterval(
Expand Down Expand Up @@ -237,7 +240,9 @@ protected void suggestWindowDelay(Config config, User user, TimeValue timeout, A
client,
user,
context,
searchFeatureDao
searchFeatureDao,
// if future date is found, just set window delay to 0
false
);
ActionListener<Pair<Optional<Long>, Map<String, Object>>> latestTimeListener = ActionListener.wrap(latestEntityAttributes -> {
Optional<Long> latestTime = latestEntityAttributes.getLeft();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,6 @@ private void bulkIndexTestData(List<JsonObject> data, String datasetName, int tr
ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, "Kibana"))
);
Thread.sleep(1_000);
waitAllSyncheticDataIngested(data.size(), datasetName, client);
waitAllSyncheticDataIngestedOrdered(data.size(), datasetName, client);
}
}
Loading
Loading