Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Upgrade mapping (#278)
Browse files Browse the repository at this point in the history
* Upgrade mapping

This PR updates the current AD index mapping if a new version is detected.  Previously, we didn't do so.  This will cause issues when upgrading AD binary version: newly added fields will not be searchable, though we can still send get requests to fetch them.

Testing done:
1. manual upgrade testing passes with fgac on and off.
2. will add unit tests later.
  • Loading branch information
kaituo authored Oct 20, 2020
1 parent 2e7e39d commit 8def1ba
Show file tree
Hide file tree
Showing 27 changed files with 570 additions and 51 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ List<String> jacocoExclusions = [
'com.amazon.opendistroforelasticsearch.ad.transport.EntityProfileRequest',
'com.amazon.opendistroforelasticsearch.ad.util.MultiResponsesDelegateActionListener',
'com.amazon.opendistroforelasticsearch.ad.transport.GetAnomalyDetectorRequest',
'com.amazon.opendistroforelasticsearch.ad.util.ThrowingSupplierWrapper',
]

jacocoTestCoverageVerification {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import com.amazon.opendistroforelasticsearch.ad.common.exception.AnomalyDetectionException;
import com.amazon.opendistroforelasticsearch.ad.common.exception.EndRunException;
import com.amazon.opendistroforelasticsearch.ad.common.exception.InternalFailure;
import com.amazon.opendistroforelasticsearch.ad.indices.ADIndex;
import com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyResult;
import com.amazon.opendistroforelasticsearch.ad.model.FeatureData;
Expand Down Expand Up @@ -82,6 +84,7 @@ public class AnomalyDetectorJobRunner implements ScheduledJobRunner {
private AnomalyIndexHandler<AnomalyResult> anomalyResultHandler;
private ConcurrentHashMap<String, Integer> detectorEndRunExceptionCount;
private DetectionStateHandler detectionStateHandler;
private AnomalyDetectionIndices indexUtil;

public static AnomalyDetectorJobRunner getJobRunnerInstance() {
if (INSTANCE != null) {
Expand Down Expand Up @@ -126,6 +129,10 @@ public void setDetectionStateHandler(DetectionStateHandler detectionStateHandler
this.detectionStateHandler = detectionStateHandler;
}

public void setIndexUtil(AnomalyDetectionIndices indexUtil) {
this.indexUtil = indexUtil;
}

@Override
public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext context) {
String detectorId = jobParameter.getName();
Expand Down Expand Up @@ -202,6 +209,7 @@ protected void runAdJob(
}

try {
indexUtil.updateMappingIfNecessary();
AnomalyResultRequest request = new AnomalyResultRequest(
detectorId,
detectionStartTime.toEpochMilli(),
Expand Down Expand Up @@ -451,7 +459,8 @@ private void indexAnomalyResult(
executionStartTime,
Instant.now(),
response.getError(),
user
user,
indexUtil.getSchemaVersion(ADIndex.RESULT)
);
anomalyResultHandler.index(anomalyResult, detectorId);
detectionStateHandler.saveError(response.getError(), detectorId);
Expand Down Expand Up @@ -508,7 +517,8 @@ private void indexAnomalyResultException(
executionStartTime,
Instant.now(),
errorMessage,
user
user,
indexUtil.getSchemaVersion(ADIndex.RESULT)
);
anomalyResultHandler.index(anomalyResult, detectorId);
detectionStateHandler.saveError(errorMessage, detectorId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,13 +227,10 @@ public List<RestHandler> getRestHandlers(
jobRunner.setAnomalyResultHandler(anomalyResultHandler);
jobRunner.setDetectionStateHandler(detectorStateHandler);
jobRunner.setSettings(settings);
jobRunner.setIndexUtil(anomalyDetectionIndices);

RestGetAnomalyDetectorAction restGetAnomalyDetectorAction = new RestGetAnomalyDetectorAction();
RestIndexAnomalyDetectorAction restIndexAnomalyDetectorAction = new RestIndexAnomalyDetectorAction(
settings,
clusterService,
anomalyDetectionIndices
);
RestIndexAnomalyDetectorAction restIndexAnomalyDetectorAction = new RestIndexAnomalyDetectorAction(settings, clusterService);
RestSearchAnomalyDetectorAction searchAnomalyDetectorAction = new RestSearchAnomalyDetectorAction();
RestSearchAnomalyResultAction searchAnomalyResultAction = new RestSearchAnomalyResultAction();
RestDeleteAnomalyDetectorAction deleteAnomalyDetectorAction = new RestDeleteAnomalyDetectorAction();
Expand Down Expand Up @@ -432,7 +429,7 @@ public Collection<Object> createComponents(
AnomalyDetectorSettings.NUM_MIN_SAMPLES,
settings,
threadPool,
AnomalyDetectorSettings.MAX_CACHE_HANDLING_PER_SECOND
AnomalyDetectorSettings.MAX_CACHE_MISS_HANDLING_PER_SECOND.get(settings)
);

CacheProvider cacheProvider = new CacheProvider(cache);
Expand Down Expand Up @@ -582,7 +579,8 @@ public List<Setting<?>> getSettings() {
AnomalyDetectorSettings.MAX_ENTITIES_FOR_PREVIEW,
AnomalyDetectorSettings.INDEX_PRESSURE_SOFT_LIMIT,
AnomalyDetectorSettings.MAX_PRIMARY_SHARDS,
AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES
AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES,
AnomalyDetectorSettings.MAX_CACHE_MISS_HANDLING_PER_SECOND
);
return unmodifiableList(Stream.concat(enabledSetting.stream(), systemSetting.stream()).collect(Collectors.toList()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;

import com.amazon.opendistroforelasticsearch.ad.constant.CommonValue;
import com.amazon.opendistroforelasticsearch.ad.feature.FeatureManager;
import com.amazon.opendistroforelasticsearch.ad.feature.Features;
import com.amazon.opendistroforelasticsearch.ad.ml.ModelManager;
Expand Down Expand Up @@ -158,7 +159,8 @@ private List<AnomalyResult> parsePreviewResult(
null,
null,
entity,
detector.getUser()
detector.getUser(),
CommonValue.NO_SCHEMA_VERSION
);
} else {
result = new AnomalyResult(
Expand All @@ -173,7 +175,8 @@ private List<AnomalyResult> parsePreviewResult(
null,
null,
entity,
detector.getUser()
detector.getUser(),
CommonValue.NO_SCHEMA_VERSION
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.amazon.opendistroforelasticsearch.ad.caching;

import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.COOLDOWN_MINUTES;
import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.MAX_CACHE_MISS_HANDLING_PER_SECOND;

import java.time.Clock;
import java.time.Duration;
Expand Down Expand Up @@ -82,7 +83,7 @@ public class PriorityCache implements EntityCache {
private int coolDownMinutes;
private ThreadPool threadPool;
private Random random;
private final RateLimiter cacheMissHandlingLimiter;
private RateLimiter cacheMissHandlingLimiter;

public PriorityCache(
CheckpointDao checkpointDao,
Expand Down Expand Up @@ -125,6 +126,9 @@ public PriorityCache(
this.random = new Random(42);

this.cacheMissHandlingLimiter = RateLimiter.create(cacheMissRateHandlingLimiter);
clusterService
.getClusterSettings()
.addSettingsUpdateConsumer(MAX_CACHE_MISS_HANDLING_PER_SECOND, it -> this.cacheMissHandlingLimiter = RateLimiter.create(it));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,6 @@ public class CommonName {
public static final String IP_TYPE = "ip";

public static final String TOTAL_UPDATES = "total_updates";

public static final String SCHEMA_VERSION_FIELD = "schema_version";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.ad.constant;

public class CommonValue {
// unknown or no schema version
public static Integer NO_SCHEMA_VERSION = 0;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.ad.indices;

import java.util.function.Supplier;

import com.amazon.opendistroforelasticsearch.ad.constant.CommonName;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob;
import com.amazon.opendistroforelasticsearch.ad.model.DetectorInternalState;
import com.amazon.opendistroforelasticsearch.ad.util.ThrowingSupplierWrapper;

/**
* Represent an AD index
*
*/
public enum ADIndex {

// throw RuntimeException since we don't know how to handle the case when the mapping reading throws IOException
RESULT(
CommonName.ANOMALY_RESULT_INDEX_ALIAS,
true,
ThrowingSupplierWrapper.throwingSupplierWrapper(AnomalyDetectionIndices::getAnomalyResultMappings)
),
CONFIG(
AnomalyDetector.ANOMALY_DETECTORS_INDEX,
false,
ThrowingSupplierWrapper.throwingSupplierWrapper(AnomalyDetectionIndices::getAnomalyDetectorMappings)
),
JOB(
AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX,
false,
ThrowingSupplierWrapper.throwingSupplierWrapper(AnomalyDetectionIndices::getAnomalyDetectorJobMappings)
),
CHECKPOINT(
CommonName.CHECKPOINT_INDEX_NAME,
false,
ThrowingSupplierWrapper.throwingSupplierWrapper(AnomalyDetectionIndices::getCheckpointMappings)
),
STATE(
DetectorInternalState.DETECTOR_STATE_INDEX,
false,
ThrowingSupplierWrapper.throwingSupplierWrapper(AnomalyDetectionIndices::getDetectorStateMappings)
);

private final String indexName;
// whether we use an alias for the index
private final boolean alias;
private final String mapping;

ADIndex(String name, boolean alias, Supplier<String> mappingSupplier) {
this.indexName = name;
this.alias = alias;
this.mapping = mappingSupplier.get();
}

public String getIndexName() {
return indexName;
}

public boolean isAlias() {
return alias;
}

public String getMapping() {
return mapping;
}

}
Loading

0 comments on commit 8def1ba

Please sign in to comment.