
Adding negative cache to throttle extra request https://github.com/opendistro-for-elasticsearch/anomaly-detection/issues/33 #40

Merged
@@ -68,6 +68,7 @@
import com.amazon.opendistroforelasticsearch.ad.transport.ThresholdResultAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ThresholdResultTransportAction;
import com.amazon.opendistroforelasticsearch.ad.util.IndexUtils;
import com.amazon.opendistroforelasticsearch.ad.util.Throttler;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
@@ -200,7 +201,9 @@ public Collection<Object> createComponents(
NamedWriteableRegistry namedWriteableRegistry
) {
Settings settings = environment.settings();
ClientUtil clientUtil = new ClientUtil(settings, client);
Clock clock = Clock.systemUTC();
Throttler throttler = new Throttler(clock);
ClientUtil clientUtil = new ClientUtil(settings, client, throttler);
IndexUtils indexUtils = new IndexUtils(client, clientUtil, clusterService);
anomalyDetectionIndices = new AnomalyDetectionIndices(client, clusterService, threadPool, settings, clientUtil);
this.clusterService = clusterService;
@@ -213,7 +216,6 @@ public Collection<Object> createComponents(
JvmService jvmService = new JvmService(environment.settings());
RandomCutForestSerDe rcfSerde = new RandomCutForestSerDe();
CheckpointDao checkpoint = new CheckpointDao(client, clientUtil, CommonName.CHECKPOINT_INDEX_NAME);
Clock clock = Clock.systemUTC();

ModelManager modelManager = new ModelManager(
clusterService,
@@ -43,6 +43,8 @@ public HourlyCron(ClusterService clusterService, Client client) {
public void run() {
DiscoveryNode[] dataNodes = clusterService.state().nodes().getDataNodes().values().toArray(DiscoveryNode.class);

// we also add the cancel query function here based on query text from the negative cache.
Contributor

Question. From #33, the query cannot be aborted, but this comment indicates the opposite. Is it confusing?

Contributor Author

Good catch. That's true. Currently anomaly-detection cannot abort a running query in Elasticsearch. If a query runs longer than expected, anomaly-detection will not wait for it even though the query is still running. To solve this, we will 1) stop accepting new queries when this happens, which is #33, and 2) add a daily cron that cleans up running queries using the ES task management API (https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html)
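For illustration only, here is a minimal sketch of what such a cleanup cron could look like using the standard task management Java API; the class name, the description-matching heuristic, and the wiring are assumptions, not part of this PR:

```java
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.client.Client;

// Hypothetical helper: cancels search tasks whose description matches a feature
// query that is still sitting in the negative cache past its deadline.
public class StaleQueryCleaner {
    private final Client client;

    public StaleQueryCleaner(Client client) {
        this.client = client;
    }

    public void cancelStaleSearches(String staleQueryDescription) {
        // List only search tasks, with descriptions included, so we can match by query text.
        ListTasksRequest listRequest = new ListTasksRequest()
            .setActions("indices:data/read/search*")
            .setDetailed(true);
        client.admin().cluster().listTasks(listRequest, ActionListener.wrap(listResponse -> {
            listResponse.getTasks().stream()
                .filter(task -> task.getDescription() != null
                    && task.getDescription().contains(staleQueryDescription))
                .forEach(task -> {
                    // Cancellation is cooperative: the search stops at its next
                    // checkpoint, not immediately.
                    CancelTasksRequest cancelRequest = new CancelTasksRequest();
                    cancelRequest.setTaskId(task.getTaskId());
                    client.admin().cluster()
                        .cancelTasks(cancelRequest, ActionListener.wrap(r -> {}, e -> {}));
                });
        }, e -> {}));
    }
}
```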


CronRequest modelDeleteRequest = new CronRequest(dataNodes);
client.execute(CronAction.INSTANCE, modelDeleteRequest, ActionListener.wrap(response -> {
if (response.hasFailures()) {
Expand Down
@@ -114,7 +114,9 @@ public Optional<Long> getLatestDataTime(AnomalyDetector detector) {
}

/**
* Gets features for the given time period.
* Gets features for the given time period. This function also add given detector to negative cache before sending es request.
Contributor

Minor. Adds.

Also, use descriptive language instead of prescriptive. See the Javadoc.

Contributor Author

will update, thanks for the suggestion

* Once we get response/exception within timeout, we treat this request as complete and clear the negative cache.
Contributor

Minor. "We" is prescriptive (giving orders to code), not descriptive (stating what the code does).

* Otherwise this detector entry remain in the negative to reject further request.
*
* @param detector info about indices, documents, feature query
* @param startTime epoch milliseconds at the beginning of the period
@@ -124,8 +126,10 @@ public Optional<Long> getLatestDataTime(AnomalyDetector detector) {
*/
public Optional<double[]> getFeaturesForPeriod(AnomalyDetector detector, long startTime, long endTime) {
SearchRequest searchRequest = createFeatureSearchRequest(detector, startTime, endTime, Optional.empty());

// send throttled request: this request will clear the negative cache if the request finished within timeout
return clientUtil
.<SearchRequest, SearchResponse>timedRequest(searchRequest, logger, client::search)
.<SearchRequest, SearchResponse>throttledTimedRequest(searchRequest, logger, client::search, detector)
.flatMap(resp -> parseResponse(resp, detector.getEnabledFeatureIds()));
}
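For context, here is a minimal sketch of the negative cache that throttledTimedRequest relies on, based on the Throttler method names used in this PR (insertFilteredQuery, clearFilteredQuery, getFilteredQuery); the internal data structure is an assumption:

```java
import java.time.Clock;
import java.time.Instant;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import org.elasticsearch.action.ActionRequest;

public class Throttler {
    private final Clock clock;
    // negative cache: detector id -> (in-flight request, time it was issued)
    private final ConcurrentHashMap<String, Map.Entry<ActionRequest, Instant>> negativeCache;

    public Throttler(Clock clock) {
        this.clock = clock;
        this.negativeCache = new ConcurrentHashMap<>();
    }

    // Returns the in-flight query entry for this detector, if any.
    public Optional<Map.Entry<ActionRequest, Instant>> getFilteredQuery(AnomalyDetector detector) {
        return Optional.ofNullable(negativeCache.get(detector.getDetectorId()));
    }

    // Records a query as in flight for this detector.
    public void insertFilteredQuery(AnomalyDetector detector, ActionRequest request) {
        negativeCache.put(detector.getDetectorId(), new SimpleImmutableEntry<>(request, clock.instant()));
    }

    // Removes the entry once the query completes (or fails) within the timeout.
    public void clearFilteredQuery(AnomalyDetector detector) {
        negativeCache.remove(detector.getDetectorId());
    }
}
```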

@@ -17,15 +17,15 @@

import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.rest.handler.AnomalyDetectorActionHandler;
import com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorRequest;
import com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorResponse;
import com.amazon.opendistroforelasticsearch.ad.transport.DeleteDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.transport.DeleteDetectorRequest;
import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorPlugin;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
@@ -89,8 +89,8 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
return channel -> {
if (channel.request().method() == RestRequest.Method.POST) {
logger.info("Stop anomaly detector {}", detectorId);
StopDetectorRequest stopDetectorRequest = new StopDetectorRequest(detectorId);
client.execute(StopDetectorAction.INSTANCE, stopDetectorRequest, stopAdDetectorListener(channel, detectorId));
DeleteDetectorRequest deleteDetectorRequest = new DeleteDetectorRequest().adID(detectorId);
client.execute(DeleteDetectorAction.INSTANCE, deleteDetectorRequest, stopAdDetectorListener(channel, detectorId));
} else if (channel.request().method() == RestRequest.Method.DELETE) {
logger.info("Delete anomaly detector {}", detectorId);
handler
@@ -117,11 +117,11 @@ private void deleteAnomalyDetectorDoc(
client.delete(deleteRequest, new RestStatusToXContentListener<>(channel));
}

private ActionListener<StopDetectorResponse> stopAdDetectorListener(RestChannel channel, String detectorId) {
return new ActionListener<StopDetectorResponse>() {
private ActionListener<AcknowledgedResponse> stopAdDetectorListener(RestChannel channel, String detectorId) {
return new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(StopDetectorResponse stopDetectorResponse) {
if (stopDetectorResponse.success()) {
public void onResponse(AcknowledgedResponse deleteDetectorResponse) {
if (deleteDetectorResponse.isAcknowledged()) {
logger.info("AD model deleted successfully for detector {}", detectorId);
channel.sendResponse(new BytesRestResponse(RestStatus.OK, "AD model deleted successfully"));
} else {
@@ -29,93 +29,91 @@ public final class AnomalyDetectorSettings {
private AnomalyDetectorSettings() {}

public static final Setting<Integer> MAX_ANOMALY_DETECTORS = Setting
.intSetting("opendistro.anomaly_detection.max_anomaly_detectors", 1000, Setting.Property.NodeScope, Setting.Property.Dynamic);
.intSetting("ml.anomaly_detectors.max_anomaly_detectors", 1000, Setting.Property.NodeScope, Setting.Property.Dynamic);

public static final Setting<Integer> MAX_ANOMALY_FEATURES = Setting
.intSetting("opendistro.anomaly_detection.max_anomaly_features", 5, Setting.Property.NodeScope, Setting.Property.Dynamic);
.intSetting("ml.anomaly_detectors.max_anomaly_features", 5, Setting.Property.NodeScope, Setting.Property.Dynamic);

public static final Setting<TimeValue> REQUEST_TIMEOUT = Setting
.positiveTimeSetting(
"opendistro.anomaly_detection.request_timeout",
"ml.anomaly_detectors.request_timeout",
TimeValue.timeValueSeconds(10),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public static final Setting<TimeValue> DETECTION_INTERVAL = Setting
.positiveTimeSetting(
"opendistro.anomaly_detection.detection_interval",
"ml.anomaly_detectors.detection_interval",
TimeValue.timeValueMinutes(10),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public static final Setting<TimeValue> DETECTION_WINDOW_DELAY = Setting
.timeSetting(
"opendistro.anomaly_detection.detection_window_delay",
"ml.anomaly_detectors.detection_window_delay",
TimeValue.timeValueMinutes(0),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public static final Setting<TimeValue> AD_RESULT_ROLLOVER_PERIOD = Setting
.positiveTimeSetting(
"opendistro.anomaly_detection.ad_result_rollover_period",
"ml.anomaly_detectors.ad_result_rollover_period",
TimeValue.timeValueHours(12),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public static final Setting<TimeValue> AD_RESULT_HISTORY_ROLLOVER_PERIOD = Setting
.positiveTimeSetting(
"opendistro.anomaly_detection.ad_result_history_rollover_period",
"ml.anomaly_detectors.ad_result_history_rollover_period",
TimeValue.timeValueHours(12),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public static final Setting<TimeValue> AD_RESULT_HISTORY_INDEX_MAX_AGE = Setting
.positiveTimeSetting(
"opendistro.anomaly_detection.ad_result_history_max_age",
"ml.anomaly_detectors.ad_result_history_max_age",
TimeValue.timeValueHours(24),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public static final Setting<Long> AD_RESULT_HISTORY_MAX_DOCS = Setting
.longSetting("opendistro.anomaly_detection.ad_result_history_max_docs", 10000L, 0L,
Setting.Property.NodeScope, Setting.Property.Dynamic);
.longSetting("ml.anomaly_detectors.ad_result_history_max_docs", 10000L, 0L, Setting.Property.NodeScope, Setting.Property.Dynamic);

public static final Setting<Integer> MAX_RETRY_FOR_UNRESPONSIVE_NODE = Setting
.intSetting("opendistro.anomaly_detection.max_retry_for_unresponsive_node", 5, 0,
Setting.Property.NodeScope, Setting.Property.Dynamic);
.intSetting("ml.anomaly_detectors.max_retry_for_unresponsive_node", 5, 0, Setting.Property.NodeScope, Setting.Property.Dynamic);

public static final Setting<TimeValue> COOLDOWN_MINUTES = Setting
.positiveTimeSetting(
"opendistro.anomaly_detection.cooldown_minutes",
"ml.anomaly_detectors.cooldown_minutes",
TimeValue.timeValueMinutes(5),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public static final Setting<TimeValue> BACKOFF_MINUTES = Setting
.positiveTimeSetting(
"opendistro.anomaly_detection.backoff_minutes",
"ml.anomaly_detectors.backoff_minutes",
TimeValue.timeValueMinutes(15),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public static final Setting<TimeValue> BACKOFF_INITIAL_DELAY = Setting
.positiveTimeSetting(
"opendistro.anomaly_detection.backoff_initial_delay",
"ml.anomaly_detectors.backoff_initial_delay",
TimeValue.timeValueMillis(1000),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public static final Setting<Integer> MAX_RETRY_FOR_BACKOFF = Setting
.intSetting("opendistro.anomaly_detection.max_retry_for_backoff", 3, 0, Setting.Property.NodeScope, Setting.Property.Dynamic);
.intSetting("ml.anomaly_detectors.max_retry_for_backoff", 3, 0, Setting.Property.NodeScope, Setting.Property.Dynamic);

public static final String ANOMALY_DETECTORS_INDEX_MAPPING_FILE = "mappings/anomaly-detectors.json";
public static final String ANOMALY_RESULTS_INDEX_MAPPING_FILE = "mappings/anomaly-results.json";
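As an aside, node-scoped dynamic settings like these only take effect once the plugin registers them; a hedged sketch of the usual registration hook follows (the exact list in AnomalyDetectorPlugin is not shown in this diff, so treat this as an assumed illustration):

```java
import java.util.List;

import org.elasticsearch.common.settings.Setting;

import com.google.common.collect.ImmutableList;

// Inside AnomalyDetectorPlugin:
@Override
public List<Setting<?>> getSettings() {
    // Registering the settings lets Elasticsearch validate them and apply
    // dynamic updates, as declared by NodeScope and Dynamic above.
    return ImmutableList.<Setting<?>>of(
        AnomalyDetectorSettings.MAX_ANOMALY_DETECTORS,
        AnomalyDetectorSettings.MAX_ANOMALY_FEATURES,
        AnomalyDetectorSettings.REQUEST_TIMEOUT,
        AnomalyDetectorSettings.DETECTION_INTERVAL
    );
}
```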
@@ -209,4 +209,13 @@ public void addPressure(String nodeId) {
public void resetBackpressureCounter(String nodeId) {
backpressureMuter.remove(nodeId);
}

/**
* Check if there is running query on given detector
* @param detector Anomaly Detector
* @return boolean
Contributor

Minor. The doc is incomplete.

Contributor Author

will update

*/
public boolean hasRunningQuery(AnomalyDetector detector) {
return clientUtil.hasRunningQuery(detector);
}
}
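For what it's worth, one way the Javadoc flagged above might be completed (wording assumed, not the author's final version):

```java
/**
 * Checks whether a query issued on behalf of the given detector is still in flight,
 * i.e. whether the detector currently has an entry in the negative cache.
 *
 * @param detector the anomaly detector whose feature query is being checked
 * @return true if a query for this detector is still running, false otherwise
 */
public boolean hasRunningQuery(AnomalyDetector detector) {
    return clientUtil.hasRunningQuery(detector);
}
```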
@@ -249,6 +249,11 @@ protected void doExecute(Task task, ActionRequest actionRequest, ActionListener<
return;
}
AnomalyDetector anomalyDetector = detector.get();
if (stateManager.hasRunningQuery(anomalyDetector)) {
LOG.info("There is one query running for detectorId: {}", anomalyDetector.getDetectorId());
Contributor

If a query is stuck for a long time, that will impact AD real-time detection. How about we cancel the stuck/running query and run the new incoming query?

Contributor Author

You are correct. Canceling the stuck query whenever a new query comes in is one option. However, it can be expensive, since right now we can only use a string comparison (query.getDescription()) to find which query to kill. Also, killing a query through the task management API does not take effect instantly; it only happens when the query moves to the next segment.

listener.onFailure(new EndRunException(adID, "There is one query running on AnomalyDetector", true));
return;
}

String thresholdModelID = modelManager.getThresholdModelId(adID);
Optional<DiscoveryNode> thresholdNode = hashRing.getOwningNode(thresholdModelID);
@@ -17,13 +17,17 @@

import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.REQUEST_TIMEOUT;

import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;

import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.Action;
@@ -42,11 +46,13 @@
public class ClientUtil {
private volatile TimeValue requestTimeout;
private Client client;
private final Throttler throttler;

@Inject
public ClientUtil(Settings setting, Client client) {
public ClientUtil(Settings setting, Client client, Throttler throttler) {
this.requestTimeout = REQUEST_TIMEOUT.get(setting);
this.client = client;
this.throttler = throttler;
}

/**
@@ -152,4 +158,62 @@ public <Request extends ActionRequest, Response extends ActionResponse> Response
) {
return function.apply(request).actionGet(requestTimeout);
}

/**
* Send a nonblocking request with a timeout and return response. The request will first be put into
* the negative cache. Once the request complete, it will be removed from the negative cache.
*
* @param request request like index/search/get
* @param LOG log
* @param consumer functional interface to operate as a client request like client::get
* @param <Request> ActionRequest
* @param <Response> ActionResponse
* @param detector Anomaly Detector
* @return the response
* @throws ElasticsearchTimeoutException when we cannot get response within time.
* @throws IllegalStateException when the waiting thread is interrupted
*/
public <Request extends ActionRequest, Response extends ActionResponse> Optional<Response> throttledTimedRequest(
Request request,
Logger LOG,
BiConsumer<Request, ActionListener<Response>> consumer,
AnomalyDetector detector
) {
try {
throttler.insertFilteredQuery(detector, request);
Contributor

What should happen if the detector is already in cache?

Contributor Author

It won't happen here: if a detector is already in the cache, we throw an exception in AnomalyResultTransportAction.

Contributor

This can be a race condition when concurrent threads have finished the check at the higher level and try to execute queries. The method needs to be synchronized.

AtomicReference<Response> respReference = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);

consumer.accept(request, new LatchedActionListener<Response>(ActionListener.wrap(response -> {
// clear negative cache
throttler.clearFilteredQuery(detector);
respReference.set(response);
}, exception -> {
// clear negative cache
throttler.clearFilteredQuery(detector);
LOG.error("Cannot get response for request {}, error: {}", request, exception);
}), latch));

if (!latch.await(requestTimeout.getSeconds(), TimeUnit.SECONDS)) {
throw new ElasticsearchTimeoutException("Cannot get response within time limit: " + request.toString());
}
return Optional.ofNullable(respReference.get());
} catch (InterruptedException e1) {
LOG.error(CommonErrorMessages.WAIT_ERR_MSG);
throw new IllegalStateException(e1);
Contributor

If an exception happens, we should clear the negative cache for this detector.

}
}
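Pulling the two review threads above together (the race condition on insert and clearing the cache on interruption), here is a hedged sketch of how the method could be revised; insertFilteredQueryIfAbsent is a hypothetical putIfAbsent-style method, not something this PR adds:

```java
public <Request extends ActionRequest, Response extends ActionResponse> Optional<Response> throttledTimedRequest(
    Request request,
    Logger LOG,
    BiConsumer<Request, ActionListener<Response>> consumer,
    AnomalyDetector detector
) {
    // Atomic check-and-insert: if another thread already registered a query for this
    // detector, back off instead of issuing a second request.
    if (!throttler.insertFilteredQueryIfAbsent(detector, request)) {
        LOG.info("There is already a query running for detector {}", detector.getDetectorId());
        return Optional.empty();
    }
    try {
        AtomicReference<Response> respReference = new AtomicReference<>();
        final CountDownLatch latch = new CountDownLatch(1);

        consumer.accept(request, new LatchedActionListener<Response>(ActionListener.wrap(response -> {
            // query finished in time: clear the negative cache and keep the response
            throttler.clearFilteredQuery(detector);
            respReference.set(response);
        }, exception -> {
            // query failed: clear the negative cache so the detector is not throttled forever
            throttler.clearFilteredQuery(detector);
            LOG.error("Cannot get response for request {}, error: {}", request, exception);
        }), latch));

        if (!latch.await(requestTimeout.getSeconds(), TimeUnit.SECONDS)) {
            // on timeout the entry deliberately stays in the cache, so further requests for
            // this detector are rejected until the listener above eventually fires
            throw new ElasticsearchTimeoutException("Cannot get response within time limit: " + request.toString());
        }
        return Optional.ofNullable(respReference.get());
    } catch (InterruptedException e) {
        // per the review comment: clear the entry so an interrupted wait does not leave
        // the detector permanently throttled
        throttler.clearFilteredQuery(detector);
        LOG.error(CommonErrorMessages.WAIT_ERR_MSG);
        throw new IllegalStateException(e);
    }
}
```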

/**
* Check if there is running query on given detector
* @param detector Anomaly Detector
* @return boolean
*/
public boolean hasRunningQuery(AnomalyDetector detector) {
Optional<Map.Entry<ActionRequest, Instant>> queryEntry = throttler.getFilteredQuery(detector);
Contributor

Minor. The method can be simplified to one line: return throttler.getFilteredQuery(detector).isPresent();

Contributor Author

will update

if (queryEntry.isPresent()) {
return true;
}
return false;
}
}