-
Notifications
You must be signed in to change notification settings - Fork 36
Adding negative cache to throttle extra request https://github.com/opendistro-for-elasticsearch/anomaly-detection/issues/33 #40
Changes from all commits
9aa9e46
bc6a763
3a82b22
fe2a193
1bb9257
fae4c59
c6a048d
fcab62c
f7ad7c0
ff73fb7
95f3892
1d086d1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,9 @@ | |
import java.util.function.BiConsumer; | ||
import java.util.function.Function; | ||
|
||
import com.amazon.opendistroforelasticsearch.ad.common.exception.EndRunException; | ||
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; | ||
|
||
import org.apache.logging.log4j.Logger; | ||
import org.elasticsearch.ElasticsearchTimeoutException; | ||
import org.elasticsearch.action.ActionType; | ||
|
@@ -42,11 +45,13 @@ | |
public class ClientUtil { | ||
private volatile TimeValue requestTimeout; | ||
private Client client; | ||
private final Throttler throttler; | ||
|
||
/**
 * Builds the client helper.
 *
 * @param setting   settings from which the request timeout ({@code REQUEST_TIMEOUT}) is read
 * @param client    client instance kept by this helper
 * @param throttler negative cache used to reject a request while a previous one for the same
 *                  detector is still registered
 */
@Inject
public ClientUtil(Settings setting, Client client, Throttler throttler) {
    this.requestTimeout = REQUEST_TIMEOUT.get(setting);
    this.client = client;
    this.throttler = throttler;
}
|
||
/** | ||
|
@@ -152,4 +157,69 @@ public <Request extends ActionRequest, Response extends ActionResponse> Response | |
) { | ||
return function.apply(request).actionGet(requestTimeout); | ||
} | ||
|
||
/** | ||
* Send a nonblocking request with a timeout and return response. The request will first be put into | ||
* the negative cache. Once the request complete, it will be removed from the negative cache. | ||
* | ||
* @param request request like index/search/get | ||
* @param LOG log | ||
* @param consumer functional interface to operate as a client request like client::get | ||
* @param <Request> ActionRequest | ||
* @param <Response> ActionResponse | ||
* @param detector Anomaly Detector | ||
* @return the response | ||
* @throws EndRunException when there is already a query running | ||
* @throws ElasticsearchTimeoutException when we cannot get response within time. | ||
* @throws IllegalStateException when the waiting thread is interrupted | ||
*/ | ||
public <Request extends ActionRequest, Response extends ActionResponse> Optional<Response> throttledTimedRequest( | ||
Request request, | ||
Logger LOG, | ||
BiConsumer<Request, ActionListener<Response>> consumer, | ||
AnomalyDetector detector | ||
) { | ||
try { | ||
// if key already exist, reject the request and throws exception | ||
if (!throttler.insertFilteredQuery(detector.getDetectorId(), request)) { | ||
LOG.error("There is one query running for detectorId: {}", detector.getDetectorId()); | ||
throw new EndRunException(detector.getDetectorId(), "There is one query running on AnomalyDetector", true); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. MInor. This exception should be documented for client. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added. |
||
} | ||
AtomicReference<Response> respReference = new AtomicReference<>(); | ||
final CountDownLatch latch = new CountDownLatch(1); | ||
|
||
try { | ||
consumer.accept(request, new LatchedActionListener<Response>(ActionListener.wrap(response -> { | ||
// clear negative cache | ||
throttler.clearFilteredQuery(detector.getDetectorId()); | ||
respReference.set(response); | ||
}, exception -> { | ||
// clear negative cache | ||
throttler.clearFilteredQuery(detector.getDetectorId()); | ||
LOG.error("Cannot get response for request {}, error: {}", request, exception); | ||
}), latch)); | ||
} catch (Exception e) { | ||
LOG.error("Failed to process the request for detectorId: {}.", detector.getDetectorId()); | ||
throttler.clearFilteredQuery(detector.getDetectorId()); | ||
throw e; | ||
} | ||
|
||
if (!latch.await(requestTimeout.getSeconds(), TimeUnit.SECONDS)) { | ||
throw new ElasticsearchTimeoutException("Cannot get response within time limit: " + request.toString()); | ||
} | ||
return Optional.ofNullable(respReference.get()); | ||
} catch (InterruptedException e1) { | ||
LOG.error(CommonErrorMessages.WAIT_ERR_MSG); | ||
throw new IllegalStateException(e1); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If exception happens, should clear negative cache for this detector |
||
} | ||
} | ||
|
||
/** | ||
* Check if there is running query on given detector | ||
* @param detector Anomaly Detector | ||
* @return true if given detector has a running query else false | ||
*/ | ||
public boolean hasRunningQuery(AnomalyDetector detector) { | ||
return throttler.getFilteredQuery(detector.getDetectorId()).isPresent(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
/* | ||
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"). | ||
* You may not use this file except in compliance with the License. | ||
* A copy of the License is located at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* or in the "license" file accompanying this file. This file is distributed | ||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either | ||
* express or implied. See the License for the specific language governing | ||
* permissions and limitations under the License. | ||
*/ | ||
package com.amazon.opendistroforelasticsearch.ad.util; | ||
|
||
import java.time.Clock; | ||
import java.time.Instant; | ||
import java.util.AbstractMap; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import org.elasticsearch.action.ActionRequest; | ||
|
||
/** | ||
* Utility functions for throttling query. | ||
*/ | ||
public class Throttler { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor. The class java doc is missing. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. will update |
||
// negativeCache is used to reject search query if given detector already has one query running | ||
// key is detectorId, value is an entry. Key is ActionRequest and value is the timestamp | ||
private final ConcurrentHashMap<String, Map.Entry<ActionRequest, Instant>> negativeCache; | ||
private final Clock clock; | ||
|
||
public Throttler(Clock clock) { | ||
this.negativeCache = new ConcurrentHashMap<>(); | ||
this.clock = clock; | ||
} | ||
|
||
/** | ||
* Get negative cache value(ActionRequest, Instant) for given detector | ||
* @param detectorId AnomalyDetector ID | ||
* @return negative cache value(ActionRequest, Instant) | ||
*/ | ||
public Optional<Map.Entry<ActionRequest, Instant>> getFilteredQuery(String detectorId) { | ||
return Optional.ofNullable(negativeCache.get(detectorId)); | ||
} | ||
|
||
/** | ||
* Insert the negative cache entry for given detector | ||
* If key already exists, return false. Otherwise true. | ||
* @param detectorId AnomalyDetector ID | ||
* @param request ActionRequest | ||
* @return true if key doesn't exist otherwise false. | ||
*/ | ||
public synchronized boolean insertFilteredQuery(String detectorId, ActionRequest request) { | ||
return negativeCache.putIfAbsent(detectorId, new AbstractMap.SimpleEntry<>(request, clock.instant())) == null; | ||
} | ||
|
||
/** | ||
* Clear the negative cache for given detector. | ||
* @param detectorId AnomalyDetector ID | ||
*/ | ||
public void clearFilteredQuery(String detectorId) { | ||
negativeCache.remove(detectorId); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question. According to #33, a query cannot be aborted, but the comment indicates the opposite. Is that confusing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch. That's true. Currently anomaly-detection cannot abort a running query in Elasticsearch. If a query runs longer than expected, anomaly-detection stops waiting for it even though the query is still running. To solve this issue, we will 1) stop accepting new queries when this happens, which is #33, and 2) add a daily cron job that cleans up running queries using the ES task management API (https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html).