Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Adding RestActions support for Detector Stats API (#237)
Browse files Browse the repository at this point in the history
* Adding RestActions support for detector stats

* Update SearchAnomalyDetectorAction.java

* Adding Unit Tests
  • Loading branch information
saratvemulapalli authored Sep 30, 2020
1 parent 71923ff commit 4c52b23
Show file tree
Hide file tree
Showing 8 changed files with 351 additions and 133 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ List<String> jacocoExclusions = [
'com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorRunner',
'com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices',
'com.amazon.opendistroforelasticsearch.ad.util.ParseUtils',
'com.amazon.opendistroforelasticsearch.ad.transport.StatsAnomalyDetectorTransportAction'
]

jacocoTestCoverageVerification {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@
import com.amazon.opendistroforelasticsearch.ad.transport.SearchAnomalyDetectorTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.SearchAnomalyResultAction;
import com.amazon.opendistroforelasticsearch.ad.transport.SearchAnomalyResultTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.StatsAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.transport.StatsAnomalyDetectorTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ThresholdResultAction;
Expand Down Expand Up @@ -221,11 +223,7 @@ public List<RestHandler> getRestHandlers(
clusterService,
anomalyDetectorRunner
);
RestStatsAnomalyDetectorAction statsAnomalyDetectorAction = new RestStatsAnomalyDetectorAction(
adStats,
this.nodeFilter,
this.clusterService
);
RestStatsAnomalyDetectorAction statsAnomalyDetectorAction = new RestStatsAnomalyDetectorAction(adStats, this.nodeFilter);
RestAnomalyDetectorJobAction anomalyDetectorJobAction = new RestAnomalyDetectorJobAction(
settings,
clusterService,
Expand Down Expand Up @@ -466,7 +464,8 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
new ActionHandler<>(ProfileAction.INSTANCE, ProfileTransportAction.class),
new ActionHandler<>(RCFPollingAction.INSTANCE, RCFPollingTransportAction.class),
new ActionHandler<>(SearchAnomalyDetectorAction.INSTANCE, SearchAnomalyDetectorTransportAction.class),
new ActionHandler<>(SearchAnomalyResultAction.INSTANCE, SearchAnomalyResultTransportAction.class)
new ActionHandler<>(SearchAnomalyResultAction.INSTANCE, SearchAnomalyResultTransportAction.class),
new ActionHandler<>(StatsAnomalyDetectorAction.INSTANCE, StatsAnomalyDetectorTransportAction.class)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,25 @@
import static com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorPlugin.AD_BASE_URI;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestToXContentListener;

import com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.settings.EnabledSetting;
import com.amazon.opendistroforelasticsearch.ad.stats.ADStats;
import com.amazon.opendistroforelasticsearch.ad.stats.ADStatsResponse;
import com.amazon.opendistroforelasticsearch.ad.stats.StatNames;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsNodesAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsRequest;
import com.amazon.opendistroforelasticsearch.ad.transport.StatsAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer;
import com.amazon.opendistroforelasticsearch.ad.util.MultiResponsesDelegateActionListener;
import com.google.common.collect.ImmutableList;

/**
Expand All @@ -65,12 +54,10 @@ public class RestStatsAnomalyDetectorAction extends BaseRestHandler {
*
* @param adStats ADStats object
* @param nodeFilter util class to get eligible data nodes
* @param clusterService ClusterService
*/
public RestStatsAnomalyDetectorAction(ADStats adStats, DiscoveryNodeFilterer nodeFilter, ClusterService clusterService) {
public RestStatsAnomalyDetectorAction(ADStats adStats, DiscoveryNodeFilterer nodeFilter) {
this.adStats = adStats;
this.nodeFilter = nodeFilter;
this.clusterService = clusterService;
}

@Override
Expand All @@ -84,7 +71,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
throw new IllegalStateException(CommonErrorMessages.DISABLED_ERR_MSG);
}
ADStatsRequest adStatsRequest = getRequest(request);
return channel -> getStats(client, channel, adStatsRequest);
return channel -> client.execute(StatsAnomalyDetectorAction.INSTANCE, adStatsRequest, new RestToXContentListener<>(channel));
}

/**
Expand Down Expand Up @@ -141,116 +128,6 @@ private ADStatsRequest getRequest(RestRequest request) {
return adStatsRequest;
}

/**
* Make the 2 requests to get the node and cluster statistics
*
* @param client Client
* @param channel Channel to send response
* @param adStatsRequest Request containing stats to be retrieved
*/
private void getStats(Client client, RestChannel channel, ADStatsRequest adStatsRequest) {
// Use MultiResponsesDelegateActionListener to execute 2 async requests and create the response once they finish
MultiResponsesDelegateActionListener<ADStatsResponse> delegateListener = new MultiResponsesDelegateActionListener<>(
getRestStatsListener(channel),
2,
"Unable to return AD Stats"
);

getClusterStats(client, delegateListener, adStatsRequest);
getNodeStats(client, delegateListener, adStatsRequest);
}

/**
* Make async request to get the number of detectors in AnomalyDetector.ANOMALY_DETECTORS_INDEX if necessary
* and, onResponse, gather the cluster statistics
*
* @param client Client
* @param listener MultiResponsesDelegateActionListener to be used once both requests complete
* @param adStatsRequest Request containing stats to be retrieved
*/
private void getClusterStats(
Client client,
MultiResponsesDelegateActionListener<ADStatsResponse> listener,
ADStatsRequest adStatsRequest
) {
ADStatsResponse adStatsResponse = new ADStatsResponse();
if (adStatsRequest.getStatsToBeRetrieved().contains(StatNames.DETECTOR_COUNT.getName())) {
if (clusterService.state().getRoutingTable().hasIndex(AnomalyDetector.ANOMALY_DETECTORS_INDEX)) {
final SearchRequest request = client
.prepareSearch(AnomalyDetector.ANOMALY_DETECTORS_INDEX)
.setSize(0)
.setTrackTotalHits(true)
.request();
client.search(request, ActionListener.wrap(indicesStatsResponse -> {
adStats.getStat(StatNames.DETECTOR_COUNT.getName()).setValue(indicesStatsResponse.getHits().getTotalHits().value);
adStatsResponse.setClusterStats(getClusterStatsMap(adStatsRequest));
listener.onResponse(adStatsResponse);
}, e -> listener.onFailure(new RuntimeException("Failed to get AD cluster stats", e))));
} else {
adStats.getStat(StatNames.DETECTOR_COUNT.getName()).setValue(0L);
adStatsResponse.setClusterStats(getClusterStatsMap(adStatsRequest));
listener.onResponse(adStatsResponse);
}
} else {
adStatsResponse.setClusterStats(getClusterStatsMap(adStatsRequest));
listener.onResponse(adStatsResponse);
}
}

/**
* Make async request to get the Anomaly Detection statistics from each node and, onResponse, set the
* ADStatsNodesResponse field of ADStatsResponse
*
* @param client Client
* @param listener MultiResponsesDelegateActionListener to be used once both requests complete
* @param adStatsRequest Request containing stats to be retrieved
*/
private void getNodeStats(
Client client,
MultiResponsesDelegateActionListener<ADStatsResponse> listener,
ADStatsRequest adStatsRequest
) {
client.execute(ADStatsNodesAction.INSTANCE, adStatsRequest, ActionListener.wrap(adStatsResponse -> {
ADStatsResponse restADStatsResponse = new ADStatsResponse();
restADStatsResponse.setADStatsNodesResponse(adStatsResponse);
listener.onResponse(restADStatsResponse);
}, listener::onFailure));
}

/**
* Collect Cluster Stats into map to be retrieved
*
* @param adStatsRequest Request containing stats to be retrieved
* @return Map containing Cluster Stats
*/
private Map<String, Object> getClusterStatsMap(ADStatsRequest adStatsRequest) {
Map<String, Object> clusterStats = new HashMap<>();
Set<String> statsToBeRetrieved = adStatsRequest.getStatsToBeRetrieved();
adStats
.getClusterStats()
.entrySet()
.stream()
.filter(s -> statsToBeRetrieved.contains(s.getKey()))
.forEach(s -> clusterStats.put(s.getKey(), s.getValue().getValue()));
return clusterStats;
}

/**
* Listener sends response once Node Stats and Cluster Stats are gathered
*
* @param channel Channel
* @return ActionListener for ADStatsResponse
*/
private ActionListener<ADStatsResponse> getRestStatsListener(RestChannel channel) {
return ActionListener
.wrap(
adStatsResponse -> {
channel.sendResponse(new BytesRestResponse(RestStatus.OK, adStatsResponse.toXContent(channel.newBuilder())));
},
exception -> channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, exception.getMessage()))
);
}

