From 8def1ba1f68d73680bc543cfc1d3f3eea21c65f5 Mon Sep 17 00:00:00 2001 From: Kaituo Li Date: Mon, 19 Oct 2020 22:52:11 -0700 Subject: [PATCH] Upgrade mapping (#278) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Upgrade mapping This PR updates the current AD index mapping if a new version is detected.  Previously, we didn't do so.  This will cause issues when upgrading AD binary version: newly added fields will not be searchable, though we can still send get requests to fetch them. Testing done: 1. manual upgrade testing passes with fgac on and off. 2. will add unit tests later. --- build.gradle | 1 + .../ad/AnomalyDetectorJobRunner.java | 14 +- .../ad/AnomalyDetectorPlugin.java | 12 +- .../ad/AnomalyDetectorRunner.java | 7 +- .../ad/caching/PriorityCache.java | 6 +- .../ad/constant/CommonName.java | 2 + .../ad/constant/CommonValue.java | 21 ++ .../ad/indices/ADIndex.java | 82 +++++ .../ad/indices/AnomalyDetectionIndices.java | 288 +++++++++++++++++- .../ad/ml/CheckpointDao.java | 2 + .../ad/model/AnomalyDetector.java | 9 +- .../ad/model/AnomalyResult.java | 25 +- .../rest/RestIndexAnomalyDetectorAction.java | 7 +- .../ad/settings/AnomalyDetectorSettings.java | 10 +- .../EntityResultTransportAction.java | 10 +- .../IndexAnomalyDetectorTransportAction.java | 1 + .../ad/util/ThrowingConsumerWrapper.java | 2 +- .../ad/util/ThrowingSupplier.java | 27 ++ .../ad/util/ThrowingSupplierWrapper.java | 41 +++ .../mappings/anomaly-detector-jobs.json | 2 +- .../resources/mappings/anomaly-detectors.json | 2 +- .../resources/mappings/anomaly-results.json | 5 +- src/main/resources/mappings/checkpoint.json | 16 +- .../ad/AnomalyDetectorJobRunnerTests.java | 5 + .../ad/TestHelpers.java | 7 +- .../ad/caching/PriorityCacheTests.java | 9 + .../EntityResultTransportActionTests.java | 8 +- 27 files changed, 570 insertions(+), 51 deletions(-) create mode 100644 src/main/java/com/amazon/opendistroforelasticsearch/ad/constant/CommonValue.java create mode 100644 src/main/java/com/amazon/opendistroforelasticsearch/ad/indices/ADIndex.java create mode 100644 src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ThrowingSupplier.java create mode 100644 src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ThrowingSupplierWrapper.java diff --git a/build.gradle b/build.gradle index e4256f7d..d2aa92b7 100644 --- a/build.gradle +++ b/build.gradle @@ -272,6 +272,7 @@ List jacocoExclusions = [ 'com.amazon.opendistroforelasticsearch.ad.transport.EntityProfileRequest', 'com.amazon.opendistroforelasticsearch.ad.util.MultiResponsesDelegateActionListener', 'com.amazon.opendistroforelasticsearch.ad.transport.GetAnomalyDetectorRequest', + 'com.amazon.opendistroforelasticsearch.ad.util.ThrowingSupplierWrapper', ] jacocoTestCoverageVerification { diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorJobRunner.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorJobRunner.java index 389fe5c6..ae4d21c5 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorJobRunner.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorJobRunner.java @@ -47,6 +47,8 @@ import com.amazon.opendistroforelasticsearch.ad.common.exception.AnomalyDetectionException; import com.amazon.opendistroforelasticsearch.ad.common.exception.EndRunException; import com.amazon.opendistroforelasticsearch.ad.common.exception.InternalFailure; +import com.amazon.opendistroforelasticsearch.ad.indices.ADIndex; +import com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyResult; import com.amazon.opendistroforelasticsearch.ad.model.FeatureData; @@ -82,6 +84,7 @@ public class AnomalyDetectorJobRunner implements ScheduledJobRunner { private AnomalyIndexHandler anomalyResultHandler; private ConcurrentHashMap detectorEndRunExceptionCount; private DetectionStateHandler detectionStateHandler; + private AnomalyDetectionIndices indexUtil; public static AnomalyDetectorJobRunner getJobRunnerInstance() { if (INSTANCE != null) { @@ -126,6 +129,10 @@ public void setDetectionStateHandler(DetectionStateHandler detectionStateHandler this.detectionStateHandler = detectionStateHandler; } + public void setIndexUtil(AnomalyDetectionIndices indexUtil) { + this.indexUtil = indexUtil; + } + @Override public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext context) { String detectorId = jobParameter.getName(); @@ -202,6 +209,7 @@ protected void runAdJob( } try { + indexUtil.updateMappingIfNecessary(); AnomalyResultRequest request = new AnomalyResultRequest( detectorId, detectionStartTime.toEpochMilli(), @@ -451,7 +459,8 @@ private void indexAnomalyResult( executionStartTime, Instant.now(), response.getError(), - user + user, + indexUtil.getSchemaVersion(ADIndex.RESULT) ); anomalyResultHandler.index(anomalyResult, detectorId); detectionStateHandler.saveError(response.getError(), detectorId); @@ -508,7 +517,8 @@ private void indexAnomalyResultException( executionStartTime, Instant.now(), errorMessage, - user + user, + indexUtil.getSchemaVersion(ADIndex.RESULT) ); anomalyResultHandler.index(anomalyResult, detectorId); detectionStateHandler.saveError(errorMessage, detectorId); diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java index 32f7eca3..a9dbcf0a 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java @@ -227,13 +227,10 @@ public List getRestHandlers( jobRunner.setAnomalyResultHandler(anomalyResultHandler); jobRunner.setDetectionStateHandler(detectorStateHandler); jobRunner.setSettings(settings); + jobRunner.setIndexUtil(anomalyDetectionIndices); RestGetAnomalyDetectorAction restGetAnomalyDetectorAction = new RestGetAnomalyDetectorAction(); - RestIndexAnomalyDetectorAction restIndexAnomalyDetectorAction = new RestIndexAnomalyDetectorAction( - settings, - clusterService, - anomalyDetectionIndices - ); + RestIndexAnomalyDetectorAction restIndexAnomalyDetectorAction = new RestIndexAnomalyDetectorAction(settings, clusterService); RestSearchAnomalyDetectorAction searchAnomalyDetectorAction = new RestSearchAnomalyDetectorAction(); RestSearchAnomalyResultAction searchAnomalyResultAction = new RestSearchAnomalyResultAction(); RestDeleteAnomalyDetectorAction deleteAnomalyDetectorAction = new RestDeleteAnomalyDetectorAction(); @@ -432,7 +429,7 @@ public Collection createComponents( AnomalyDetectorSettings.NUM_MIN_SAMPLES, settings, threadPool, - AnomalyDetectorSettings.MAX_CACHE_HANDLING_PER_SECOND + AnomalyDetectorSettings.MAX_CACHE_MISS_HANDLING_PER_SECOND.get(settings) ); CacheProvider cacheProvider = new CacheProvider(cache); @@ -582,7 +579,8 @@ public List> getSettings() { AnomalyDetectorSettings.MAX_ENTITIES_FOR_PREVIEW, AnomalyDetectorSettings.INDEX_PRESSURE_SOFT_LIMIT, AnomalyDetectorSettings.MAX_PRIMARY_SHARDS, - AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES + AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES, + AnomalyDetectorSettings.MAX_CACHE_MISS_HANDLING_PER_SECOND ); return unmodifiableList(Stream.concat(enabledSetting.stream(), systemSetting.stream()).collect(Collectors.toList())); } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorRunner.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorRunner.java index e4fa92e6..9f5b3d5b 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorRunner.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorRunner.java @@ -28,6 +28,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; +import com.amazon.opendistroforelasticsearch.ad.constant.CommonValue; import com.amazon.opendistroforelasticsearch.ad.feature.FeatureManager; import com.amazon.opendistroforelasticsearch.ad.feature.Features; import com.amazon.opendistroforelasticsearch.ad.ml.ModelManager; @@ -158,7 +159,8 @@ private List parsePreviewResult( null, null, entity, - detector.getUser() + detector.getUser(), + CommonValue.NO_SCHEMA_VERSION ); } else { result = new AnomalyResult( @@ -173,7 +175,8 @@ private List parsePreviewResult( null, null, entity, - detector.getUser() + detector.getUser(), + CommonValue.NO_SCHEMA_VERSION ); } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/caching/PriorityCache.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/caching/PriorityCache.java index 8d541f3b..43f11aa9 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/caching/PriorityCache.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/caching/PriorityCache.java @@ -16,6 +16,7 @@ package com.amazon.opendistroforelasticsearch.ad.caching; import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.COOLDOWN_MINUTES; +import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.MAX_CACHE_MISS_HANDLING_PER_SECOND; import java.time.Clock; import java.time.Duration; @@ -82,7 +83,7 @@ public class PriorityCache implements EntityCache { private int coolDownMinutes; private ThreadPool threadPool; private Random random; - private final RateLimiter cacheMissHandlingLimiter; + private RateLimiter cacheMissHandlingLimiter; public PriorityCache( CheckpointDao checkpointDao, @@ -125,6 +126,9 @@ public PriorityCache( this.random = new Random(42); this.cacheMissHandlingLimiter = RateLimiter.create(cacheMissRateHandlingLimiter); + clusterService + .getClusterSettings() + .addSettingsUpdateConsumer(MAX_CACHE_MISS_HANDLING_PER_SECOND, it -> this.cacheMissHandlingLimiter = RateLimiter.create(it)); } @Override diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/constant/CommonName.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/constant/CommonName.java index 0892dcb3..0748a687 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/constant/CommonName.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/constant/CommonName.java @@ -73,4 +73,6 @@ public class CommonName { public static final String IP_TYPE = "ip"; public static final String TOTAL_UPDATES = "total_updates"; + + public static final String SCHEMA_VERSION_FIELD = "schema_version"; } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/constant/CommonValue.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/constant/CommonValue.java new file mode 100644 index 00000000..1ec69e73 --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/constant/CommonValue.java @@ -0,0 +1,21 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.constant; + +public class CommonValue { + // unknown or no schema version + public static Integer NO_SCHEMA_VERSION = 0; +} diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/indices/ADIndex.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/indices/ADIndex.java new file mode 100644 index 00000000..f33c03c7 --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/indices/ADIndex.java @@ -0,0 +1,82 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.indices; + +import java.util.function.Supplier; + +import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob; +import com.amazon.opendistroforelasticsearch.ad.model.DetectorInternalState; +import com.amazon.opendistroforelasticsearch.ad.util.ThrowingSupplierWrapper; + +/** + * Represent an AD index + * + */ +public enum ADIndex { + + // throw RuntimeException since we don't know how to handle the case when the mapping reading throws IOException + RESULT( + CommonName.ANOMALY_RESULT_INDEX_ALIAS, + true, + ThrowingSupplierWrapper.throwingSupplierWrapper(AnomalyDetectionIndices::getAnomalyResultMappings) + ), + CONFIG( + AnomalyDetector.ANOMALY_DETECTORS_INDEX, + false, + ThrowingSupplierWrapper.throwingSupplierWrapper(AnomalyDetectionIndices::getAnomalyDetectorMappings) + ), + JOB( + AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX, + false, + ThrowingSupplierWrapper.throwingSupplierWrapper(AnomalyDetectionIndices::getAnomalyDetectorJobMappings) + ), + CHECKPOINT( + CommonName.CHECKPOINT_INDEX_NAME, + false, + ThrowingSupplierWrapper.throwingSupplierWrapper(AnomalyDetectionIndices::getCheckpointMappings) + ), + STATE( + DetectorInternalState.DETECTOR_STATE_INDEX, + false, + ThrowingSupplierWrapper.throwingSupplierWrapper(AnomalyDetectionIndices::getDetectorStateMappings) + ); + + private final String indexName; + // whether we use an alias for the index + private final boolean alias; + private final String mapping; + + ADIndex(String name, boolean alias, Supplier mappingSupplier) { + this.indexName = name; + this.alias = alias; + this.mapping = mappingSupplier.get(); + } + + public String getIndexName() { + return indexName; + } + + public boolean isAlias() { + return alias; + } + + public String getMapping() { + return mapping; + } + +} diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/indices/AnomalyDetectionIndices.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/indices/AnomalyDetectionIndices.java index 226cfcb1..7e2fbcb4 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/indices/AnomalyDetectionIndices.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/indices/AnomalyDetectionIndices.java @@ -28,9 +28,14 @@ import java.io.IOException; import java.net.URL; import java.time.Instant; +import java.util.ArrayList; import java.util.Arrays; +import java.util.EnumMap; import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -38,19 +43,27 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.indices.alias.Alias; +import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.admin.indices.rollover.RolloverRequest; +import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.LocalNodeMasterListener; +import org.elasticsearch.cluster.metadata.AliasMetadata; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentParser.Token; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.threadpool.Scheduler; @@ -58,11 +71,13 @@ import com.amazon.opendistroforelasticsearch.ad.common.exception.EndRunException; import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; +import com.amazon.opendistroforelasticsearch.ad.constant.CommonValue; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob; import com.amazon.opendistroforelasticsearch.ad.model.DetectorInternalState; import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer; import com.carrotsearch.hppc.cursors.ObjectCursor; +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import com.google.common.base.Charsets; import com.google.common.io.Resources; @@ -78,6 +93,9 @@ public class AnomalyDetectionIndices implements LocalNodeMasterListener { // The index name pattern to query all AD result, history and current AD result public static final String ALL_AD_RESULTS_INDEX_PATTERN = ".opendistro-anomaly-results*"; + private static final String META = "_meta"; + private static final String SCHEMA_VERSION = "schema_version"; + private ClusterService clusterService; private final AdminClient adminClient; private final ThreadPool threadPool; @@ -90,6 +108,24 @@ public class AnomalyDetectionIndices implements LocalNodeMasterListener { private DiscoveryNodeFilterer nodeFilter; private int maxPrimaryShards; + // keep track of whether the mapping version is up-to-date + private EnumMap indexStates; + // whether all index have the correct mappings + private boolean allUpdated; + // we only want one update at a time + private final AtomicBoolean updateRunning; + + class IndexState { + // keep track of whether the mapping version is up-to-date + private Boolean updated; + // record schema version reading from the mapping file + private Integer schemaVersion; + + IndexState(ADIndex index) { + this.updated = false; + this.schemaVersion = parseSchemaVersion(index.getMapping()); + } + } /** * Constructor function @@ -118,6 +154,11 @@ public AnomalyDetectionIndices( this.nodeFilter = nodeFilter; + this.indexStates = new EnumMap(ADIndex.class); + + this.allUpdated = false; + this.updateRunning = new AtomicBoolean(false); + this.clusterService.getClusterSettings().addSettingsUpdateConsumer(AD_RESULT_HISTORY_MAX_DOCS, it -> historyMaxDocs = it); this.clusterService.getClusterSettings().addSettingsUpdateConsumer(AD_RESULT_HISTORY_ROLLOVER_PERIOD, it -> { @@ -137,7 +178,7 @@ public AnomalyDetectionIndices( * @return anomaly detector index mapping * @throws IOException IOException if mapping file can't be read correctly */ - private String getAnomalyDetectorMappings() throws IOException { + public static String getAnomalyDetectorMappings() throws IOException { URL url = AnomalyDetectionIndices.class.getClassLoader().getResource(ANOMALY_DETECTORS_INDEX_MAPPING_FILE); return Resources.toString(url, Charsets.UTF_8); } @@ -148,7 +189,7 @@ private String getAnomalyDetectorMappings() throws IOException { * @return anomaly result index mapping * @throws IOException IOException if mapping file can't be read correctly */ - private String getAnomalyResultMappings() throws IOException { + public static String getAnomalyResultMappings() throws IOException { URL url = AnomalyDetectionIndices.class.getClassLoader().getResource(ANOMALY_RESULTS_INDEX_MAPPING_FILE); return Resources.toString(url, Charsets.UTF_8); } @@ -159,7 +200,7 @@ private String getAnomalyResultMappings() throws IOException { * @return anomaly detector job index mapping * @throws IOException IOException if mapping file can't be read correctly */ - private String getAnomalyDetectorJobMappings() throws IOException { + public static String getAnomalyDetectorJobMappings() throws IOException { URL url = AnomalyDetectionIndices.class.getClassLoader().getResource(ANOMALY_DETECTOR_JOBS_INDEX_MAPPING_FILE); return Resources.toString(url, Charsets.UTF_8); } @@ -170,7 +211,7 @@ private String getAnomalyDetectorJobMappings() throws IOException { * @return anomaly detector state index mapping * @throws IOException IOException if mapping file can't be read correctly */ - private String getDetectorStateMappings() throws IOException { + public static String getDetectorStateMappings() throws IOException { URL url = AnomalyDetectionIndices.class.getClassLoader().getResource(ANOMALY_DETECTION_STATE_INDEX_MAPPING_FILE); return Resources.toString(url, Charsets.UTF_8); } @@ -181,7 +222,7 @@ private String getDetectorStateMappings() throws IOException { * @return checkpoint index mapping * @throws IOException IOException if mapping file can't be read correctly */ - private String getCheckpointMappings() throws IOException { + public static String getCheckpointMappings() throws IOException { URL url = AnomalyDetectionIndices.class.getClassLoader().getResource(CHECKPOINT_INDEX_MAPPING_FILE); return Resources.toString(url, Charsets.UTF_8); } @@ -231,6 +272,39 @@ public boolean doesCheckpointIndexExist() { return clusterService.state().getRoutingTable().hasIndex(CommonName.CHECKPOINT_INDEX_NAME); } + /** + * Index exists or not + * @param clusterServiceAccessor Cluster service + * @param name Index name + * @return true if the index exists + */ + public static boolean doesIndexExists(ClusterService clusterServiceAccessor, String name) { + return clusterServiceAccessor.state().getRoutingTable().hasIndex(name); + } + + /** + * Alias exists or not + * @param clusterServiceAccessor Cluster service + * @param alias Alias name + * @return true if the alias exists + */ + public static boolean doesAliasExists(ClusterService clusterServiceAccessor, String alias) { + return clusterServiceAccessor.state().metadata().hasAlias(alias); + } + + private ActionListener markMappingUpToDate(ADIndex index, ActionListener followingListener) { + return ActionListener.wrap(createdResponse -> { + if (createdResponse.isAcknowledged()) { + IndexState indexStatetate = indexStates.computeIfAbsent(index, IndexState::new); + if (Boolean.FALSE.equals(indexStatetate.updated)) { + indexStatetate.updated = Boolean.TRUE; + logger.info(new ParameterizedMessage("Mark [{}]'s mapping up-to-date", index.getIndexName())); + } + } + followingListener.onResponse(createdResponse); + }, exception -> followingListener.onFailure(exception)); + } + /** * Create anomaly detector index if not exist. * @@ -252,7 +326,7 @@ public void initAnomalyDetectorIndexIfAbsent(ActionListener public void initAnomalyDetectorIndex(ActionListener actionListener) throws IOException { CreateIndexRequest request = new CreateIndexRequest(AnomalyDetector.ANOMALY_DETECTORS_INDEX) .mapping(AnomalyDetector.TYPE, getAnomalyDetectorMappings(), XContentType.JSON); - adminClient.indices().create(request, actionListener); + adminClient.indices().create(request, markMappingUpToDate(ADIndex.CONFIG, actionListener)); } /** @@ -295,7 +369,7 @@ public void initAnomalyResultIndexDirectly(ActionListener a .mapping(CommonName.MAPPING_TYPE, mapping, XContentType.JSON) .alias(new Alias(CommonName.ANOMALY_RESULT_INDEX_ALIAS)); choosePrimaryShards(request); - adminClient.indices().create(request, actionListener); + adminClient.indices().create(request, markMappingUpToDate(ADIndex.RESULT, actionListener)); } /** @@ -309,7 +383,7 @@ public void initAnomalyDetectorJobIndex(ActionListener acti CreateIndexRequest request = new CreateIndexRequest(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX) .mapping(AnomalyDetector.TYPE, getAnomalyDetectorJobMappings(), XContentType.JSON); choosePrimaryShards(request); - adminClient.indices().create(request, actionListener); + adminClient.indices().create(request, markMappingUpToDate(ADIndex.JOB, actionListener)); } /** @@ -321,7 +395,7 @@ public void initAnomalyDetectorJobIndex(ActionListener acti public void initDetectorStateIndex(ActionListener actionListener) throws IOException { CreateIndexRequest request = new CreateIndexRequest(DetectorInternalState.DETECTOR_STATE_INDEX) .mapping(AnomalyDetector.TYPE, getDetectorStateMappings(), XContentType.JSON); - adminClient.indices().create(request, actionListener); + adminClient.indices().create(request, markMappingUpToDate(ADIndex.STATE, actionListener)); } /** @@ -340,7 +414,7 @@ public void initCheckpointIndex(ActionListener actionListen CreateIndexRequest request = new CreateIndexRequest(CommonName.CHECKPOINT_INDEX_NAME) .mapping(CommonName.MAPPING_TYPE, mapping, XContentType.JSON); choosePrimaryShards(request); - adminClient.indices().create(request, actionListener); + adminClient.indices().create(request, markMappingUpToDate(ADIndex.CHECKPOINT, actionListener)); } @Override @@ -404,6 +478,8 @@ void rolloverAndDeleteHistoryIndex() { logger .warn("{} not rolled over. Conditions were: {}", CommonName.ANOMALY_RESULT_INDEX_ALIAS, response.getConditionStatus()); } else { + IndexState indexStatetate = indexStates.computeIfAbsent(ADIndex.RESULT, IndexState::new); + indexStatetate.updated = true; logger.info("{} rolled over. Conditions were: {}", CommonName.ANOMALY_RESULT_INDEX_ALIAS, response.getConditionStatus()); deleteOldHistoryIndices(); } @@ -477,4 +553,196 @@ private void deleteIndexIteration(String[] toDelete) { })); } } + + /** + * Update mapping if schema version changes. + */ + public void updateMappingIfNecessary() { + if (allUpdated || updateRunning.get()) { + return; + } + + updateRunning.set(true); + + List updates = new ArrayList<>(); + for (ADIndex index : ADIndex.values()) { + Boolean updated = indexStates.computeIfAbsent(index, IndexState::new).updated; + if (Boolean.FALSE.equals(updated)) { + updates.add(index); + } + } + if (updates.size() == 0) { + allUpdated = true; + updateRunning.set(false); + return; + } + + final GroupedActionListener conglomerateListeneer = new GroupedActionListener<>( + ActionListener.wrap(r -> updateRunning.set(false), exception -> logger.error("Fail to updatea all mappings")), + updates.size() + ); + + for (ADIndex adIndex : updates) { + logger.info(new ParameterizedMessage("Check [{}]'s mapping", adIndex.getIndexName())); + shouldUpdateIndex(adIndex, ActionListener.wrap(shouldUpdate -> { + if (shouldUpdate) { + adminClient + .indices() + .putMapping( + new PutMappingRequest() + .indices(adIndex.getIndexName()) + .type(CommonName.MAPPING_TYPE) + .source(adIndex.getMapping(), XContentType.JSON), + ActionListener.wrap(putMappingResponse -> { + if (putMappingResponse.isAcknowledged()) { + logger.info(new ParameterizedMessage("Succeeded in updating [{}]'s mapping", adIndex.getIndexName())); + markMappingUpdated(adIndex); + } else { + logger.error(new ParameterizedMessage("Fail to update [{}]'s mapping", adIndex.getIndexName())); + } + conglomerateListeneer.onResponse(null); + }, exception -> { + logger.error(new ParameterizedMessage("Fail to update [{}]'s mapping", adIndex.getIndexName()), exception); + conglomerateListeneer.onFailure(exception); + }) + ); + } else { + // index does not exist or the version is already up-to-date. + // When creating index, new mappings will be used. + // We don't need to update it. + logger.info(new ParameterizedMessage("We don't need to update [{}]'s mapping", adIndex.getIndexName())); + markMappingUpdated(adIndex); + conglomerateListeneer.onResponse(null); + } + }, exception -> { + logger + .error( + new ParameterizedMessage("Fail to check whether we should update [{}]'s mapping", adIndex.getIndexName()), + exception + ); + conglomerateListeneer.onFailure(exception); + })); + + } + } + + private void markMappingUpdated(ADIndex adIndex) { + IndexState indexState = indexStates.computeIfAbsent(adIndex, IndexState::new); + if (Boolean.FALSE.equals(indexState.updated)) { + indexState.updated = Boolean.TRUE; + logger.info(new ParameterizedMessage("Mark [{}]'s mapping up-to-date", adIndex.getIndexName())); + } + } + + private void shouldUpdateIndex(ADIndex index, ActionListener thenDo) { + boolean exists = false; + if (index.isAlias()) { + exists = AnomalyDetectionIndices.doesAliasExists(clusterService, index.getIndexName()); + } else { + exists = AnomalyDetectionIndices.doesIndexExists(clusterService, index.getIndexName()); + } + if (false == exists) { + thenDo.onResponse(Boolean.FALSE); + return; + } + + Integer newVersion = indexStates.computeIfAbsent(index, IndexState::new).schemaVersion; + if (index.isAlias()) { + GetAliasesRequest getAliasRequest = new GetAliasesRequest() + .aliases(index.getIndexName()) + .indicesOptions(IndicesOptions.lenientExpandOpenHidden()); + adminClient.indices().getAliases(getAliasRequest, ActionListener.wrap(getAliasResponse -> { + String concreteIndex = null; + for (ObjectObjectCursor> entry : getAliasResponse.getAliases()) { + if (false == entry.value.isEmpty()) { + // we assume the alias map to one concrete index, thus we can return after finding one + concreteIndex = entry.key; + break; + } + } + shouldUpdateConcreteIndex(concreteIndex, newVersion, thenDo); + }, exception -> logger.error(new ParameterizedMessage("Fail to get [{}]'s alias", index.getIndexName()), exception))); + } else { + shouldUpdateConcreteIndex(index.getIndexName(), newVersion, thenDo); + } + } + + @SuppressWarnings("unchecked") + private void shouldUpdateConcreteIndex(String concreteIndex, Integer newVersion, ActionListener thenDo) { + IndexMetadata indexMeataData = clusterService.state().getMetadata().indices().get(concreteIndex); + if (indexMeataData == null) { + thenDo.onResponse(Boolean.FALSE); + return; + } + Integer oldVersion = CommonValue.NO_SCHEMA_VERSION; + + Map indexMapping = indexMeataData.mapping().getSourceAsMap(); + Object meta = indexMapping.get(META); + if (meta != null && meta instanceof Map) { + Map metaMapping = (Map) meta; + Object schemaVersion = metaMapping.get(CommonName.SCHEMA_VERSION_FIELD); + if (schemaVersion instanceof Integer) { + oldVersion = (Integer) schemaVersion; + } + } + thenDo.onResponse(newVersion > oldVersion); + } + + private static Integer parseSchemaVersion(String mapping) { + try { + XContentParser xcp = XContentType.JSON + .xContent() + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, mapping); + + while (!xcp.isClosed()) { + Token token = xcp.currentToken(); + if (token != null && token != XContentParser.Token.END_OBJECT && token != XContentParser.Token.START_OBJECT) { + if (xcp.currentName() != META) { + xcp.nextToken(); + xcp.skipChildren(); + } else { + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + if (xcp.currentName().equals(SCHEMA_VERSION)) { + + Integer version = xcp.intValue(); + if (version < 0) { + version = CommonValue.NO_SCHEMA_VERSION; + } + return version; + } else { + xcp.nextToken(); + } + } + + } + } + xcp.nextToken(); + } + return CommonValue.NO_SCHEMA_VERSION; + } catch (Exception e) { + // since this method is called in the constructor that is called by AnomalyDetectorPlugin.createComponents, + // we cannot throw checked exception + throw new RuntimeException(e); + } + } + + /** + * + * @param index Index metadata + * @return The schema version of the given Index + */ + public int getSchemaVersion(ADIndex index) { + IndexState indexState = this.indexStates.computeIfAbsent(index, IndexState::new); + return indexState.schemaVersion; + } + + /** + * + * @param index Index metadata + * @return Whether the given index's mapping is up-to-date + */ + public Boolean isUpdated(ADIndex index) { + IndexState indexState = this.indexStates.computeIfAbsent(index, IndexState::new); + return indexState.updated; + } } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/ml/CheckpointDao.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/ml/CheckpointDao.java index d6c388e4..c12d154f 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/ml/CheckpointDao.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/ml/CheckpointDao.java @@ -60,6 +60,7 @@ import org.elasticsearch.index.reindex.ScrollableHitSource; import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; +import com.amazon.opendistroforelasticsearch.ad.indices.ADIndex; import com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices; import com.amazon.opendistroforelasticsearch.ad.util.BulkUtil; import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; @@ -342,6 +343,7 @@ public void write(ModelState modelState, String modelId, boolean co source.put(DETECTOR_ID, modelState.getDetectorId()); source.put(FIELD_MODEL, serializedModel); source.put(TIMESTAMP, ZonedDateTime.now(ZoneOffset.UTC)); + source.put(CommonName.SCHEMA_VERSION_FIELD, indexUtil.getSchemaVersion(ADIndex.CHECKPOINT)); requests.add(new IndexRequest(indexName).id(modelId).source(source)); modelState.setLastCheckpointTime(clock.instant()); if (requests.size() >= maxBulkRequestSize) { diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/AnomalyDetector.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/AnomalyDetector.java index 9619d028..8b95cdc2 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/AnomalyDetector.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/AnomalyDetector.java @@ -47,6 +47,8 @@ import org.elasticsearch.search.builder.SearchSourceBuilder; import com.amazon.opendistroforelasticsearch.ad.annotation.Generated; +import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; +import com.amazon.opendistroforelasticsearch.ad.constant.CommonValue; import com.amazon.opendistroforelasticsearch.ad.util.ParseUtils; import com.amazon.opendistroforelasticsearch.commons.authuser.User; import com.google.common.base.Objects; @@ -71,7 +73,6 @@ public class AnomalyDetector implements Writeable, ToXContentObject { private static final String NAME_FIELD = "name"; private static final String DESCRIPTION_FIELD = "description"; private static final String TIMEFIELD_FIELD = "time_field"; - private static final String SCHEMA_VERSION_FIELD = "schema_version"; private static final String INDICES_FIELD = "indices"; private static final String FILTER_QUERY_FIELD = "filter_query"; private static final String FEATURE_ATTRIBUTES_FIELD = "feature_attributes"; @@ -254,7 +255,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws .field(DETECTION_INTERVAL_FIELD, detectionInterval) .field(WINDOW_DELAY_FIELD, windowDelay) .field(SHINGLE_SIZE_FIELD, shingleSize) - .field(SCHEMA_VERSION_FIELD, schemaVersion); + .field(CommonName.SCHEMA_VERSION_FIELD, schemaVersion); if (featureAttributes != null) { xContentBuilder.field(FEATURE_ATTRIBUTES_FIELD, featureAttributes.toArray()); @@ -334,7 +335,7 @@ public static AnomalyDetector parse( : new IntervalTimeConfiguration(defaultDetectionWindowDelay.getSeconds(), ChronoUnit.SECONDS); Integer shingleSize = null; List features = new ArrayList<>(); - int schemaVersion = 0; + Integer schemaVersion = CommonValue.NO_SCHEMA_VERSION; Map uiMetadata = null; Instant lastUpdateTime = null; User user = null; @@ -365,7 +366,7 @@ public static AnomalyDetector parse( case UI_METADATA_FIELD: uiMetadata = parser.map(); break; - case SCHEMA_VERSION_FIELD: + case CommonName.SCHEMA_VERSION_FIELD: schemaVersion = parser.intValue(); break; case FILTER_QUERY_FIELD: diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/AnomalyResult.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/AnomalyResult.java index a780169a..197a18ba 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/AnomalyResult.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/AnomalyResult.java @@ -33,6 +33,8 @@ import org.elasticsearch.common.xcontent.XContentParser; import com.amazon.opendistroforelasticsearch.ad.annotation.Generated; +import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; +import com.amazon.opendistroforelasticsearch.ad.constant.CommonValue; import com.amazon.opendistroforelasticsearch.ad.util.ParseUtils; import com.amazon.opendistroforelasticsearch.commons.authuser.User; import com.google.common.base.Objects; @@ -75,6 +77,7 @@ public class AnomalyResult implements ToXContentObject, Writeable { private final String error; private final List entity; private User user; + private final Integer schemaVersion; public AnomalyResult( String detectorId, @@ -87,7 +90,8 @@ public AnomalyResult( Instant executionStartTime, Instant executionEndTime, String error, - User user + User user, + Integer schemaVersion ) { this( detectorId, @@ -101,7 +105,8 @@ public AnomalyResult( executionEndTime, error, null, - user + user, + schemaVersion ); } @@ -117,7 +122,8 @@ public AnomalyResult( Instant executionEndTime, String error, List entity, - User user + User user, + Integer schemaVersion ) { this.detectorId = detectorId; this.anomalyScore = anomalyScore; @@ -131,6 +137,7 @@ public AnomalyResult( this.error = error; this.entity = entity; this.user = user; + this.schemaVersion = schemaVersion; } public AnomalyResult(StreamInput input) throws IOException { @@ -158,6 +165,7 @@ public AnomalyResult(StreamInput input) throws IOException { } else { user = null; } + this.schemaVersion = input.readInt(); } @Override @@ -166,7 +174,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws .startObject() .field(DETECTOR_ID_FIELD, detectorId) .field(DATA_START_TIME_FIELD, dataStartTime.toEpochMilli()) - .field(DATA_END_TIME_FIELD, dataEndTime.toEpochMilli()); + .field(DATA_END_TIME_FIELD, dataEndTime.toEpochMilli()) + .field(CommonName.SCHEMA_VERSION_FIELD, schemaVersion); if (featureData != null) { // can be null during preview xContentBuilder.field(FEATURE_DATA_FIELD, featureData.toArray()); @@ -213,6 +222,7 @@ public static AnomalyResult parse(XContentParser parser) throws IOException { String error = null; List entityList = null; User user = null; + Integer schemaVersion = CommonValue.NO_SCHEMA_VERSION; ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser::getTokenLocation); while (parser.nextToken() != XContentParser.Token.END_OBJECT) { @@ -263,6 +273,9 @@ public static AnomalyResult parse(XContentParser parser) throws IOException { case USER_FIELD: user = User.parse(parser); break; + case CommonName.SCHEMA_VERSION_FIELD: + schemaVersion = parser.intValue(); + break; default: parser.skipChildren(); break; @@ -280,7 +293,8 @@ public static AnomalyResult parse(XContentParser parser) throws IOException { executionEndTime, error, entityList, - user + user, + schemaVersion ); } @@ -411,5 +425,6 @@ public void writeTo(StreamOutput out) throws IOException { } else { out.writeBoolean(false); // user does not exist } + out.writeInt(schemaVersion); } } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestIndexAnomalyDetectorAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestIndexAnomalyDetectorAction.java index e295adeb..7c0e40a0 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestIndexAnomalyDetectorAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestIndexAnomalyDetectorAction.java @@ -51,7 +51,6 @@ import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorPlugin; import com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages; -import com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; import com.amazon.opendistroforelasticsearch.ad.settings.EnabledSetting; import com.amazon.opendistroforelasticsearch.ad.transport.IndexAnomalyDetectorAction; @@ -75,11 +74,7 @@ public class RestIndexAnomalyDetectorAction extends BaseRestHandler { private volatile Integer maxMultiEntityDetectors; private volatile Integer maxAnomalyFeatures; - public RestIndexAnomalyDetectorAction( - Settings settings, - ClusterService clusterService, - AnomalyDetectionIndices anomalyDetectionIndices - ) { + public RestIndexAnomalyDetectorAction(Settings settings, ClusterService clusterService) { this.requestTimeout = REQUEST_TIMEOUT.get(settings); this.detectionInterval = DETECTION_INTERVAL.get(settings); this.detectionWindowDelay = DETECTION_WINDOW_DELAY.get(settings); diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/settings/AnomalyDetectorSettings.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/settings/AnomalyDetectorSettings.java index ef090bbe..a1bdc09c 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/settings/AnomalyDetectorSettings.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/settings/AnomalyDetectorSettings.java @@ -303,5 +303,13 @@ private AnomalyDetectorSettings() {} // for 1m interval. if the max entity number is 3000 per node, it will need around 30m to get all of them cached // Thus, for 5m internval, it will need 2.5 hours to cache all of them. for 1hour interval, it will be 30hours. // but for 1 day interval, it will be 30 days. - public static int MAX_CACHE_HANDLING_PER_SECOND = 100; + public static Setting MAX_CACHE_MISS_HANDLING_PER_SECOND = Setting + .intSetting( + "opendistro.anomaly_detection.max_cache_miss_handling_per_second", + 100, + 0, + 1000, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/EntityResultTransportAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/EntityResultTransportAction.java index 38b18a6d..167addd2 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/EntityResultTransportAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/EntityResultTransportAction.java @@ -42,6 +42,8 @@ import com.amazon.opendistroforelasticsearch.ad.common.exception.EndRunException; import com.amazon.opendistroforelasticsearch.ad.common.exception.LimitExceededException; import com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages; +import com.amazon.opendistroforelasticsearch.ad.indices.ADIndex; +import com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices; import com.amazon.opendistroforelasticsearch.ad.ml.CheckpointDao; import com.amazon.opendistroforelasticsearch.ad.ml.EntityModel; import com.amazon.opendistroforelasticsearch.ad.ml.ModelManager; @@ -65,6 +67,7 @@ public class EntityResultTransportAction extends HandledTransportAction> onGetDetector( Instant.now(), null, Arrays.asList(new Entity(categoricalField, entityName)), - detector.getUser() + detector.getUser(), + indexUtil.getSchemaVersion(ADIndex.RESULT) ) ); } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/IndexAnomalyDetectorTransportAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/IndexAnomalyDetectorTransportAction.java index 5482a326..a9849012 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/IndexAnomalyDetectorTransportAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/IndexAnomalyDetectorTransportAction.java @@ -66,6 +66,7 @@ public IndexAnomalyDetectorTransportAction( @Override protected void doExecute(Task task, IndexAnomalyDetectorRequest request, ActionListener listener) { + anomalyDetectionIndices.updateMappingIfNecessary(); String detectorId = request.getDetectorID(); long seqNo = request.getSeqNo(); long primaryTerm = request.getPrimaryTerm(); diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ThrowingConsumerWrapper.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ThrowingConsumerWrapper.java index 2facdc92..a050b575 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ThrowingConsumerWrapper.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ThrowingConsumerWrapper.java @@ -23,7 +23,7 @@ public class ThrowingConsumerWrapper { * that does not throw the corresponding checked exception. This happens * when we are in a ES function that we have no control over its signature. * Convert the checked exception thrown by by throwingConsumer to a RuntimeException - * so that the compier won't complain. + * so that the compiler won't complain. * @param the method's parameter type * @param throwingConsumer the method reference that can throw checked exception * @return converted method reference diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ThrowingSupplier.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ThrowingSupplier.java new file mode 100644 index 00000000..79fc9d43 --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ThrowingSupplier.java @@ -0,0 +1,27 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.util; + +/** + * A supplier that can throw checked exception + * + * @param method parameter type + * @param Exception type + */ +@FunctionalInterface +public interface ThrowingSupplier { + T get() throws E; +} diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ThrowingSupplierWrapper.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ThrowingSupplierWrapper.java new file mode 100644 index 00000000..ac0a0479 --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ThrowingSupplierWrapper.java @@ -0,0 +1,41 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.util; + +import java.util.function.Supplier; + +public class ThrowingSupplierWrapper { + /** + * Utility method to use a method throwing checked exception inside a place + * that does not allow throwing the corresponding checked exception (e.g., + * enum initialization). + * Convert the checked exception thrown by by throwingConsumer to a RuntimeException + * so that the compiler won't complain. + * @param the method's return type + * @param throwingSupplier the method reference that can throw checked exception + * @return converted method reference + */ + public static Supplier throwingSupplierWrapper(ThrowingSupplier throwingSupplier) { + + return () -> { + try { + return throwingSupplier.get(); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + }; + } +} diff --git a/src/main/resources/mappings/anomaly-detector-jobs.json b/src/main/resources/mappings/anomaly-detector-jobs.json index 734b487a..1c2fc7fa 100644 --- a/src/main/resources/mappings/anomaly-detector-jobs.json +++ b/src/main/resources/mappings/anomaly-detector-jobs.json @@ -1,7 +1,7 @@ { "dynamic": false, "_meta": { - "schema_version": 1 + "schema_version": 2 }, "properties": { "schema_version": { diff --git a/src/main/resources/mappings/anomaly-detectors.json b/src/main/resources/mappings/anomaly-detectors.json index 8214fc7a..53e8eded 100644 --- a/src/main/resources/mappings/anomaly-detectors.json +++ b/src/main/resources/mappings/anomaly-detectors.json @@ -1,7 +1,7 @@ { "dynamic": false, "_meta": { - "schema_version": 1 + "schema_version": 2 }, "properties": { "schema_version": { diff --git a/src/main/resources/mappings/anomaly-results.json b/src/main/resources/mappings/anomaly-results.json index 13fd2fe7..8fc12236 100644 --- a/src/main/resources/mappings/anomaly-results.json +++ b/src/main/resources/mappings/anomaly-results.json @@ -1,7 +1,7 @@ { "dynamic": false, "_meta": { - "schema_version": 1 + "schema_version": 2 }, "properties": { "detector_id": { @@ -96,6 +96,9 @@ "type": "keyword" } } + }, + "schema_version": { + "type": "integer" } } } diff --git a/src/main/resources/mappings/checkpoint.json b/src/main/resources/mappings/checkpoint.json index 8fe8a099..e058ec3d 100644 --- a/src/main/resources/mappings/checkpoint.json +++ b/src/main/resources/mappings/checkpoint.json @@ -1,18 +1,26 @@ { "dynamic": true, "_meta": { - "schema_version": 1 + "schema_version": 2 }, "properties": { "detectorId": { "type": "keyword" }, "model": { - "type": "text" + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } }, "timestamp": { - "type": "date", - "format": "strict_date_time||epoch_millis" + "type": "date" + }, + "schema_version": { + "type": "integer" } } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorJobRunnerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorJobRunnerTests.java index 42b1ea26..0f17ebd2 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorJobRunnerTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorJobRunnerTests.java @@ -112,6 +112,9 @@ public class AnomalyDetectorJobRunnerTests extends AbstractADTest { @Mock private AnomalyIndexHandler anomalyResultHandler; + @Mock + private AnomalyDetectionIndices indexUtil; + private DetectionStateHandler detectorStateHandler; @BeforeClass @@ -166,6 +169,8 @@ public void setup() throws Exception { ); runner.setDetectionStateHandler(detectorStateHandler); + runner.setIndexUtil(indexUtil); + lockService = new LockService(client, clusterService); doReturn(lockService).when(context).getLockService(); } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java index f3d69390..e8f1d6b3 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java @@ -102,6 +102,7 @@ import org.elasticsearch.threadpool.ThreadPool; import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; +import com.amazon.opendistroforelasticsearch.ad.constant.CommonValue; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorExecutionInput; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob; @@ -435,7 +436,8 @@ public static AnomalyResult randomAnomalyDetectResult(double score, String error Instant.now().truncatedTo(ChronoUnit.SECONDS), Instant.now().truncatedTo(ChronoUnit.SECONDS), error, - randomUser() + randomUser(), + CommonValue.NO_SCHEMA_VERSION ); } @@ -456,7 +458,8 @@ public static AnomalyResult randomMutlEntityAnomalyDetectResult(double score, do Instant.now().truncatedTo(ChronoUnit.SECONDS), error, Arrays.asList(new Entity(randomAlphaOfLength(5), randomAlphaOfLength(5))), - randomUser() + randomUser(), + CommonValue.NO_SCHEMA_VERSION ); } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/caching/PriorityCacheTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/caching/PriorityCacheTests.java index 057b8c5f..798e781f 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/caching/PriorityCacheTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/caching/PriorityCacheTests.java @@ -32,6 +32,8 @@ import java.time.Duration; import java.time.Instant; import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; import java.util.Map.Entry; import java.util.Optional; import java.util.concurrent.CountDownLatch; @@ -43,6 +45,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.IndexNotFoundException; @@ -114,6 +117,12 @@ public void setUp() throws Exception { clusterService = mock(ClusterService.class); settings = Settings.EMPTY; + ClusterSettings clusterSettings = new ClusterSettings( + settings, + Collections.unmodifiableSet(new HashSet<>(Arrays.asList(AnomalyDetectorSettings.MAX_CACHE_MISS_HANDLING_PER_SECOND))) + ); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + threadPool = mock(ThreadPool.class); dedicatedCacheSize = 1; numMinSamples = 3; diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/EntityResultTransportActionTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/EntityResultTransportActionTests.java index a8376f24..cd0f1c73 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/EntityResultTransportActionTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/EntityResultTransportActionTests.java @@ -65,6 +65,8 @@ import com.amazon.opendistroforelasticsearch.ad.common.exception.LimitExceededException; import com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages; import com.amazon.opendistroforelasticsearch.ad.constant.CommonMessageAttributes; +import com.amazon.opendistroforelasticsearch.ad.constant.CommonValue; +import com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices; import com.amazon.opendistroforelasticsearch.ad.ml.CheckpointDao; import com.amazon.opendistroforelasticsearch.ad.ml.EntityModel; import com.amazon.opendistroforelasticsearch.ad.ml.ModelManager; @@ -162,6 +164,9 @@ public void setUp() throws Exception { clock = mock(Clock.class); when(clock.instant()).thenReturn(Instant.now()); + AnomalyDetectionIndices indexUtil = mock(AnomalyDetectionIndices.class); + when(indexUtil.getSchemaVersion(any())).thenReturn(CommonValue.NO_SCHEMA_VERSION); + entityResult = new EntityResultTransportAction( actionFilters, transportService, @@ -172,7 +177,8 @@ public void setUp() throws Exception { provider, stateManager, settings, - clock + clock, + indexUtil ); // timeout in 60 seconds