Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ Compatible with OpenSearch 3.1.0
### Bug Fixes
- Fix incorrect task state handling in ForecastRunOnceTransportAction ([#1489](https://github.com/opensearch-project/anomaly-detection/pull/1489))
- Fix incorrect task state handling in ForecastRunOnceTransportAction ([#1493](https://github.com/opensearch-project/anomaly-detection/pull/1493))

- Refine cold-start, window delay, and task updates ([#1496](https://github.com/opensearch-project/anomaly-detection/pull/1496))

6 changes: 4 additions & 2 deletions src/main/java/org/opensearch/ad/ml/ADColdStart.java
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,10 @@ protected List<AnomalyResult> trainModelFromDataSegments(

entityState.setLastUsedTime(clock.instant());

// save to checkpoint
checkpointWriteWorker.write(entityState, true, RequestPriority.MEDIUM);
// save to checkpoint for real time only
if (null == taskId) {
checkpointWriteWorker.write(entityState, true, RequestPriority.MEDIUM);
}

return results;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,8 @@ protected ModelState<RCFCaster> fromSingleStreamModelCheckpoint(Map<String, Obje
private RCFCaster loadRCFCaster(Map<String, Object> checkpoint, String modelId) {
String model = (String) checkpoint.get(CommonName.FIELD_MODEL);
if (model == null || model.length() > maxCheckpointBytes) {
logger.warn(new ParameterizedMessage("[{}]'s model too large: [{}] bytes", modelId, model.length()));
logger
.warn(new ParameterizedMessage("[{}]'s model empty or too large: [{}] bytes", modelId, model == null ? 0 : model.length()));
return null;
}
return toRCFCaster(model);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ private void coldStart(
}

if (lastThrottledColdStartTime.plus(Duration.ofMinutes(coolDownMinutes)).isAfter(clock.instant())) {
logger.info("Still in cool down.");
listener.onResponse(null);
return;
}
Expand Down Expand Up @@ -290,8 +291,11 @@ private void coldStart(
logger.info("Not enough data to train model: {}, currently we have {}", modelId, dataSize);

trainingData.forEach(modelState::addSample);
// save to checkpoint
checkpointWriteWorker.write(modelState, true, RequestPriority.MEDIUM);
// save to checkpoint for real time only
if (null == coldStartRequest.getTaskId()) {
checkpointWriteWorker.write(modelState, true, RequestPriority.MEDIUM);
}

listener.onResponse(null);
}
} else {
Expand Down Expand Up @@ -376,7 +380,6 @@ private void getColdStartData(String configId, FeatureRequest coldStartRequest,
// [current start, current end] for training. So we fetch training data ending at current start
long endTimeMs = coldStartRequest.getDataStartTimeMillis();
int numberOfSamples = selectNumberOfSamples(config);

// we start with round 0
getFeatures(
listener,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,8 @@ public void updateTask(String taskId, Map<String, Object> updatedFields, ActionL
updatedContent.put(TimeSeriesTask.LAST_UPDATE_TIME_FIELD, Instant.now().toEpochMilli());
updateRequest.doc(updatedContent);
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
// OpenSearch will transparently re-read the doc and retry up to 2 times.
updateRequest.retryOnConflict(2);
client.update(updateRequest, listener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.opensearch.timeseries.rest.handler.HistorySuggest;
import org.opensearch.timeseries.rest.handler.IntervalCalculation;
import org.opensearch.timeseries.rest.handler.LatestTimeRetriever;
import org.opensearch.timeseries.settings.TimeSeriesSettings;
import org.opensearch.timeseries.util.ParseUtils;
import org.opensearch.timeseries.util.SecurityClientUtil;
import org.opensearch.transport.TransportService;
Expand Down Expand Up @@ -249,8 +248,44 @@ protected void suggestWindowDelay(Config config, User user, TimeValue timeout, A
// we may get future date (e.g., in testing)
long currentMillis = clock.millis();

// ---------------------------------------------------------------------------
// Adaptive window-delay calculation
// ---------------------------------------------------------------------------
// Goal: pick a delay long enough that all data for the current query
// window has been ingested, so the config never sees "future gaps".
//
// Algorithm
// 1. Compute the raw lag (`gapMs`) between now and the newest document.
// 2. Convert that lag into whole config-interval "buckets" (ceil).
//    We use the integer-math identity
//        ceil(a / b) = (a + b - 1) / b    for positive a, b
//    so we never under-estimate the number of missing buckets.
// 3. Add one extra "safety" bucket to cover clock skew / network jitter.
// 4. Transform the final bucket count back to milliseconds.
//
// ---------------------------------------------------------------------------

if (currentMillis > latestTime.get()) {
windowDelayMillis = (long) Math.ceil((currentMillis - latestTime.get()) * TimeSeriesSettings.WINDOW_DELAY_RATIO);

// Milliseconds we are behind real time
long gapMs = currentMillis - latestTime.get();

// Length of one bucket (config interval)
long bucketMs = config.getIntervalInMilliseconds();

/* Missing buckets (ceiling division)
   Example: gap = 15 000 ms, bucket = 10 000 ms
     bucketsBehind = (15 000 + 10 000 - 1) / 10 000
                   = 24 999 / 10 000
                   = 2   <- correct: 15 000 ms covers one full bucket
                            plus a partial one, so ceil(1.5) = 2 buckets
*/
long bucketsBehind = (gapMs + bucketMs - 1) / bucketMs;

// Always keep one extra bucket as a cushion
long safetyBuckets = 1;

// Convert back to milliseconds
windowDelayMillis = (bucketsBehind + safetyBuckets) * bucketMs;
}

// in case windowDelayMillis is small, we want at least 1 minute
Expand Down
Loading