Merged
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
### Features
### Enhancements
- Use Centralized Resource Access Control framework provided by security plugin ([#1400](https://github.com/opensearch-project/anomaly-detection/pull/1400))
+- Introduce state machine, separate config index, improve suggest/validate APIs, and persist cold-start results for run-once visualization ([#1479](https://github.com/opensearch-project/anomaly-detection/pull/1479))

### Bug Fixes
### Infrastructure
### Documentation
10 changes: 4 additions & 6 deletions build.gradle
@@ -146,9 +146,9 @@ dependencies {
implementation group: 'com.yahoo.datasketches', name: 'memory', version: '0.12.2'
implementation group: 'commons-lang', name: 'commons-lang', version: '2.6'
implementation group: 'org.apache.commons', name: 'commons-pool2', version: '2.12.0'
-implementation 'software.amazon.randomcutforest:randomcutforest-serialization:4.2.0'
-implementation 'software.amazon.randomcutforest:randomcutforest-parkservices:4.2.0'
-implementation 'software.amazon.randomcutforest:randomcutforest-core:4.2.0'
+implementation 'software.amazon.randomcutforest:randomcutforest-serialization:4.3.0'
+implementation 'software.amazon.randomcutforest:randomcutforest-parkservices:4.3.0'
+implementation 'software.amazon.randomcutforest:randomcutforest-core:4.3.0'

// we inherit jackson-core from opensearch core
implementation("com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}")
@@ -188,6 +188,7 @@ dependencies {

opensearchPlugin "org.opensearch.plugin:opensearch-job-scheduler:${opensearch_build}@zip"
opensearchPlugin "org.opensearch.plugin:opensearch-security:${opensearch_build}@zip"
+testImplementation 'org.reflections:reflections:0.10.2'
}

apply plugin: 'java'
@@ -998,9 +999,6 @@ List<String> jacocoExclusions = [

// TODO: add test coverage (kaituo)
'org.opensearch.forecast.*',
-'org.opensearch.timeseries.ml.Sample',
-'org.opensearch.timeseries.ratelimit.FeatureRequest',
-'org.opensearch.ad.transport.ADHCImputeNodeRequest',
'org.opensearch.timeseries.model.ModelProfileOnNode',
'org.opensearch.timeseries.transport.ValidateConfigRequest',
'org.opensearch.timeseries.transport.ResultProcessor.PageListener.1',
ADEntityProfileRunner.java
@@ -40,7 +40,8 @@ public ADEntityProfileRunner(
AnalysisType.AD,
ADEntityProfileAction.INSTANCE,
ADCommonName.ANOMALY_RESULT_INDEX_ALIAS,
-AnomalyResult.DETECTOR_ID_FIELD
+AnomalyResult.DETECTOR_ID_FIELD,
+ADCommonName.CONFIG_INDEX
);
}
}
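Note: the profile runners now pass the config index name (ADCommonName.CONFIG_INDEX) up to the shared base class instead of relying on a hard-coded index. A minimal sketch of that constructor-threading pattern, assuming a base runner that takes the index name as an argument (the class and method names here are illustrative, not the actual ProfileRunner signature):

```java
// Hypothetical sketch: the shared base class is told which config index to
// query, so AD and forecasting runners can reuse the same lookup logic.
public abstract class BaseProfileRunner {
    protected final String configIndexName;

    protected BaseProfileRunner(String configIndexName) {
        // AD passes ADCommonName.CONFIG_INDEX (".opendistro-anomaly-detectors")
        this.configIndexName = configIndexName;
    }

    protected String configIndex() {
        return configIndexName;
    }
}
```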
AnomalyDetectorProfileRunner.java
@@ -13,6 +13,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.indices.ADIndex;
import org.opensearch.ad.indices.ADIndexManagement;
import org.opensearch.ad.model.ADTask;
@@ -71,7 +72,8 @@ public AnomalyDetectorProfileRunner(
ProfileName.AD_TASK,
ADProfileAction.INSTANCE,
AnomalyDetector::parse,
-taskProfileRunner
+taskProfileRunner,
+ADCommonName.CONFIG_INDEX
);
}

ExecuteADResultResponseRecorder.java
@@ -11,6 +11,7 @@

package org.opensearch.ad;

+import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Optional;
@@ -50,7 +51,7 @@ public ExecuteADResultResponseRecorder(
ThreadPool threadPool,
Client client,
NodeStateManager nodeStateManager,
-ADTaskCacheManager taskCacheManager,
+Clock clock,
int rcfMinSamples
) {
super(
@@ -62,8 +63,7 @@
TimeSeriesAnalyticsPlugin.AD_THREAD_POOL_NAME,
client,
nodeStateManager,
-taskCacheManager,
-rcfMinSamples,
+clock,
ADIndex.RESULT,
AnalysisType.AD,
ADProfileAction.INSTANCE
@@ -104,14 +104,15 @@ protected AnomalyResult createErrorResult(
* Instead, we issue a profile request to poll each model node and get the maximum total updates among all models.
* @param response response returned from executing AnomalyResultAction
* @param configId config Id
+ * @param clock Clock to get current time
*/
@Override
-protected void updateRealtimeTask(ResultResponse<AnomalyResult> response, String configId) {
+protected void updateRealtimeTask(ResultResponse<AnomalyResult> response, String configId, Clock clock) {
if (response.isHC() != null && response.isHC()) {
if (taskManager.skipUpdateRealtimeTask(configId, response.getError())) {
return;
}
-delayedUpdate(response, configId);
+delayedUpdate(response, configId, clock);
} else {
log
.debug(
Expand All @@ -124,7 +125,8 @@ protected void updateRealtimeTask(ResultResponse<AnomalyResult> response, String
null,
response.getRcfTotalUpdates(),
response.getConfigIntervalInMinutes(),
-response.getError()
+response.getError(),
+clock
);
}
}
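Note: replacing the ADTaskCacheManager/rcfMinSamples pair with an injected java.time.Clock lets callers pin "now" in tests. A minimal, self-contained sketch of the standard Clock API this relies on (the recorder wiring itself is not shown; this is just the testability idea):

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;

public class ClockSketch {
    public static void main(String[] args) {
        // Production code would inject the wall clock
        Clock wall = Clock.systemUTC();

        // Tests can inject a fixed clock so recorded timestamps are deterministic
        Clock fixed = Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneOffset.UTC);

        System.out.println(Instant.now(wall));  // current time
        System.out.println(Instant.now(fixed)); // always 2024-01-01T00:00:00Z
    }
}
```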
2 changes: 2 additions & 0 deletions src/main/java/org/opensearch/ad/constant/ADCommonName.java
@@ -19,6 +19,8 @@ public class ADCommonName {
public static final String CHECKPOINT_INDEX_NAME = ".opendistro-anomaly-checkpoints";
// index name for anomaly detection state. Will store AD task in this index as well.
public static final String DETECTION_STATE_INDEX = ".opendistro-anomaly-detection-state";
+// config index. We are reusing ad detector index.
+public static final String CONFIG_INDEX = ".opendistro-anomaly-detectors";

// The alias of the index in which to write AD result history
public static final String ANOMALY_RESULT_INDEX_ALIAS = ".opendistro-anomaly-results";
7 changes: 6 additions & 1 deletion src/main/java/org/opensearch/ad/indices/ADIndex.java
@@ -30,7 +30,7 @@ public enum ADIndex implements TimeSeriesIndex {
true,
ThrowingSupplierWrapper.throwingSupplierWrapper(ADIndexManagement::getResultMappings)
),
-CONFIG(CommonName.CONFIG_INDEX, false, ThrowingSupplierWrapper.throwingSupplierWrapper(ADIndexManagement::getConfigMappings)),
+CONFIG(ADCommonName.CONFIG_INDEX, false, ThrowingSupplierWrapper.throwingSupplierWrapper(ADIndexManagement::getConfigMappings)),
JOB(CommonName.JOB_INDEX, false, ThrowingSupplierWrapper.throwingSupplierWrapper(ADIndexManagement::getJobMappings)),
CHECKPOINT(
ADCommonName.CHECKPOINT_INDEX_NAME,
@@ -65,4 +65,9 @@ public boolean isAlias() {
public String getMapping() {
return mapping;
}

+@Override
+public boolean isConfigIndex() {
+return ADCommonName.CONFIG_INDEX.equals(getIndexName());
+}
}
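Note: with the new isConfigIndex() override, generic index-management code can identify the config index without referencing AD-specific constants. An illustrative caller, assuming only the enum's getIndexName() accessor shown above:

```java
// Sketch only: pick out the config index from the enum.
for (ADIndex index : ADIndex.values()) {
    if (index.isConfigIndex()) {
        // For AD this matches ADCommonName.CONFIG_INDEX, ".opendistro-anomaly-detectors"
        System.out.println("Config index: " + index.getIndexName());
    }
}
```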
ADIndexManagement.java
@@ -97,7 +97,8 @@ public ADIndexManagement(
ADIndex.RESULT.getMapping(),
xContentRegistry,
AnomalyDetector::parse,
-ADCommonName.CUSTOM_RESULT_INDEX_PREFIX
+ADCommonName.CUSTOM_RESULT_INDEX_PREFIX,
+ADCommonName.CONFIG_INDEX
);

this.indexStates = new EnumMap<ADIndex, IndexState>(ADIndex.class);
90 changes: 81 additions & 9 deletions src/main/java/org/opensearch/ad/ml/ADColdStart.java
@@ -13,14 +13,18 @@

import java.time.Clock;
import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
import java.util.List;

+import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ad.indices.ADIndex;
import org.opensearch.ad.indices.ADIndexManagement;
import org.opensearch.ad.ml.IgnoreSimilarExtractor.ThresholdArrays;
import org.opensearch.ad.model.AnomalyDetector;
+import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.ratelimit.ADCheckpointWriteWorker;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.AnalysisType;
@@ -34,18 +38,21 @@
import org.opensearch.timeseries.model.Config;
import org.opensearch.timeseries.ratelimit.RequestPriority;
import org.opensearch.timeseries.settings.TimeSeriesSettings;
+import org.opensearch.timeseries.util.ModelUtil;
+import org.opensearch.timeseries.util.ParseUtils;

import com.amazon.randomcutforest.config.ForestMode;
import com.amazon.randomcutforest.config.Precision;
import com.amazon.randomcutforest.config.TransformMethod;
+import com.amazon.randomcutforest.parkservices.AnomalyDescriptor;
import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest;

/**
* Training models for HCAD detectors
*
*/
public class ADColdStart extends
-ModelColdStart<ThresholdedRandomCutForest, ADIndex, ADIndexManagement, ADCheckpointDao, ADCheckpointWriteWorker> {
+ModelColdStart<ThresholdedRandomCutForest, ADIndex, ADIndexManagement, ADCheckpointDao, ADCheckpointWriteWorker, AnomalyResult> {
private static final Logger logger = LogManager.getLogger(ADColdStart.class);

/**
@@ -87,7 +94,8 @@ public ADColdStart(
ADCheckpointWriteWorker checkpointWriteWorker,
long rcfSeed,
int maxRoundofColdStart,
-int coolDownMinutes
+int coolDownMinutes,
+int resultSchemaVersion
) {
super(
modelTtl,
@@ -107,7 +115,8 @@
featureManager,
maxRoundofColdStart,
TimeSeriesAnalyticsPlugin.AD_THREAD_POOL_NAME,
-AnalysisType.AD
+AnalysisType.AD,
+resultSchemaVersion
);
}

@@ -126,7 +135,8 @@ public ADColdStart(
Duration modelTtl,
ADCheckpointWriteWorker checkpointWriteQueue,
int maxRoundofColdStart,
int coolDownMinutes
int coolDownMinutes,
int resultSchemaVersion
) {
this(
clock,
@@ -144,7 +154,8 @@
checkpointWriteQueue,
-1,
maxRoundofColdStart,
-coolDownMinutes
+coolDownMinutes,
+resultSchemaVersion
);
}

@@ -158,7 +169,7 @@ public ADColdStart(
* training data in result index so that the frontend can plot it.
*/
@Override
-protected List<Sample> trainModelFromDataSegments(
+protected List<AnomalyResult> trainModelFromDataSegments(
List<Sample> pointSamples,
ModelState<ThresholdedRandomCutForest> entityState,
Config config,
@@ -185,6 +196,7 @@ protected List<Sample> trainModelFromDataSegments(
.numberOfTrees(numberOfTrees)
.timeDecay(config.getTimeDecay())
.transformDecay(config.getTimeDecay())
+// allow enough samples before emitting scores to park service
.outputAfter(Math.max(shingleSize, numMinSamples))
.initialAcceptFraction(initialAcceptFraction)
.parallelExecutionEnabled(false)
@@ -221,11 +233,71 @@
// use build instead of new TRCF(Builder) because build method did extra validation and initialization
ThresholdedRandomCutForest trcf = rcfBuilder.build();

+// Prepare for sequential processing
+double[][] sequentialData = new double[pointSamples.size()][];
+long[] timestamps = new long[pointSamples.size()];
+List<Pair<Instant, Instant>> sequentialTime = new ArrayList<>();
+
+// Convert the list of Sample objects into a 2D array + a parallel list of time pairs
for (int i = 0; i < pointSamples.size(); i++) {
Sample dataSample = pointSamples.get(i);
double[] dataValue = dataSample.getValueList();
-// We don't keep missing values during cold start as the actual data may not be reconstructed during the early stage.
-trcf.process(dataValue, dataSample.getDataEndTime().getEpochSecond());

+sequentialData[i] = dataValue;
+timestamps[i] = dataSample.getDataEndTime().getEpochSecond();
+// Store start and end times together
+sequentialTime.add(Pair.of(dataSample.getDataStartTime(), dataSample.getDataEndTime()));
}

+// Process data in one go. We need the timestamps for STREAMING_IMPUTE mode
+final List<AnomalyDescriptor> descriptors;
+try {
+descriptors = trcf.processSequentially(sequentialData, timestamps, x -> true);
+} catch (Exception e) {
+// e.g., out of order timestamps
+logger.error("Error while running processSequentially", e);
+// abort and return no results if the sequence processing fails
+return null;
+}
+// Check for size mismatch
+if (descriptors.size() != sequentialTime.size()) {
+logger
+.warn(
+"processSequentially returned a different size than expected: got [{}], expected [{}].",
+descriptors.size(),
+sequentialTime.size()
+);
+return null;
+}
+
+// Build anomaly results from sequential descriptors
+List<AnomalyResult> results = new ArrayList<>();
+for (int i = 0; i < descriptors.size(); i++) {
+AnomalyDescriptor descriptor = descriptors.get(i);
+double[] dataValue = sequentialData[i];
+Pair<Instant, Instant> time = sequentialTime.get(i);
+
+// Convert the descriptor into a thresholding result, or anomaly result
+ThresholdingResult thresholdingResult = ModelUtil.toResult(trcf.getForest(), descriptor, dataValue, false, config);
+
+Instant now = Instant.now();
+results
+.addAll(
+thresholdingResult
+.toIndexableResults(
+config,
+time.getLeft(), // Data start time
+time.getRight(), // Data end time
+now,
+now,
+ParseUtils.getFeatureData(dataValue, config),
+entityState.getEntity(),
+resultMappingVersion,
+entityState.getModelId(),
+taskId,
+null
+)
+);
+}

entityState.setModel(trcf);
@@ -235,7 +307,7 @@ protected List<Sample> trainModelFromDataSegments(
// save to checkpoint
checkpointWriteWorker.write(entityState, true, RequestPriority.MEDIUM);

-return pointSamples;
+return results;
}

public static void applyRule(ThresholdedRandomCutForest.Builder rcfBuilder, AnomalyDetector detector) {
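Note: the cold-start training path now replaces per-sample trcf.process() calls with a single processSequentially() pass, then converts each returned AnomalyDescriptor into indexable results so run-once cold-start output can be plotted. A condensed, self-contained sketch of that RCF flow (builder options trimmed to a subset; converting descriptors to AnomalyResult via ModelUtil.toResult/toIndexableResults is plugin code and only summarized in a comment):

```java
import java.util.List;

import com.amazon.randomcutforest.parkservices.AnomalyDescriptor;
import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest;

public class ColdStartSketch {
    public static void main(String[] args) {
        int shingleSize = 8;
        // Build a thresholded forest; the real code also sets time decay,
        // transform decay, output-after, suppression rules, etc.
        ThresholdedRandomCutForest trcf = ThresholdedRandomCutForest
            .builder()
            .dimensions(shingleSize * 1) // shingle size times feature count
            .shingleSize(shingleSize)
            .internalShinglingEnabled(true)
            .parallelExecutionEnabled(false)
            .build();

        // One row per training sample plus parallel data-end timestamps
        // (epoch seconds), which STREAMING_IMPUTE mode needs.
        double[][] data = new double[64][];
        long[] timestamps = new long[64];
        for (int i = 0; i < data.length; i++) {
            data[i] = new double[] { Math.sin(i / 4.0) };
            timestamps[i] = 1_700_000_000L + i * 60L;
        }

        // Single sequential pass; the predicate filters which descriptors
        // are returned (x -> true keeps all of them, as in the PR).
        List<AnomalyDescriptor> descriptors = trcf.processSequentially(data, timestamps, x -> true);

        // The PR then maps each descriptor to an AnomalyResult and persists it.
        System.out.println("descriptors returned: " + descriptors.size());
    }
}
```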