This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Adding negative cache to throttle extra request https://github.com/opendistro-for-elasticsearch/anomaly-detection/issues/33 #40

Merged

Contributor

@zhanghg08 zhanghg08 commented Jan 30, 2020

Description:
Adds a negative cache to throttle extra requests (#33).

Updated on Feb 05:

  • Restructured the code for better dependency injection.
  • A new class, Throttler, now handles the negative cache.
  • Throttler, rather than ADStateManager, is now the dependency injected into ClientUtil.

Test:

  • ./gradlew build
  • Created a single-node domain with NAB data. Created one AD detector and triggered a run. Verified that the negative cache works as expected.

@@ -43,6 +43,8 @@ public HourlyCron(ClusterService clusterService, Client client) {
public void run() {
DiscoveryNode[] dataNodes = clusterService.state().nodes().getDataNodes().values().toArray(DiscoveryNode.class);

// we also add the cancel query function here based on query text from the negative cache.
Contributor

Question. According to #33, a query cannot be aborted. The code comment indicates the opposite. Is it confusing?

Contributor Author

Good catch, that's true. Currently anomaly-detection cannot abort a running query in Elasticsearch. If a query runs longer than expected, anomaly-detection stops waiting for it even though the query is still running. To address this, we will 1) stop accepting new queries when this happens, which is #33, and 2) have a daily cron clean up running queries using the ES task management API (https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html).

* @return unprocessed features and processed features for the current data point
*/
@Deprecated
public SinglePointFeatures getCurrentFeatures(AnomalyDetector detector, long startTime, long endTime) {
public SinglePointFeatures getCurrentFeatures(AnomalyDetector detector, long startTime, long endTime, ADStateManager stateManager) {
Contributor

The added parameter should be an injected dependency rather than passed all the way down the stack.

Contributor

Agree. We should not pass state manager around.

* @throws IllegalStateException when unexpected failures happen
* @return features from search results, empty when no data found
*/
public Optional<double[]> getFeaturesForPeriod(AnomalyDetector detector, long startTime, long endTime) {
public Optional<double[]> getFeaturesForPeriod(AnomalyDetector detector, long startTime, long endTime, ADStateManager stateManager) {
Contributor

The needed dependency can be injected in this class.

return Optional.empty();
}
if (negativeCache.containsKey(detector.getDetectorId())) {
return Optional.of(negativeCache.get(detector.getDetectorId()));
Contributor

Minor. This method can be simplified to this line.

@@ -119,6 +124,47 @@ public int getPartitionNumber(String adID) throws InterruptedException {
return partitionNum;
}

/**
* Get negative cache value(QueryBuilder, Instant) for given detector
* If detectorId is null, return Optional.empty()
Contributor

Minor. This can be assumed to be unlikely. If this is a real concern that must be addressed, the proper behavior to expect is to throw an exception.

if (detector.getDetectorId() == null) {
return;
}
negativeCache.putIfAbsent(detector.getDetectorId(), new SimpleEntry<>(searchRequest, clock.instant()));
Contributor

Question. Is put what the client would expect here? With putIfAbsent, the insert call can return while the existing entry is still not updated. If that's by design, the documentation should make it clear.

Contributor Author

Yes, I think put is better than putIfAbsent here. Will update.
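The difference discussed above can be seen in a minimal sketch. The class name, keys, and values are made up for illustration, and plain strings stand in for the SearchRequest and timestamp stored in the PR's cache.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class; not part of the PR.
class PutVersusPutIfAbsent {

    // putIfAbsent keeps the entry that is already there.
    static String afterPutIfAbsent() {
        ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();
        cache.put("detector-1", "old-query");
        cache.putIfAbsent("detector-1", "new-query");
        return cache.get("detector-1");
    }

    // put overwrites the existing entry.
    static String afterPut() {
        ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();
        cache.put("detector-1", "old-query");
        cache.put("detector-1", "new-query");
        return cache.get("detector-1");
    }
}
```

With putIfAbsent the caller gets no error when the entry was already present, which is the silent no-op the reviewer asks to have documented.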

if (detector.getDetectorId() == null) {
return;
}
negativeCache.keySet().removeIf(key -> key.equals(detector.getDetectorId()));
Contributor

Minor. Map::remove should work.

Contributor

@wnbts wnbts left a comment

Suggestion. Throttling can be separated into its own module as a throttler, which can be injected into state manager and client util.
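One way the suggested separation might look is sketched below. This is not the merged code: ThrottlerSketch and ClientUtilSketch are hypothetical names, and a plain String stands in for the Elasticsearch request type so that the example compiles on its own.

```java
import java.time.Clock;
import java.time.Instant;
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a throttler module that owns the negative cache.
class ThrottlerSketch {
    // detectorId -> (request, time the request was recorded)
    private final ConcurrentHashMap<String, Map.Entry<String, Instant>> negativeCache = new ConcurrentHashMap<>();
    private final Clock clock;

    ThrottlerSketch(Clock clock) {
        this.clock = clock;
    }

    // Returns the cached entry for the detector, or empty when none exists.
    Optional<Map.Entry<String, Instant>> getFilteredQuery(String detectorId) {
        return Optional.ofNullable(negativeCache.get(detectorId));
    }

    // Records the request; returns false when the detector already has an entry.
    boolean insertFilteredQuery(String detectorId, String request) {
        return negativeCache.putIfAbsent(detectorId, new SimpleEntry<>(request, clock.instant())) == null;
    }

    // Removes the entry for the detector, if any.
    void clearFilteredQuery(String detectorId) {
        negativeCache.remove(detectorId);
    }
}

// Hypothetical collaborator that receives the throttler through its constructor
// instead of having a state manager passed down through method parameters.
class ClientUtilSketch {
    private final ThrottlerSketch throttler;

    ClientUtilSketch(ThrottlerSketch throttler) {
        this.throttler = throttler;
    }

    boolean hasRunningQuery(String detectorId) {
        return throttler.getFilteredQuery(detectorId).isPresent();
    }
}
```

Because the throttler is a constructor argument, the same instance can be shared by every class that needs it, and a test can pass in a fixed Clock.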

Optional<Entry<SearchRequest, Instant>> queryEntry = stateManager.getFilteredQuery(anomalyDetector);
if (queryEntry.isPresent()) {
LOG.info("There is one query running for detectorId: {}", anomalyDetector.getDetectorId());
listener.onResponse(new AnomalyResultResponse(Double.NaN, Double.NaN, new ArrayList<FeatureData>()));
Contributor

The client can't distinguish this case from "No data in current detection window" (line 295), as both return an empty result. How about throwing an exception, as on line 243?

Contributor Author

Good catch! Yes, we should throw an exception in this case. Will update in the next revision.
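The point about distinguishing the two cases can be sketched as follows. DetectorBusyException and ResultLookupSketch are hypothetical names chosen for this illustration (a later revision in this PR throws EndRunException), and the score is passed in directly rather than computed.

```java
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical exception type used only in this sketch.
class DetectorBusyException extends RuntimeException {
    DetectorBusyException(String detectorId) {
        super("There is one query running for detectorId: " + detectorId);
    }
}

class ResultLookupSketch {
    // Detector IDs that currently have a query in flight.
    final Set<String> running = ConcurrentHashMap.newKeySet();

    // Returns the latest score, or empty when the window has no data.
    // Throws when the detector is throttled, so the two cases stay distinguishable.
    Optional<Double> latestScore(String detectorId, Double scoreOrNull) {
        if (running.contains(detectorId)) {
            throw new DetectorBusyException(detectorId);
        }
        return Optional.ofNullable(scoreOrNull);
    }
}
```

A caller that receives an empty Optional knows there was no data, while a throttled detector surfaces as an exception it can handle separately.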

@@ -249,6 +251,12 @@ protected void doExecute(Task task, ActionRequest actionRequest, ActionListener<
return;
}
AnomalyDetector anomalyDetector = detector.get();
Optional<Entry<SearchRequest, Instant>> queryEntry = stateManager.getFilteredQuery(anomalyDetector);
if (queryEntry.isPresent()) {
LOG.info("There is one query running for detectorId: {}", anomalyDetector.getDetectorId());
Contributor

If a query is stuck for a long time, that will impact AD real-time detection. How about we cancel the stuck/running query and run the incoming query?

Contributor Author

You are correct. Canceling the stuck query whenever a new query comes in is one way. However, it can be expensive, since right now we can only use a string comparison (query.getDescription()) to find which query to kill. Also, killing a query through the task management API does not take effect instantly; it only happens when the search moves to the next segment.

Contributor

@wnbts wnbts left a comment

Mostly stylistic issues. Consider race conditions.

/**
* Check if there is running query on given detector
* @param detector Anomaly Detector
* @return boolean
Contributor

Minor. The doc is incomplete.

Contributor Author

will update

@@ -114,7 +114,9 @@ public SearchFeatureDao(
}

/**
* Gets features for the given time period.
* Gets features for the given time period. This function also add given detector to negative cache before sending es request.
Contributor

Minor. Adds.

Also, use descriptive language instead of prescriptive. See java doc.

Contributor Author

will update, thanks for the suggestion

AnomalyDetector detector
) {
try {
throttler.insertFilteredQuery(detector, request);
Contributor

What should happen if the detector is already in cache?

Contributor Author

It won't happen here, since if a detector is already in the cache, we throw an exception in AnomalyResultTransportAction.

Contributor

This can be a race condition when concurrent threads have finished the check at the higher level and try to execute queries. The method needs to be synchronized.
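The race described above comes from checking the cache in one place and inserting in another. A minimal sketch of one alternative to a synchronized method is to make the check and the insert a single atomic step with ConcurrentHashMap.putIfAbsent. The class and method names below are hypothetical, and strings stand in for the detector and request types.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of the PR.
class AtomicInsertDemo {
    private final ConcurrentHashMap<String, String> negativeCache = new ConcurrentHashMap<>();

    // Atomic check-and-insert: returns true only for the caller that created the entry.
    boolean tryInsert(String detectorId, String request) {
        return negativeCache.putIfAbsent(detectorId, request) == null;
    }

    // Releases `threads` concurrent callers for one detector and counts how many succeed.
    static int countWinners(int threads) {
        AtomicInsertDemo demo = new AtomicInsertDemo();
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            final String request = "query-" + i;
            pool.submit(() -> {
                try {
                    start.await(); // line all threads up before racing
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (demo.tryInsert("detector-1", request)) {
                    winners.incrementAndGet();
                }
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return winners.get();
    }
}
```

However many threads race, only one of them sees a null return from putIfAbsent, so only one query per detector would be sent. A later revision in this PR follows a similar shape, with insertFilteredQuery returning a boolean that the caller checks.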

* @return boolean
*/
public boolean hasRunningQuery(AnomalyDetector detector) {
Optional<Map.Entry<ActionRequest, Instant>> queryEntry = throttler.getFilteredQuery(detector);
Contributor

Minor. The method can be simplified to one line: return throttler.getFilteredQuery(detector).isPresent()

Contributor Author

will update

import java.util.concurrent.ConcurrentHashMap;
import org.elasticsearch.action.ActionRequest;

public class Throttler {
Contributor

Minor. The class java doc is missing.

Contributor Author

will update

Comment on lines 44 to 45
if (negativeCache.containsKey(detector.getDetectorId())) {
return Optional.of(negativeCache.get(detector.getDetectorId()));
Contributor

Minor. The whole method can be simplified to this line: return Optional.of(negativeCache.get(detector.getDetectorId()));

Contributor Author

will update

Contributor

Should be Optional.ofNullable
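The reason for this correction is that Map.get returns null for a missing key, and Optional.of throws a NullPointerException when given null. A small sketch, with a hypothetical class name and string values in place of the cache entries:

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class; not part of the PR.
class OptionalLookupDemo {
    static final Map<String, String> negativeCache = new ConcurrentHashMap<>();

    // Throws NullPointerException when the detector has no entry.
    static Optional<String> lookupWithOf(String detectorId) {
        return Optional.of(negativeCache.get(detectorId));
    }

    // Returns Optional.empty() when the detector has no entry.
    static Optional<String> lookupWithOfNullable(String detectorId) {
        return Optional.ofNullable(negativeCache.get(detectorId));
    }
}
```

With ofNullable the one-line version behaves the same as the original containsKey check followed by Optional.empty().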


/**
* Insert the negative cache entry for given detector
* If detectorId is null, do nothing
Contributor

Minor. The java doc is outdated.

Contributor Author

will update

@zhanghg08 zhanghg08 requested a review from wnbts February 18, 2020 22:21
return Optional.of(negativeCache.get(detector.getDetectorId()));
}
return Optional.empty();
return Optional.of(negativeCache.get(detector.getDetectorId()));
Contributor

Optional.ofNullable

Contributor Author

will update

Contributor

@wnbts wnbts left a comment

This PR has some conflicts with the current branch that need to be resolved.

@@ -115,6 +115,9 @@ public SearchFeatureDao(

/**
* Gets features for the given time period.
* This function also adds given detector to negative cache before sending es request.
* Once we get response/exception within timeout, we treat this request as complete and clear the negative cache.
Contributor

Minor. "We" is prescriptive (giving orders to code), not descriptive (stating what code does).

AnomalyDetector detector
) {
try {
throttler.insertFilteredQuery(detector, request);
Contributor

This can be a race condition when concurrent threads have finished the check at the higher level and try to execute queries. The method needs to be synchronized.


/**
* Clear the negative cache for given detector.
* If detectorId is null, do nothing
Contributor

Minor. Outdated doc.

* @param detector AnomalyDetector
*/
public void clearFilteredQuery(AnomalyDetector detector) {
negativeCache.keySet().removeIf(key -> key.equals(detector.getDetectorId()));
Contributor

Minor. Why not use Map::remove?

// if key already exist, reject the request and throws exception
if (!throttler.insertFilteredQuery(detector.getDetectorId(), request)) {
LOG.error("There is one query running for detectorId: {}", detector.getDetectorId());
throw new EndRunException(detector.getDetectorId(), "There is one query running on AnomalyDetector", true);
Contributor

Minor. This exception should be documented for the client.

Contributor Author

added.

return Optional.ofNullable(respReference.get());
} catch (InterruptedException e1) {
LOG.error(CommonErrorMessages.WAIT_ERR_MSG);
throw new IllegalStateException(e1);
Contributor

If an exception happens, the negative cache for this detector should be cleared.
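A minimal sketch of that cleanup, assuming the query is modelled as a Supplier and strings stand in for the detector and request types (ThrottledCallSketch and runThrottled are hypothetical names). It does not model the timeout case, in which the PR deliberately leaves the entry in place because the query is still running.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical demo class; not part of the PR.
class ThrottledCallSketch {
    final ConcurrentHashMap<String, String> negativeCache = new ConcurrentHashMap<>();

    // Runs the query while the detector is marked busy, then clears the mark
    // whether the query returned normally or threw.
    <T> T runThrottled(String detectorId, String request, Supplier<T> query) {
        if (negativeCache.putIfAbsent(detectorId, request) != null) {
            throw new IllegalStateException("There is one query running for detectorId: " + detectorId);
        }
        try {
            return query.get();
        } finally {
            // Without this, a failed call would leave the detector throttled.
            negativeCache.remove(detectorId);
        }
    }
}
```

Putting the removal in a finally block means no catch branch can forget to clear the entry.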

Contributor

@ylwu-amzn ylwu-amzn left a comment

LGTM. Thanks for the change.

@zhanghg08 zhanghg08 merged commit b8ea1a2 into opendistro-for-elasticsearch:development Feb 24, 2020

3 participants