-
Notifications
You must be signed in to change notification settings - Fork 36
Adding negative cache to throttle extra request https://github.com/opendistro-for-elasticsearch/anomaly-detection/issues/33 #40
Changes from 4 commits
9aa9e46
bc6a763
3a82b22
fe2a193
1bb9257
fae4c59
c6a048d
fcab62c
f7ad7c0
ff73fb7
95f3892
1d086d1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -114,7 +114,9 @@ public Optional<Long> getLatestDataTime(AnomalyDetector detector) { | |
} | ||
|
||
/** | ||
* Gets features for the given time period. | ||
* Gets features for the given time period. This function also adds the given detector to the negative cache before sending the ES request. |
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor. Adds. Also, use descriptive language instead of prescriptive. See java doc. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. will update, thanks for the suggestion |
||
* Once a response or exception arrives within the timeout, the request is treated as complete and the negative-cache entry is cleared. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor. We is prescriptive (giving orders to code), not descriptive (stating what code does). |
||
* Otherwise the detector entry remains in the negative cache to reject further requests. | ||
* | ||
* @param detector info about indices, documents, feature query | ||
* @param startTime epoch milliseconds at the beginning of the period | ||
|
@@ -124,8 +126,10 @@ public Optional<Long> getLatestDataTime(AnomalyDetector detector) { | |
*/ | ||
public Optional<double[]> getFeaturesForPeriod(AnomalyDetector detector, long startTime, long endTime) { | ||
SearchRequest searchRequest = createFeatureSearchRequest(detector, startTime, endTime, Optional.empty()); | ||
|
||
// send throttled request: this request will clear the negative cache if the request finished within timeout | ||
return clientUtil | ||
.<SearchRequest, SearchResponse>timedRequest(searchRequest, logger, client::search) | ||
.<SearchRequest, SearchResponse>throttledTimedRequest(searchRequest, logger, client::search, detector) | ||
.flatMap(resp -> parseResponse(resp, detector.getEnabledFeatureIds())); | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -209,4 +209,13 @@ public void addPressure(String nodeId) { | |
public void resetBackpressureCounter(String nodeId) { | ||
backpressureMuter.remove(nodeId); | ||
} | ||
|
||
/** | ||
* Checks whether there is a running query on the given detector | ||
* @param detector Anomaly Detector | ||
* @return boolean | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor. The doc is incomplete. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. will update |
||
*/ | ||
public boolean hasRunningQuery(AnomalyDetector detector) { | ||
return clientUtil.hasRunningQuery(detector); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -249,6 +249,11 @@ protected void doExecute(Task task, ActionRequest actionRequest, ActionListener< | |
return; | ||
} | ||
AnomalyDetector anomalyDetector = detector.get(); | ||
if (stateManager.hasRunningQuery(anomalyDetector)) { | ||
LOG.info("There is one query running for detectorId: {}", anomalyDetector.getDetectorId()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If a query stuck for a long time, that will impact AD realtime detection. How about we cancel the stuck/running query and run the new coming query? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are correct. Canceling the stuck query whenever new query comes is one way. However it can be expensive since right now we can only use string compare(query.getDescription()) to find which query to kill. Also killing query using task management API is not effective instantly. It will only happen when moving to next segment. |
||
listener.onFailure(new EndRunException(adID, "There is one query running on AnomalyDetector", true)); | ||
return; | ||
} | ||
|
||
String thresholdModelID = modelManager.getThresholdModelId(adID); | ||
Optional<DiscoveryNode> thresholdNode = hashRing.getOwningNode(thresholdModelID); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,13 +17,17 @@ | |
|
||
import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.REQUEST_TIMEOUT; | ||
|
||
import java.time.Instant; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
import java.util.function.BiConsumer; | ||
import java.util.function.Function; | ||
|
||
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; | ||
|
||
import org.apache.logging.log4j.Logger; | ||
import org.elasticsearch.ElasticsearchTimeoutException; | ||
import org.elasticsearch.action.Action; | ||
|
@@ -42,11 +46,13 @@ | |
public class ClientUtil { | ||
private volatile TimeValue requestTimeout; | ||
private Client client; | ||
private final Throttler throttler; | ||
|
||
@Inject | ||
public ClientUtil(Settings setting, Client client) { | ||
public ClientUtil(Settings setting, Client client, Throttler throttler) { | ||
this.requestTimeout = REQUEST_TIMEOUT.get(setting); | ||
this.client = client; | ||
this.throttler = throttler; | ||
} | ||
|
||
/** | ||
|
@@ -152,4 +158,62 @@ public <Request extends ActionRequest, Response extends ActionResponse> Response | |
) { | ||
return function.apply(request).actionGet(requestTimeout); | ||
} | ||
|
||
/** | ||
* Sends a nonblocking request with a timeout and returns the response. The request is first put into | ||
* the negative cache. Once the request completes, it is removed from the negative cache. | ||
* | ||
* @param request request like index/search/get | ||
* @param LOG log | ||
* @param consumer functional interface to operate as a client request like client::get | ||
* @param <Request> ActionRequest | ||
* @param <Response> ActionResponse | ||
* @param detector Anomaly Detector | ||
* @return the response | ||
* @throws ElasticsearchTimeoutException when we cannot get response within time. | ||
* @throws IllegalStateException when the waiting thread is interrupted | ||
*/ | ||
public <Request extends ActionRequest, Response extends ActionResponse> Optional<Response> throttledTimedRequest( | ||
Request request, | ||
Logger LOG, | ||
BiConsumer<Request, ActionListener<Response>> consumer, | ||
AnomalyDetector detector | ||
) { | ||
try { | ||
throttler.insertFilteredQuery(detector, request); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What should happen if the detector is already in cache? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It won't happen here since if a detector is already in cache, we will throw exception in AnomalyResultTransportAction There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can be a race condition when concurrent threads have finished the check at the higher level and try to execute queries. The method needs to be synchronized. |
||
AtomicReference<Response> respReference = new AtomicReference<>(); | ||
final CountDownLatch latch = new CountDownLatch(1); | ||
|
||
consumer.accept(request, new LatchedActionListener<Response>(ActionListener.wrap(response -> { | ||
// clear negative cache | ||
throttler.clearFilteredQuery(detector); | ||
respReference.set(response); | ||
}, exception -> { | ||
// clear negative cache | ||
throttler.clearFilteredQuery(detector); | ||
LOG.error("Cannot get response for request {}, error: {}", request, exception); | ||
}), latch)); | ||
|
||
if (!latch.await(requestTimeout.getSeconds(), TimeUnit.SECONDS)) { | ||
throw new ElasticsearchTimeoutException("Cannot get response within time limit: " + request.toString()); | ||
} | ||
return Optional.ofNullable(respReference.get()); | ||
} catch (InterruptedException e1) { | ||
LOG.error(CommonErrorMessages.WAIT_ERR_MSG); | ||
throw new IllegalStateException(e1); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If exception happens, should clear negative cache for this detector |
||
} | ||
} | ||
|
||
/** | ||
* Checks whether there is a running query on the given detector | ||
* @param detector Anomaly Detector | ||
* @return boolean | ||
*/ | ||
public boolean hasRunningQuery(AnomalyDetector detector) { | ||
Optional<Map.Entry<ActionRequest, Instant>> queryEntry = throttler.getFilteredQuery(detector); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor. The method can be simplified to one line. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. will update |
||
if (queryEntry.isPresent()) { | ||
return true; | ||
} | ||
return false; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question. From #33 , query cannot be aborted. The comment indicates the opposite. Is it confusing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch. That's true. Currently anomaly-detection cannot abort running query in elasticsearch. If a query running longer than expected, anomaly-detection will not wait for that even though the query is still running. To solve this issue, we will 1) stop accepting new query if this case happen which is #33 2) daily cron clean up running query using es task management API(https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html)