Skip to content

Commit b7e47be

Browse files
committed
Support daily and multi-hour intervals
This commit adds support for configuration intervals longer than one hour.

Key changes:

* For intervals greater than 1 hour, models are no longer loaded directly into the cache. Instead, they are sent to the cold entity queue, and checkpoints are reloaded at each interval. In addition, the cold entity processing priority for long-interval configs is raised from 'LOW' to 'MEDIUM' to ensure timely processing.
* Improved the Suggest and Validate APIs: the previous median-based interval detection method is replaced with a robust, adaptive "zoom-in/zoom-out" algorithm. The new method uses progressively refined date histograms to accurately determine the optimal interval. This enables validation and suggestions for intervals longer than 1 hour.

Testing:

* Conducted multi-day manual tests to verify daily interval functionality.
* Added the ForecastRestApiIT.testDailyInterval integration test to validate the full forecasting workflow (interval suggestion, forecaster creation, execution, and stats verification) for daily-interval data.

Signed-off-by: Kaituo Li <kaituo@amazon.com>
1 parent d92bb7e commit b7e47be

34 files changed

+3998
-229
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
66
## [Unreleased 3.x](https://github.com/opensearch-project/anomaly-detection/compare/3.0...HEAD)
77
### Features
88
### Enhancements
9+
- Support >1 hr intervals ([#1513](https://github.com/opensearch-project/anomaly-detection/pull/1513))
910

1011

1112
### Bug Fixes

build.gradle

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1001,7 +1001,6 @@ List<String> jacocoExclusions = [
10011001

10021002
// TODO: add test coverage (kaituo)
10031003
'org.opensearch.forecast.*',
1004-
'org.opensearch.timeseries.model.ModelProfileOnNode',
10051004
'org.opensearch.timeseries.transport.ValidateConfigRequest',
10061005
'org.opensearch.timeseries.transport.ResultProcessor.PageListener.1',
10071006
'org.opensearch.ad.transport.ADHCImputeRequest',

src/main/java/org/opensearch/ad/task/ADTaskManager.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1926,7 +1926,9 @@ protected <T> void resetLatestConfigTaskState(
19261926

19271927
@Override
19281928
protected String triageState(Boolean hasResult, String error, Long rcfTotalUpdates) {
1929-
if (rcfTotalUpdates < TimeSeriesSettings.NUM_MIN_SAMPLES) {
1929+
if (hasResult != null && hasResult) {
1930+
return TaskState.RUNNING.name();
1931+
} else if (rcfTotalUpdates != null && rcfTotalUpdates < TimeSeriesSettings.NUM_MIN_SAMPLES) {
19301932
return TaskState.INIT.name();
19311933
} else {
19321934
return TaskState.RUNNING.name();

src/main/java/org/opensearch/ad/transport/ADSingleStreamResultTransportAction.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.opensearch.ad.ratelimit.ADCheckpointMaintainWorker;
2323
import org.opensearch.ad.ratelimit.ADCheckpointReadWorker;
2424
import org.opensearch.ad.ratelimit.ADCheckpointWriteWorker;
25+
import org.opensearch.ad.ratelimit.ADColdEntityWorker;
2526
import org.opensearch.ad.ratelimit.ADColdStartWorker;
2627
import org.opensearch.ad.ratelimit.ADResultWriteRequest;
2728
import org.opensearch.ad.ratelimit.ADSaveResultStrategy;
@@ -41,7 +42,7 @@
4142
import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest;
4243

4344
public class ADSingleStreamResultTransportAction extends
44-
AbstractSingleStreamResultTransportAction<ThresholdedRandomCutForest, ADIndex, ADIndexManagement, ADCheckpointDao, ADCheckpointWriteWorker, ADCheckpointMaintainWorker, ADCacheBuffer, ADPriorityCache, ADCacheProvider, AnomalyResult, ThresholdingResult, ADColdStart, ADModelManager, ADPriorityCache, ADSaveResultStrategy, ADTaskCacheManager, ADTaskType, ADTask, ADTaskManager, ADColdStartWorker, ADRealTimeInferencer, ADCheckpointReadWorker, ADResultWriteRequest> {
45+
AbstractSingleStreamResultTransportAction<ThresholdedRandomCutForest, ADIndex, ADIndexManagement, ADCheckpointDao, ADCheckpointWriteWorker, ADCheckpointMaintainWorker, ADCacheBuffer, ADPriorityCache, ADCacheProvider, AnomalyResult, ThresholdingResult, ADColdStart, ADModelManager, ADPriorityCache, ADSaveResultStrategy, ADTaskCacheManager, ADTaskType, ADTask, ADTaskManager, ADColdStartWorker, ADRealTimeInferencer, ADCheckpointReadWorker, ADResultWriteRequest, ADColdEntityWorker> {
4546

4647
@Inject
4748
public ADSingleStreamResultTransportAction(
@@ -52,7 +53,8 @@ public ADSingleStreamResultTransportAction(
5253
NodeStateManager stateManager,
5354
ADCheckpointReadWorker checkpointReadQueue,
5455
ADRealTimeInferencer inferencer,
55-
ThreadPool threadPool
56+
ThreadPool threadPool,
57+
ADColdEntityWorker coldEntityQueue
5658
) {
5759
super(
5860
transportService,
@@ -65,7 +67,8 @@ public ADSingleStreamResultTransportAction(
6567
AnalysisType.AD,
6668
inferencer,
6769
threadPool,
68-
TimeSeriesAnalyticsPlugin.AD_THREAD_POOL_NAME
70+
TimeSeriesAnalyticsPlugin.AD_THREAD_POOL_NAME,
71+
coldEntityQueue
6972
);
7073
}
7174

src/main/java/org/opensearch/forecast/ml/ForecastRealTimeInferencer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public ForecastRealTimeInferencer(
4646
super(
4747
modelManager,
4848
stats,
49-
StatNames.FORECAST_MODEL_CORRUTPION_COUNT.getName(),
49+
StatNames.FORECAST_MODEL_CORRUPTION_COUNT.getName(),
5050
checkpointDao,
5151
coldStartWorker,
5252
resultWriteWorker,

src/main/java/org/opensearch/forecast/transport/ForecastSingleStreamResultTransportAction.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.opensearch.forecast.ratelimit.ForecastCheckpointMaintainWorker;
2626
import org.opensearch.forecast.ratelimit.ForecastCheckpointReadWorker;
2727
import org.opensearch.forecast.ratelimit.ForecastCheckpointWriteWorker;
28+
import org.opensearch.forecast.ratelimit.ForecastColdEntityWorker;
2829
import org.opensearch.forecast.ratelimit.ForecastColdStartWorker;
2930
import org.opensearch.forecast.ratelimit.ForecastResultWriteRequest;
3031
import org.opensearch.forecast.ratelimit.ForecastSaveResultStrategy;
@@ -43,7 +44,7 @@
4344
import com.amazon.randomcutforest.parkservices.RCFCaster;
4445

4546
public class ForecastSingleStreamResultTransportAction extends
46-
AbstractSingleStreamResultTransportAction<RCFCaster, ForecastIndex, ForecastIndexManagement, ForecastCheckpointDao, ForecastCheckpointWriteWorker, ForecastCheckpointMaintainWorker, ForecastCacheBuffer, ForecastPriorityCache, ForecastCacheProvider, ForecastResult, RCFCasterResult, ForecastColdStart, ForecastModelManager, ForecastPriorityCache, ForecastSaveResultStrategy, TaskCacheManager, ForecastTaskType, ForecastTask, ForecastTaskManager, ForecastColdStartWorker, ForecastRealTimeInferencer, ForecastCheckpointReadWorker, ForecastResultWriteRequest> {
47+
AbstractSingleStreamResultTransportAction<RCFCaster, ForecastIndex, ForecastIndexManagement, ForecastCheckpointDao, ForecastCheckpointWriteWorker, ForecastCheckpointMaintainWorker, ForecastCacheBuffer, ForecastPriorityCache, ForecastCacheProvider, ForecastResult, RCFCasterResult, ForecastColdStart, ForecastModelManager, ForecastPriorityCache, ForecastSaveResultStrategy, TaskCacheManager, ForecastTaskType, ForecastTask, ForecastTaskManager, ForecastColdStartWorker, ForecastRealTimeInferencer, ForecastCheckpointReadWorker, ForecastResultWriteRequest, ForecastColdEntityWorker> {
4748

4849
private static final Logger LOG = LogManager.getLogger(ForecastSingleStreamResultTransportAction.class);
4950

@@ -56,7 +57,8 @@ public ForecastSingleStreamResultTransportAction(
5657
NodeStateManager stateManager,
5758
ForecastCheckpointReadWorker checkpointReadQueue,
5859
ForecastRealTimeInferencer inferencer,
59-
ThreadPool threadPool
60+
ThreadPool threadPool,
61+
ForecastColdEntityWorker coldEntityWorker
6062
) {
6163
super(
6264
transportService,
@@ -69,7 +71,8 @@ public ForecastSingleStreamResultTransportAction(
6971
AnalysisType.FORECAST,
7072
inferencer,
7173
threadPool,
72-
TimeSeriesAnalyticsPlugin.FORECAST_THREAD_POOL_NAME
74+
TimeSeriesAnalyticsPlugin.FORECAST_THREAD_POOL_NAME,
75+
coldEntityWorker
7376
);
7477
}
7578

src/main/java/org/opensearch/forecast/transport/SearchTopForecastResultTransportAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,7 @@ protected void doExecute(Task task, SearchTopForecastResultRequest request, Acti
391391
// This same method is used for security handling for the search results action.
392392
// Since this action
393393
// is doing fundamentally the same thing, we can reuse the security logic here.
394-
logger.info("top forecast request:" + searchRequest);
394+
logger.debug("top forecast request:" + searchRequest);
395395
searchHandler.search(searchRequest, onSearchResponse(request, categoryFields, forecaster, listener));
396396
}, exception -> {
397397
logger.error("Failed to get top forecast results", exception);
@@ -407,7 +407,7 @@ private ActionListener<SearchResponse> onSearchResponse(
407407
ActionListener<SearchTopForecastResultResponse> listener
408408
) {
409409
return ActionListener.wrap(response -> {
410-
logger.info("top forecast response:" + response);
410+
logger.debug("top forecast response:" + response);
411411
Aggregations aggs = response.getAggregations();
412412
if (aggs == null) {
413413
// empty result (e.g., cannot find forecasts within [forecast from, forecast from + horizon * interval] range).

src/main/java/org/opensearch/timeseries/ExecuteResultResponseRecorder.java

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -161,17 +161,30 @@ protected void delayedUpdate(ResultResponse<IndexableResultType> response, Strin
161161
profiles.add(ProfileName.INIT_PROGRESS);
162162
ProfileRequest profileRequest = new ProfileRequest(configId, profiles, dataNodes);
163163
Runnable profileInitProgress = () -> {
164-
client.execute(profileAction, profileRequest, ActionListener.wrap(r -> {
165-
log.info("Update latest realtime task for config {}, total updates: {}", configId, r.getTotalUpdates());
166-
updateLatestRealtimeTask(
167-
configId,
168-
null,
169-
r.getTotalUpdates(),
170-
response.getConfigIntervalInMinutes(),
171-
response.getError(),
172-
clock
173-
);
174-
}, e -> { log.error("Failed to update latest realtime task for " + configId, e); }));
164+
nodeStateManager.getConfig(configId, analysisType, false, ActionListener.wrap(configOptional -> {
165+
if (!configOptional.isPresent()) {
166+
log.warn("fail to get config");
167+
return;
168+
}
169+
170+
Config config = configOptional.get();
171+
if (config.isLongInterval()) {
172+
log.info("Update latest realtime task for long-interval config {}", configId);
173+
updateLatestRealtimeTask(configId, null, 0L, response.getConfigIntervalInMinutes(), response.getError(), clock);
174+
} else {
175+
client.execute(profileAction, profileRequest, ActionListener.wrap(r -> {
176+
log.info("Update latest realtime task for config {}, total updates: {}", configId, r.getTotalUpdates());
177+
updateLatestRealtimeTask(
178+
configId,
179+
null,
180+
r.getTotalUpdates(),
181+
response.getConfigIntervalInMinutes(),
182+
response.getError(),
183+
clock
184+
);
185+
}, e -> { log.error("Failed to update latest realtime task for " + configId, e); }));
186+
}
187+
}, e -> log.warn("fail to get config", e)));
175188
};
176189
if (!taskManager.isRealtimeTaskStartInitializing(configId)) {
177190
// real time init progress is 0 may mean this is a newly started detector
@@ -207,7 +220,6 @@ protected void updateLatestRealtimeTask(
207220

208221
hasRecentResult(
209222
configId,
210-
taskState,
211223
configIntervalInMinutes,
212224
error,
213225
clock,
@@ -282,15 +294,13 @@ public void indexResultException(
282294
* for search.
283295
*
284296
* @param configId Config id
285-
* @param taskState task state
286297
* @param overrideIntervalMinutes config interval in minutes
287298
* @param error Error
288299
* @param clock Clock to get current time
289300
* @param listener Callback to return whether any result has been generated in the past two intervals.
290301
*/
291302
private void hasRecentResult(
292303
String configId,
293-
String taskState,
294304
Long overrideIntervalMinutes,
295305
String error,
296306
Clock clock,

src/main/java/org/opensearch/timeseries/ProfileRunner.java

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.opensearch.timeseries.model.InitProgressProfile;
5050
import org.opensearch.timeseries.model.IntervalTimeConfiguration;
5151
import org.opensearch.timeseries.model.Job;
52+
import org.opensearch.timeseries.model.ModelProfileOnNode;
5253
import org.opensearch.timeseries.model.ProfileName;
5354
import org.opensearch.timeseries.model.TaskType;
5455
import org.opensearch.timeseries.model.TimeSeriesTask;
@@ -66,6 +67,7 @@
6667

6768
public abstract class ProfileRunner<TaskCacheManagerType extends TaskCacheManager, TaskTypeEnum extends TaskType, TaskClass extends TimeSeriesTask, IndexType extends Enum<IndexType> & TimeSeriesIndex, IndexManagementType extends IndexManagement<IndexType>, TaskProfileType extends TaskProfile<TaskClass>, TaskManagerType extends TaskManager<TaskCacheManagerType, TaskTypeEnum, TaskClass, IndexType, IndexManagementType>, ConfigProfileType extends ConfigProfile<TaskClass, TaskProfileType>, ProfileActionType extends ActionType<ProfileResponse>, TaskProfileRunnerType extends TaskProfileRunner<TaskClass, TaskProfileType>>
6869
extends AbstractProfileRunner {
70+
6971
private final Logger logger = LogManager.getLogger(ProfileRunner.class);
7072
protected Client client;
7173
protected SecurityClientUtil clientUtil;
@@ -378,8 +380,31 @@ protected void profileModels(
378380
) {
379381
DiscoveryNode[] dataNodes = nodeFilter.getEligibleDataNodes();
380382
ProfileRequest profileRequest = new ProfileRequest(config.getId(), profiles, dataNodes);
381-
client.execute(profileAction, profileRequest, onModelResponse(config, profiles, job, listener));// get init
382-
// progress
383+
if (config.isLongInterval()) {
384+
ConfigProfileType.Builder<TaskClass, TaskProfileType> profile = createProfileBuilder();
385+
if (profiles.contains(ProfileName.COORDINATING_NODE)) {
386+
profile.coordinatingNode("");
387+
}
388+
if (profiles.contains(ProfileName.TOTAL_SIZE_IN_BYTES)) {
389+
profile.totalSizeInBytes(0L);
390+
}
391+
if (profiles.contains(ProfileName.MODELS)) {
392+
profile.modelProfile(new ModelProfileOnNode[0]);
393+
profile.modelCount(0);
394+
}
395+
if (config.isHighCardinality() && profiles.contains(ProfileName.ACTIVE_ENTITIES)) {
396+
profile.activeEntities(0L);
397+
}
398+
399+
// only need to do it for models in priority cache. Single stream and HC are the same logic
400+
if (profiles.contains(ProfileName.INIT_PROGRESS) || profiles.contains(ProfileName.STATE)) {
401+
profileStateRelated(job, profiles, 0L, profile, config, listener);
402+
} else {
403+
listener.onResponse(profile.build());
404+
}
405+
} else {
406+
client.execute(profileAction, profileRequest, onModelResponse(config, profiles, job, listener));
407+
}
383408
}
384409

385410
private ActionListener<ProfileResponse> onModelResponse(
@@ -405,10 +430,9 @@ private ActionListener<ProfileResponse> onModelResponse(
405430
profile.activeEntities(profileResponse.getActiveEntities());
406431
}
407432

408-
// only need to do it for models in priority cache. AD single stream analysis has a
409-
// different workflow to determine state and init progress
433+
// only need to do it for models in priority cache. Single stream and HC are the same logic.
410434
if (profilesToCollect.contains(ProfileName.INIT_PROGRESS) || profilesToCollect.contains(ProfileName.STATE)) {
411-
profileStateRelated(job, profilesToCollect, profileResponse, profile, config, listener);
435+
profileStateRelated(job, profilesToCollect, profileResponse.getTotalUpdates(), profile, config, listener);
412436
} else {
413437
listener.onResponse(profile.build());
414438
}
@@ -418,18 +442,17 @@ private ActionListener<ProfileResponse> onModelResponse(
418442
private void profileStateRelated(
419443
Job job,
420444
Set<ProfileName> profilesToCollect,
421-
ProfileResponse profileResponse,
445+
Long totalUpdates,
422446
ConfigProfileType.Builder<TaskClass, TaskProfileType> profileBuilder,
423447
Config config,
424448
MultiResponsesDelegateActionListener<ConfigProfileType> listener
425449
) {
426450
if (job.isEnabled()) {
427-
if (profileResponse.getTotalUpdates() < requiredSamples) {
451+
if (totalUpdates < requiredSamples) {
428452
// need to double check for an HC analysis
429453
// since what ProfileResponse returns is the highest priority entity currently in memory, but
430454
// another entity might have already been initialized and sit somewhere else (in memory or on disk).
431455
long enabledTime = job.getEnabledTime().toEpochMilli();
432-
long totalUpdates = profileResponse.getTotalUpdates();
433456
ProfileUtil
434457
.confirmRealtimeResultStatus(
435458
config,

src/main/java/org/opensearch/timeseries/TimeSeriesAnalyticsPlugin.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1229,14 +1229,14 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
12291229
.put(StatNames.FORECASTER_COUNT.getName(), new TimeSeriesStat<>(true, new SettableSupplier()))
12301230
.put(StatNames.SINGLE_STREAM_FORECASTER_COUNT.getName(), new TimeSeriesStat<>(true, new SettableSupplier()))
12311231
.put(StatNames.HC_FORECASTER_COUNT.getName(), new TimeSeriesStat<>(true, new SettableSupplier()))
1232-
.put(StatNames.FORECAST_MODEL_CORRUTPION_COUNT.getName(), new TimeSeriesStat<>(false, new CounterSupplier()))
1232+
.put(StatNames.FORECAST_MODEL_CORRUPTION_COUNT.getName(), new TimeSeriesStat<>(false, new CounterSupplier()))
12331233
.put(
12341234
StatNames.MODEL_INFORMATION.getName(),
12351235
new TimeSeriesStat<>(false, new ForecastModelsOnNodeSupplier(forecastCacheProvider, settings, clusterService))
12361236
)
12371237
.put(
1238-
StatNames.AD_CONFIG_INDEX_STATUS.getName(),
1239-
new TimeSeriesStat<>(true, new IndexStatusSupplier(indexUtils, ADCommonName.CONFIG_INDEX))
1238+
StatNames.FORECAST_CONFIG_INDEX_STATUS.getName(),
1239+
new TimeSeriesStat<>(true, new IndexStatusSupplier(indexUtils, ForecastCommonName.CONFIG_INDEX))
12401240
)
12411241
.put(
12421242
StatNames.JOB_INDEX_STATUS.getName(),

0 commit comments

Comments
 (0)