Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
## [Unreleased 3.x](https://github.com/opensearch-project/anomaly-detection/compare/3.0...HEAD)
### Features
### Enhancements
- Support >1 hr intervals ([#1513](https://github.com/opensearch-project/anomaly-detection/pull/1513))


### Bug Fixes
Expand Down
1 change: 0 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -1001,7 +1001,6 @@ List<String> jacocoExclusions = [

// TODO: add test coverage (kaituo)
'org.opensearch.forecast.*',
'org.opensearch.timeseries.model.ModelProfileOnNode',
'org.opensearch.timeseries.transport.ValidateConfigRequest',
'org.opensearch.timeseries.transport.ResultProcessor.PageListener.1',
'org.opensearch.ad.transport.ADHCImputeRequest',
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/org/opensearch/ad/task/ADTaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -1926,7 +1926,9 @@ protected <T> void resetLatestConfigTaskState(

@Override
protected String triageState(Boolean hasResult, String error, Long rcfTotalUpdates) {
if (rcfTotalUpdates < TimeSeriesSettings.NUM_MIN_SAMPLES) {
if (hasResult != null && hasResult) {
return TaskState.RUNNING.name();
} else if (rcfTotalUpdates != null && rcfTotalUpdates < TimeSeriesSettings.NUM_MIN_SAMPLES) {
return TaskState.INIT.name();
} else {
return TaskState.RUNNING.name();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.ad.ratelimit.ADCheckpointMaintainWorker;
import org.opensearch.ad.ratelimit.ADCheckpointReadWorker;
import org.opensearch.ad.ratelimit.ADCheckpointWriteWorker;
import org.opensearch.ad.ratelimit.ADColdEntityWorker;
import org.opensearch.ad.ratelimit.ADColdStartWorker;
import org.opensearch.ad.ratelimit.ADResultWriteRequest;
import org.opensearch.ad.ratelimit.ADSaveResultStrategy;
Expand All @@ -41,7 +42,7 @@
import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest;

public class ADSingleStreamResultTransportAction extends
AbstractSingleStreamResultTransportAction<ThresholdedRandomCutForest, ADIndex, ADIndexManagement, ADCheckpointDao, ADCheckpointWriteWorker, ADCheckpointMaintainWorker, ADCacheBuffer, ADPriorityCache, ADCacheProvider, AnomalyResult, ThresholdingResult, ADColdStart, ADModelManager, ADPriorityCache, ADSaveResultStrategy, ADTaskCacheManager, ADTaskType, ADTask, ADTaskManager, ADColdStartWorker, ADRealTimeInferencer, ADCheckpointReadWorker, ADResultWriteRequest> {
AbstractSingleStreamResultTransportAction<ThresholdedRandomCutForest, ADIndex, ADIndexManagement, ADCheckpointDao, ADCheckpointWriteWorker, ADCheckpointMaintainWorker, ADCacheBuffer, ADPriorityCache, ADCacheProvider, AnomalyResult, ThresholdingResult, ADColdStart, ADModelManager, ADPriorityCache, ADSaveResultStrategy, ADTaskCacheManager, ADTaskType, ADTask, ADTaskManager, ADColdStartWorker, ADRealTimeInferencer, ADCheckpointReadWorker, ADResultWriteRequest, ADColdEntityWorker> {

@Inject
public ADSingleStreamResultTransportAction(
Expand All @@ -52,7 +53,8 @@ public ADSingleStreamResultTransportAction(
NodeStateManager stateManager,
ADCheckpointReadWorker checkpointReadQueue,
ADRealTimeInferencer inferencer,
ThreadPool threadPool
ThreadPool threadPool,
ADColdEntityWorker coldEntityQueue
) {
super(
transportService,
Expand All @@ -65,7 +67,8 @@ public ADSingleStreamResultTransportAction(
AnalysisType.AD,
inferencer,
threadPool,
TimeSeriesAnalyticsPlugin.AD_THREAD_POOL_NAME
TimeSeriesAnalyticsPlugin.AD_THREAD_POOL_NAME,
coldEntityQueue
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public ForecastRealTimeInferencer(
super(
modelManager,
stats,
StatNames.FORECAST_MODEL_CORRUTPION_COUNT.getName(),
StatNames.FORECAST_MODEL_CORRUPTION_COUNT.getName(),
checkpointDao,
coldStartWorker,
resultWriteWorker,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.forecast.ratelimit.ForecastCheckpointMaintainWorker;
import org.opensearch.forecast.ratelimit.ForecastCheckpointReadWorker;
import org.opensearch.forecast.ratelimit.ForecastCheckpointWriteWorker;
import org.opensearch.forecast.ratelimit.ForecastColdEntityWorker;
import org.opensearch.forecast.ratelimit.ForecastColdStartWorker;
import org.opensearch.forecast.ratelimit.ForecastResultWriteRequest;
import org.opensearch.forecast.ratelimit.ForecastSaveResultStrategy;
Expand All @@ -43,7 +44,7 @@
import com.amazon.randomcutforest.parkservices.RCFCaster;

public class ForecastSingleStreamResultTransportAction extends
AbstractSingleStreamResultTransportAction<RCFCaster, ForecastIndex, ForecastIndexManagement, ForecastCheckpointDao, ForecastCheckpointWriteWorker, ForecastCheckpointMaintainWorker, ForecastCacheBuffer, ForecastPriorityCache, ForecastCacheProvider, ForecastResult, RCFCasterResult, ForecastColdStart, ForecastModelManager, ForecastPriorityCache, ForecastSaveResultStrategy, TaskCacheManager, ForecastTaskType, ForecastTask, ForecastTaskManager, ForecastColdStartWorker, ForecastRealTimeInferencer, ForecastCheckpointReadWorker, ForecastResultWriteRequest> {
AbstractSingleStreamResultTransportAction<RCFCaster, ForecastIndex, ForecastIndexManagement, ForecastCheckpointDao, ForecastCheckpointWriteWorker, ForecastCheckpointMaintainWorker, ForecastCacheBuffer, ForecastPriorityCache, ForecastCacheProvider, ForecastResult, RCFCasterResult, ForecastColdStart, ForecastModelManager, ForecastPriorityCache, ForecastSaveResultStrategy, TaskCacheManager, ForecastTaskType, ForecastTask, ForecastTaskManager, ForecastColdStartWorker, ForecastRealTimeInferencer, ForecastCheckpointReadWorker, ForecastResultWriteRequest, ForecastColdEntityWorker> {

private static final Logger LOG = LogManager.getLogger(ForecastSingleStreamResultTransportAction.class);

Expand All @@ -56,7 +57,8 @@ public ForecastSingleStreamResultTransportAction(
NodeStateManager stateManager,
ForecastCheckpointReadWorker checkpointReadQueue,
ForecastRealTimeInferencer inferencer,
ThreadPool threadPool
ThreadPool threadPool,
ForecastColdEntityWorker coldEntityWorker
) {
super(
transportService,
Expand All @@ -69,7 +71,8 @@ public ForecastSingleStreamResultTransportAction(
AnalysisType.FORECAST,
inferencer,
threadPool,
TimeSeriesAnalyticsPlugin.FORECAST_THREAD_POOL_NAME
TimeSeriesAnalyticsPlugin.FORECAST_THREAD_POOL_NAME,
coldEntityWorker
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ protected void doExecute(Task task, SearchTopForecastResultRequest request, Acti
// This same method is used for security handling for the search results action.
// Since this action
// is doing fundamentally the same thing, we can reuse the security logic here.
logger.info("top forecast request:" + searchRequest);
logger.debug("top forecast request:" + searchRequest);
searchHandler.search(searchRequest, onSearchResponse(request, categoryFields, forecaster, listener));
}, exception -> {
logger.error("Failed to get top forecast results", exception);
Expand All @@ -407,7 +407,7 @@ private ActionListener<SearchResponse> onSearchResponse(
ActionListener<SearchTopForecastResultResponse> listener
) {
return ActionListener.wrap(response -> {
logger.info("top forecast response:" + response);
logger.debug("top forecast response:" + response);
Aggregations aggs = response.getAggregations();
if (aggs == null) {
// empty result (e.g., cannot find forecasts within [forecast from, forecast from + horizon * interval] range).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,17 +161,30 @@ protected void delayedUpdate(ResultResponse<IndexableResultType> response, Strin
profiles.add(ProfileName.INIT_PROGRESS);
ProfileRequest profileRequest = new ProfileRequest(configId, profiles, dataNodes);
Runnable profileInitProgress = () -> {
client.execute(profileAction, profileRequest, ActionListener.wrap(r -> {
log.info("Update latest realtime task for config {}, total updates: {}", configId, r.getTotalUpdates());
updateLatestRealtimeTask(
configId,
null,
r.getTotalUpdates(),
response.getConfigIntervalInMinutes(),
response.getError(),
clock
);
}, e -> { log.error("Failed to update latest realtime task for " + configId, e); }));
nodeStateManager.getConfig(configId, analysisType, false, ActionListener.wrap(configOptional -> {
if (!configOptional.isPresent()) {
log.warn("fail to get config");
return;
}

Config config = configOptional.get();
if (config.isLongInterval()) {
log.info("Update latest realtime task for long-interval config {}", configId);
updateLatestRealtimeTask(configId, null, 0L, response.getConfigIntervalInMinutes(), response.getError(), clock);
} else {
client.execute(profileAction, profileRequest, ActionListener.wrap(r -> {
log.info("Update latest realtime task for config {}, total updates: {}", configId, r.getTotalUpdates());
updateLatestRealtimeTask(
configId,
null,
r.getTotalUpdates(),
response.getConfigIntervalInMinutes(),
response.getError(),
clock
);
}, e -> { log.error("Failed to update latest realtime task for " + configId, e); }));
}
}, e -> log.warn("fail to get config", e)));
};
if (!taskManager.isRealtimeTaskStartInitializing(configId)) {
// real time init progress is 0 may mean this is a newly started detector
Expand Down Expand Up @@ -207,7 +220,6 @@ protected void updateLatestRealtimeTask(

hasRecentResult(
configId,
taskState,
configIntervalInMinutes,
error,
clock,
Expand Down Expand Up @@ -282,15 +294,13 @@ public void indexResultException(
* for search.
*
* @param configId Config id
* @param taskState task state
* @param overrideIntervalMinutes config interval in minutes
* @param error Error
* @param clock Clock to get current time
* @param listener Callback to return whether any result has been generated in the past two intervals.
*/
private void hasRecentResult(
String configId,
String taskState,
Long overrideIntervalMinutes,
String error,
Clock clock,
Expand Down
39 changes: 31 additions & 8 deletions src/main/java/org/opensearch/timeseries/ProfileRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.opensearch.timeseries.model.InitProgressProfile;
import org.opensearch.timeseries.model.IntervalTimeConfiguration;
import org.opensearch.timeseries.model.Job;
import org.opensearch.timeseries.model.ModelProfileOnNode;
import org.opensearch.timeseries.model.ProfileName;
import org.opensearch.timeseries.model.TaskType;
import org.opensearch.timeseries.model.TimeSeriesTask;
Expand All @@ -66,6 +67,7 @@

public abstract class ProfileRunner<TaskCacheManagerType extends TaskCacheManager, TaskTypeEnum extends TaskType, TaskClass extends TimeSeriesTask, IndexType extends Enum<IndexType> & TimeSeriesIndex, IndexManagementType extends IndexManagement<IndexType>, TaskProfileType extends TaskProfile<TaskClass>, TaskManagerType extends TaskManager<TaskCacheManagerType, TaskTypeEnum, TaskClass, IndexType, IndexManagementType>, ConfigProfileType extends ConfigProfile<TaskClass, TaskProfileType>, ProfileActionType extends ActionType<ProfileResponse>, TaskProfileRunnerType extends TaskProfileRunner<TaskClass, TaskProfileType>>
extends AbstractProfileRunner {

private final Logger logger = LogManager.getLogger(ProfileRunner.class);
protected Client client;
protected SecurityClientUtil clientUtil;
Expand Down Expand Up @@ -378,8 +380,31 @@ protected void profileModels(
) {
DiscoveryNode[] dataNodes = nodeFilter.getEligibleDataNodes();
ProfileRequest profileRequest = new ProfileRequest(config.getId(), profiles, dataNodes);
client.execute(profileAction, profileRequest, onModelResponse(config, profiles, job, listener));// get init
// progress
if (config.isLongInterval()) {
ConfigProfileType.Builder<TaskClass, TaskProfileType> profile = createProfileBuilder();
if (profiles.contains(ProfileName.COORDINATING_NODE)) {
profile.coordinatingNode("");
}
if (profiles.contains(ProfileName.TOTAL_SIZE_IN_BYTES)) {
profile.totalSizeInBytes(0L);
}
if (profiles.contains(ProfileName.MODELS)) {
profile.modelProfile(new ModelProfileOnNode[0]);
profile.modelCount(0);
}
if (config.isHighCardinality() && profiles.contains(ProfileName.ACTIVE_ENTITIES)) {
profile.activeEntities(0L);
}

// only need to do it for models in priority cache. Single stream and HC are the same logic
if (profiles.contains(ProfileName.INIT_PROGRESS) || profiles.contains(ProfileName.STATE)) {
profileStateRelated(job, profiles, 0L, profile, config, listener);
} else {
listener.onResponse(profile.build());
}
} else {
client.execute(profileAction, profileRequest, onModelResponse(config, profiles, job, listener));
}
}

private ActionListener<ProfileResponse> onModelResponse(
Expand All @@ -405,10 +430,9 @@ private ActionListener<ProfileResponse> onModelResponse(
profile.activeEntities(profileResponse.getActiveEntities());
}

// only need to do it for models in priority cache. AD single stream analysis has a
// different workflow to determine state and init progress
// only need to do it for models in priority cache. Single stream and HC are the same logic.
if (profilesToCollect.contains(ProfileName.INIT_PROGRESS) || profilesToCollect.contains(ProfileName.STATE)) {
profileStateRelated(job, profilesToCollect, profileResponse, profile, config, listener);
profileStateRelated(job, profilesToCollect, profileResponse.getTotalUpdates(), profile, config, listener);
} else {
listener.onResponse(profile.build());
}
Expand All @@ -418,18 +442,17 @@ private ActionListener<ProfileResponse> onModelResponse(
private void profileStateRelated(
Job job,
Set<ProfileName> profilesToCollect,
ProfileResponse profileResponse,
Long totalUpdates,
ConfigProfileType.Builder<TaskClass, TaskProfileType> profileBuilder,
Config config,
MultiResponsesDelegateActionListener<ConfigProfileType> listener
) {
if (job.isEnabled()) {
if (profileResponse.getTotalUpdates() < requiredSamples) {
if (totalUpdates < requiredSamples) {
// need to double check for an HC analysis
// since what ProfileResponse returns is the highest priority entity currently in memory, but
// another entity might have already been initialized and sit somewhere else (in memory or on disk).
long enabledTime = job.getEnabledTime().toEpochMilli();
long totalUpdates = profileResponse.getTotalUpdates();
ProfileUtil
.confirmRealtimeResultStatus(
config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1229,14 +1229,14 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
.put(StatNames.FORECASTER_COUNT.getName(), new TimeSeriesStat<>(true, new SettableSupplier()))
.put(StatNames.SINGLE_STREAM_FORECASTER_COUNT.getName(), new TimeSeriesStat<>(true, new SettableSupplier()))
.put(StatNames.HC_FORECASTER_COUNT.getName(), new TimeSeriesStat<>(true, new SettableSupplier()))
.put(StatNames.FORECAST_MODEL_CORRUTPION_COUNT.getName(), new TimeSeriesStat<>(false, new CounterSupplier()))
.put(StatNames.FORECAST_MODEL_CORRUPTION_COUNT.getName(), new TimeSeriesStat<>(false, new CounterSupplier()))
.put(
StatNames.MODEL_INFORMATION.getName(),
new TimeSeriesStat<>(false, new ForecastModelsOnNodeSupplier(forecastCacheProvider, settings, clusterService))
)
.put(
StatNames.AD_CONFIG_INDEX_STATUS.getName(),
new TimeSeriesStat<>(true, new IndexStatusSupplier(indexUtils, ADCommonName.CONFIG_INDEX))
StatNames.FORECAST_CONFIG_INDEX_STATUS.getName(),
new TimeSeriesStat<>(true, new IndexStatusSupplier(indexUtils, ForecastCommonName.CONFIG_INDEX))
)
.put(
StatNames.JOB_INDEX_STATUS.getName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,8 @@ private Optional<ModelState<RCFModelType>> getStateFromInactiveEntiiyCache(Strin
@Override
public boolean hostIfPossible(Config config, ModelState<RCFModelType> toUpdate) {
// Although toUpdate may not have samples or model, we'll continue.
if (toUpdate == null) {
// larger than 1hr interval, don't cache
if (toUpdate == null || config.isLongInterval()) {
return false;
}
String modelId = toUpdate.getModelId();
Expand Down Expand Up @@ -350,6 +351,11 @@ public Pair<List<Entity>, List<Entity>> selectUpdateCandidate(Collection<Entity>
List<Entity> hotEntities = new ArrayList<>();
List<Entity> coldEntities = new ArrayList<>();

if (config.isLongInterval()) {
// put long interval entities in cold queue as we don't want to cache it
return Pair.of(hotEntities, new ArrayList<>(cacheMissEntities));
}

CacheBufferType buffer = activeEnities.get(configId);
if (buffer == null) {
// when a config is just started or during run once, there is
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/opensearch/timeseries/model/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -903,4 +903,8 @@ protected static Boolean onlyParseBooleanValue(XContentParser parser) throws IOE
protected int getMinimumShingle() {
return 1;
}

/**
 * Returns whether this config uses a "long" detection/forecast interval.
 *
 * <p>An interval is considered long when its duration is greater than or equal to
 * {@code TimeSeriesSettings.HOURLY_MAINTENANCE} (presumably one hour — TODO confirm
 * against the constant's definition). Long-interval configs are treated differently
 * elsewhere in this change set: their models are not cached in the priority cache,
 * their cache-miss entities go to the cold queue, and their checkpoints are written
 * with medium priority.
 *
 * @return {@code true} if the configured interval duration is at least
 *         {@code HOURLY_MAINTENANCE}, {@code false} otherwise
 */
public boolean isLongInterval() {
return getIntervalDuration().compareTo(TimeSeriesSettings.HOURLY_MAINTENANCE) >= 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -371,9 +371,10 @@ protected ActionListener<Optional<? extends Config>> processIterationUsingConfig
boolean loaded = cacheProvider.get().hostIfPossible(config, restoredModelState);

if (false == loaded) {
// not in memory. Maybe cold entities or some other entities
// have filled the slot while waiting for loading checkpoints.
checkpointWriteWorker.write(restoredModelState, true, RequestPriority.LOW);
// not in memory. Maybe cold entities or long interval entities
// Save checkpoints.
checkpointWriteWorker
.write(restoredModelState, true, config.isLongInterval() ? RequestPriority.MEDIUM : RequestPriority.LOW);
}
}

Expand Down
Loading
Loading