@Override
public List<Route> routes() {
return ImmutableList
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand Down Expand Up @@ -82,6 +84,18 @@ public XContentBuilder toXContent(XContentBuilder builder) throws IOException {
return toXContent(builder, ToXContent.EMPTY_PARAMS);
}

public ADStatsResponse() {}

public ADStatsResponse(StreamInput in) throws IOException {
adStatsNodesResponse = new ADStatsNodesResponse(in);
clusterStats = in.readMap();
}

public void writeTo(StreamOutput out) throws IOException {
adStatsNodesResponse.writeTo(out);
out.writeMap(clusterStats);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
XContentBuilder xContentBuilder = builder.startObject();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.ad.transport;

import org.elasticsearch.action.ActionType;

public class StatsAnomalyDetectorAction extends ActionType<StatsAnomalyDetectorResponse> {
public static final StatsAnomalyDetectorAction INSTANCE = new StatsAnomalyDetectorAction();
public static final String NAME = "cluster:admin/opendistro/ad/detector/stats";

private StatsAnomalyDetectorAction() {
super(NAME, StatsAnomalyDetectorResponse::new);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.ad.transport;

import java.io.IOException;

import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;

import com.amazon.opendistroforelasticsearch.ad.stats.ADStatsResponse;

public class StatsAnomalyDetectorResponse extends ActionResponse implements ToXContentObject {
private ADStatsResponse adStatsResponse;

public StatsAnomalyDetectorResponse(StreamInput in) throws IOException {
super(in);
adStatsResponse = new ADStatsResponse(in);
}

public StatsAnomalyDetectorResponse(ADStatsResponse adStatsResponse) {
this.adStatsResponse = adStatsResponse;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
adStatsResponse.writeTo(out);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
adStatsResponse.toXContent(builder, params);
return builder;
}
}
Loading

0 comments on commit 4c52b23

Please sign in to comment.