Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Adding negative cache to throttle extra request https://github.com/opendistro-for-elasticsearch/anomaly-detection/issues/33 #40

Merged
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import com.amazon.opendistroforelasticsearch.ad.transport.ThresholdResultAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ThresholdResultTransportAction;
import com.amazon.opendistroforelasticsearch.ad.util.IndexUtils;
import com.amazon.opendistroforelasticsearch.ad.util.Throttler;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
Expand Down Expand Up @@ -196,7 +197,9 @@ public Collection<Object> createComponents(
NamedWriteableRegistry namedWriteableRegistry
) {
Settings settings = environment.settings();
ClientUtil clientUtil = new ClientUtil(settings, client);
Clock clock = Clock.systemUTC();
Throttler throttler = new Throttler(clock);
ClientUtil clientUtil = new ClientUtil(settings, client, throttler);
IndexUtils indexUtils = new IndexUtils(client, clientUtil, clusterService);
anomalyDetectionIndices = new AnomalyDetectionIndices(client, clusterService, threadPool, settings, clientUtil);
this.clusterService = clusterService;
Expand All @@ -209,7 +212,6 @@ public Collection<Object> createComponents(
JvmService jvmService = new JvmService(environment.settings());
RandomCutForestSerDe rcfSerde = new RandomCutForestSerDe();
CheckpointDao checkpoint = new CheckpointDao(client, clientUtil, CommonName.CHECKPOINT_INDEX_NAME);
Clock clock = Clock.systemUTC();

ModelManager modelManager = new ModelManager(
clusterService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public HourlyCron(ClusterService clusterService, Client client) {
public void run() {
DiscoveryNode[] dataNodes = clusterService.state().nodes().getDataNodes().values().toArray(DiscoveryNode.class);

// we also add the cancel query function here based on query text from the negative cache.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question. From #33 , query cannot be aborted. The comment indicates the opposite. Is it confusing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. That's true. Currently anomaly-detection cannot abort running query in elasticsearch. If a query running longer than expected, anomaly-detection will not wait for that even though the query is still running. To solve this issue, we will 1) stop accepting new query if this case happen which is #33 2) daily cron clean up running query using es task management API(https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html)


CronRequest modelDeleteRequest = new CronRequest(dataNodes);
client.execute(CronAction.INSTANCE, modelDeleteRequest, ActionListener.wrap(response -> {
if (response.hasFailures()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ public Optional<Long> getLatestDataTime(AnomalyDetector detector) {

/**
* Gets features for the given time period.
* This function also adds given detector to negative cache before sending es request.
* Once response/exception is received within timeout, this request will be treated as complete
* and cleared from the negative cache.
* Otherwise this detector entry remain in the negative to reject further request.
*
* @param detector info about indices, documents, feature query
* @param startTime epoch milliseconds at the beginning of the period
Expand All @@ -124,8 +128,10 @@ public Optional<Long> getLatestDataTime(AnomalyDetector detector) {
*/
public Optional<double[]> getFeaturesForPeriod(AnomalyDetector detector, long startTime, long endTime) {
SearchRequest searchRequest = createFeatureSearchRequest(detector, startTime, endTime, Optional.empty());

// send throttled request: this request will clear the negative cache if the request finished within timeout
return clientUtil
.<SearchRequest, SearchResponse>timedRequest(searchRequest, logger, client::search)
.<SearchRequest, SearchResponse>throttledTimedRequest(searchRequest, logger, client::search, detector)
.flatMap(resp -> parseResponse(resp, detector.getEnabledFeatureIds()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,4 +209,13 @@ public void addPressure(String nodeId) {
public void resetBackpressureCounter(String nodeId) {
backpressureMuter.remove(nodeId);
}

/**
* Check if there is running query on given detector
* @param detector Anomaly Detector
* @return true if given detector has a running query else false
*/
public boolean hasRunningQuery(AnomalyDetector detector) {
return clientUtil.hasRunningQuery(detector);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,11 @@ protected void doExecute(Task task, ActionRequest actionRequest, ActionListener<
return;
}
AnomalyDetector anomalyDetector = detector.get();
if (stateManager.hasRunningQuery(anomalyDetector)) {
LOG.error("There is one query running for detectorId: {}", anomalyDetector.getDetectorId());
listener.onFailure(new EndRunException(adID, "There is one query running on AnomalyDetector", true));
return;
}

String thresholdModelID = modelManager.getThresholdModelId(adID);
Optional<DiscoveryNode> thresholdNode = hashRing.getOwningNode(thresholdModelID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import java.util.function.BiConsumer;
import java.util.function.Function;

import com.amazon.opendistroforelasticsearch.ad.common.exception.EndRunException;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.ActionType;
Expand All @@ -42,11 +45,13 @@
public class ClientUtil {
private volatile TimeValue requestTimeout;
private Client client;
private final Throttler throttler;

@Inject
public ClientUtil(Settings setting, Client client) {
public ClientUtil(Settings setting, Client client, Throttler throttler) {
this.requestTimeout = REQUEST_TIMEOUT.get(setting);
this.client = client;
this.throttler = throttler;
}

/**
Expand Down Expand Up @@ -152,4 +157,69 @@ public <Request extends ActionRequest, Response extends ActionResponse> Response
) {
return function.apply(request).actionGet(requestTimeout);
}

/**
* Send a nonblocking request with a timeout and return response. The request will first be put into
* the negative cache. Once the request complete, it will be removed from the negative cache.
*
* @param request request like index/search/get
* @param LOG log
* @param consumer functional interface to operate as a client request like client::get
* @param <Request> ActionRequest
* @param <Response> ActionResponse
* @param detector Anomaly Detector
* @return the response
* @throws EndRunException when there is already a query running
* @throws ElasticsearchTimeoutException when we cannot get response within time.
* @throws IllegalStateException when the waiting thread is interrupted
*/
public <Request extends ActionRequest, Response extends ActionResponse> Optional<Response> throttledTimedRequest(
Request request,
Logger LOG,
BiConsumer<Request, ActionListener<Response>> consumer,
AnomalyDetector detector
) {
try {
// if key already exist, reject the request and throws exception
if (!throttler.insertFilteredQuery(detector.getDetectorId(), request)) {
LOG.error("There is one query running for detectorId: {}", detector.getDetectorId());
throw new EndRunException(detector.getDetectorId(), "There is one query running on AnomalyDetector", true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MInor. This exception should be documented for client.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added.

}
AtomicReference<Response> respReference = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);

try {
consumer.accept(request, new LatchedActionListener<Response>(ActionListener.wrap(response -> {
// clear negative cache
throttler.clearFilteredQuery(detector.getDetectorId());
respReference.set(response);
}, exception -> {
// clear negative cache
throttler.clearFilteredQuery(detector.getDetectorId());
LOG.error("Cannot get response for request {}, error: {}", request, exception);
}), latch));
} catch (Exception e) {
LOG.error("Failed to process the request for detectorId: {}.", detector.getDetectorId());
throttler.clearFilteredQuery(detector.getDetectorId());
throw e;
}

if (!latch.await(requestTimeout.getSeconds(), TimeUnit.SECONDS)) {
throw new ElasticsearchTimeoutException("Cannot get response within time limit: " + request.toString());
}
return Optional.ofNullable(respReference.get());
} catch (InterruptedException e1) {
LOG.error(CommonErrorMessages.WAIT_ERR_MSG);
throw new IllegalStateException(e1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If exception happens, should clear negative cache for this detector

}
}

/**
* Check if there is running query on given detector
* @param detector Anomaly Detector
* @return true if given detector has a running query else false
*/
public boolean hasRunningQuery(AnomalyDetector detector) {
return throttler.getFilteredQuery(detector.getDetectorId()).isPresent();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazon.opendistroforelasticsearch.ad.util;

import java.time.Clock;
import java.time.Instant;
import java.util.AbstractMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.elasticsearch.action.ActionRequest;

/**
* Utility functions for throttling query.
*/
public class Throttler {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor. The class java doc is missing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will update

// negativeCache is used to reject search query if given detector already has one query running
// key is detectorId, value is an entry. Key is ActionRequest and value is the timestamp
private final ConcurrentHashMap<String, Map.Entry<ActionRequest, Instant>> negativeCache;
private final Clock clock;

public Throttler(Clock clock) {
this.negativeCache = new ConcurrentHashMap<>();
this.clock = clock;
}

/**
* Get negative cache value(ActionRequest, Instant) for given detector
* @param detectorId AnomalyDetector ID
* @return negative cache value(ActionRequest, Instant)
*/
public Optional<Map.Entry<ActionRequest, Instant>> getFilteredQuery(String detectorId) {
return Optional.ofNullable(negativeCache.get(detectorId));
}

/**
* Insert the negative cache entry for given detector
* If key already exists, return false. Otherwise true.
* @param detectorId AnomalyDetector ID
* @param request ActionRequest
* @return true if key doesn't exist otherwise false.
*/
public synchronized boolean insertFilteredQuery(String detectorId, ActionRequest request) {
return negativeCache.putIfAbsent(detectorId, new AbstractMap.SimpleEntry<>(request, clock.instant())) == null;
}

/**
* Clear the negative cache for given detector.
* @param detectorId AnomalyDetector ID
*/
public void clearFilteredQuery(String detectorId) {
negativeCache.remove(detectorId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.amazon.opendistroforelasticsearch.ad.dataprocessor.SingleFeatureLinearUniformInterpolator;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.model.IntervalTimeConfiguration;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStateManager;
import junitparams.JUnitParamsRunner;
import junitparams.Parameters;

Expand Down Expand Up @@ -88,6 +89,9 @@ public class FeatureManagerTests {
@Mock
private Clock clock;

@Mock
private ADStateManager stateManager;

private FeatureManager featureManager;

@Before
Expand Down Expand Up @@ -308,6 +312,7 @@ public void clear_deleteFeatures() {
for (int i = 1; i <= shingleSize; i++) {
start = i * 10;
end = (i + 1) * 10;

when(searchFeatureDao.getFeaturesForPeriod(detector, start, end)).thenReturn(Optional.of(new double[] { i }));
featureManager.getCurrentFeatures(detector, start, end);
}
Expand All @@ -329,6 +334,7 @@ public void maintenance_removeStaleData() {
for (int i = 1; i <= shingleSize; i++) {
start = i * 10;
end = (i + 1) * 10;

when(searchFeatureDao.getFeaturesForPeriod(detector, start, end)).thenReturn(Optional.of(new double[] { i }));
featureManager.getCurrentFeatures(detector, start, end);
}
Expand All @@ -351,6 +357,7 @@ public void maintenance_keepRecentData() {
for (int i = 1; i <= shingleSize; i++) {
start = i * 10;
end = (i + 1) * 10;

when(searchFeatureDao.getFeaturesForPeriod(detector, start, end)).thenReturn(Optional.of(new double[] { i }));
featureManager.getCurrentFeatures(detector, start, end);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.amazon.opendistroforelasticsearch.ad.dataprocessor.SingleFeatureLinearUniformInterpolator;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.model.IntervalTimeConfiguration;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStateManager;
import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;
import com.amazon.opendistroforelasticsearch.ad.util.ParseUtils;
import junitparams.JUnitParamsRunner;
Expand Down Expand Up @@ -127,6 +128,8 @@ public class SearchFeatureDaoTests {
private Aggregations aggs;
@Mock
private Max max;
@Mock
private ADStateManager stateManager;

@Mock
private AnomalyDetector detector;
Expand Down Expand Up @@ -171,6 +174,15 @@ public void setup() throws Exception {
.timedRequest(eq(searchRequest), anyObject(), Matchers.<BiConsumer<SearchRequest, ActionListener<SearchResponse>>>anyObject());
when(searchResponse.getAggregations()).thenReturn(aggregations);

doReturn(Optional.of(searchResponse))
.when(clientUtil)
.throttledTimedRequest(
eq(searchRequest),
anyObject(),
Matchers.<BiConsumer<SearchRequest, ActionListener<SearchResponse>>>anyObject(),
anyObject()
);

multiSearchRequest = new MultiSearchRequest();
SearchRequest request = new SearchRequest(detector.getIndices().toArray(new String[0]))
.preference(SearchFeatureDao.FEATURE_SAMPLE_PREFERENCE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import com.amazon.opendistroforelasticsearch.ad.model.AnomalyResult;
import com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils;
import com.amazon.opendistroforelasticsearch.ad.util.Throttler;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
Expand All @@ -39,6 +40,7 @@
import org.junit.Before;

import java.io.IOException;
import java.time.Clock;
import java.util.HashSet;
import java.util.Set;

Expand Down Expand Up @@ -70,7 +72,9 @@ public void setup() {
clusterSetting = new ClusterSettings(settings, clusterSettings);
clusterService = TestHelpers.createClusterService(client().threadPool(), clusterSetting);
client = mock(Client.class);
requestUtil = new ClientUtil(settings, client);
Clock clock = Clock.systemUTC();
Throttler throttler = new Throttler(clock);
requestUtil = new ClientUtil(settings, client, throttler);
indices = new AnomalyDetectionIndices(client(), clusterService, client().threadPool(), settings, requestUtil);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,12 @@
import com.amazon.opendistroforelasticsearch.ad.ml.ModelManager;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;
import com.amazon.opendistroforelasticsearch.ad.util.Throttler;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.mapper.MapperService;
Expand All @@ -64,6 +67,7 @@ public class ADStateManagerTests extends ESTestCase {
private Client client;
private Clock clock;
private Duration duration;
private Throttler throttler;

@Override
protected NamedXContentRegistry xContentRegistry() {
Expand All @@ -86,12 +90,13 @@ public void setUp() throws Exception {
.build();
clock = mock(Clock.class);
duration = Duration.ofHours(1);
throttler = new Throttler(clock);
stateManager = new ADStateManager(
client,
xContentRegistry(),
modelManager,
settings,
new ClientUtil(settings, client),
new ClientUtil(settings, client, throttler),
clock,
duration
);
Expand Down Expand Up @@ -203,4 +208,12 @@ public void testMaintenancRemove() throws IOException {
assertEquals(0, states.size());

}

public void testHasRunningQuery() throws IOException {
AnomalyDetector detector = TestHelpers.randomAnomalyDetector(ImmutableMap.of(), null);
SearchRequest dummySearchRequest = new SearchRequest();
assertFalse(stateManager.hasRunningQuery(detector));
throttler.insertFilteredQuery(detector.getDetectorId(), dummySearchRequest);
assertTrue(stateManager.hasRunningQuery(detector));
}
}
Loading