Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Adding RestActions support Create/Update Detector API #243

Merged
merged 9 commits into from
Oct 13, 2020
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,8 @@ List<String> jacocoExclusions = [
'com.amazon.opendistroforelasticsearch.ad.transport.StatsAnomalyDetectorTransportAction',
'com.amazon.opendistroforelasticsearch.ad.transport.DeleteAnomalyDetectorTransportAction*',
'com.amazon.opendistroforelasticsearch.ad.transport.GetAnomalyDetectorTransportAction*',
'com.amazon.opendistroforelasticsearch.ad.transport.GetAnomalyDetectorResponse'
'com.amazon.opendistroforelasticsearch.ad.transport.GetAnomalyDetectorResponse',
'com.amazon.opendistroforelasticsearch.ad.transport.IndexAnomalyDetectorRequest'
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added IndexAnomalyDetectorRequest using Powermock but JaCoCo doesnt read lines covered by Powermock tests.

]

jacocoTestCoverageVerification {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@
import com.amazon.opendistroforelasticsearch.ad.transport.DeleteModelTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.GetAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.transport.GetAnomalyDetectorTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.IndexAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.transport.IndexAnomalyDetectorTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ProfileAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ProfileTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.RCFPollingAction;
Expand Down Expand Up @@ -471,7 +473,8 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
new ActionHandler<>(SearchAnomalyResultAction.INSTANCE, SearchAnomalyResultTransportAction.class),
new ActionHandler<>(StatsAnomalyDetectorAction.INSTANCE, StatsAnomalyDetectorTransportAction.class),
new ActionHandler<>(DeleteAnomalyDetectorAction.INSTANCE, DeleteAnomalyDetectorTransportAction.class),
new ActionHandler<>(GetAnomalyDetectorAction.INSTANCE, GetAnomalyDetectorTransportAction.class)
new ActionHandler<>(GetAnomalyDetectorAction.INSTANCE, GetAnomalyDetectorTransportAction.class),
new ActionHandler<>(IndexAnomalyDetectorAction.INSTANCE, IndexAnomalyDetectorTransportAction.class)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,25 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestResponseListener;

import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorPlugin;
import com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages;
import com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.rest.handler.IndexAnomalyDetectorActionHandler;
import com.amazon.opendistroforelasticsearch.ad.settings.EnabledSetting;
import com.amazon.opendistroforelasticsearch.ad.transport.IndexAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.transport.IndexAnomalyDetectorRequest;
import com.amazon.opendistroforelasticsearch.ad.transport.IndexAnomalyDetectorResponse;
import com.google.common.collect.ImmutableList;

/**
Expand Down Expand Up @@ -115,22 +123,19 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
WriteRequest.RefreshPolicy refreshPolicy = request.hasParam(REFRESH)
? WriteRequest.RefreshPolicy.parse(request.param(REFRESH))
: WriteRequest.RefreshPolicy.IMMEDIATE;
RestRequest.Method method = request.getHttpRequest().method();

return channel -> new IndexAnomalyDetectorActionHandler(
settings,
clusterService,
client,
channel,
anomalyDetectionIndices,
IndexAnomalyDetectorRequest indexAnomalyDetectorRequest = new IndexAnomalyDetectorRequest(
detectorId,
seqNo,
primaryTerm,
refreshPolicy,
detector,
requestTimeout,
maxAnomalyDetectors,
maxAnomalyFeatures
).start();
method
);

return channel -> client
.execute(IndexAnomalyDetectorAction.INSTANCE, indexAnomalyDetectorRequest, indexAnomalyDetectorResponse(channel, method));
}

@Override
Expand All @@ -146,4 +151,28 @@ public List<Route> routes() {
)
);
}

private RestResponseListener<IndexAnomalyDetectorResponse> indexAnomalyDetectorResponse(
RestChannel channel,
RestRequest.Method method
) {
return new RestResponseListener<IndexAnomalyDetectorResponse>(channel) {
@Override
public RestResponse buildResponse(IndexAnomalyDetectorResponse response) throws Exception {
RestStatus restStatus = RestStatus.CREATED;
if (method == RestRequest.Method.PUT) {
restStatus = RestStatus.OK;
}
BytesRestResponse bytesRestResponse = new BytesRestResponse(
restStatus,
response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS)
);
if (restStatus == RestStatus.CREATED) {
String location = String.format(Locale.ROOT, "%s/%s", AnomalyDetectorPlugin.AD_BASE_URI, response.getId());
bytesRestResponse.addHeader("Location", location);
}
return bytesRestResponse;
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestStatus;

import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob;
Expand All @@ -49,47 +49,58 @@ public class AnomalyDetectorActionHandler {
* @param clusterService ES cluster service
* @param client ES node client
* @param detectorId detector identifier
* @param channel ES rest channel
* @param listener Listener to send response
* @param function AD function
* @param xContentRegistry Registry which is used for XContentParser
*/
saratvemulapalli marked this conversation as resolved.
Show resolved Hide resolved
public void getDetectorJob(
ClusterService clusterService,
NodeClient client,
Client client,
String detectorId,
RestChannel channel,
AnomalyDetectorFunction function
ActionListener listener,
AnomalyDetectorFunction function,
NamedXContentRegistry xContentRegistry
) {
if (clusterService.state().metadata().indices().containsKey(ANOMALY_DETECTOR_JOB_INDEX)) {
GetRequest request = new GetRequest(ANOMALY_DETECTOR_JOB_INDEX).id(detectorId);
client.get(request, ActionListener.wrap(response -> onGetAdJobResponseForWrite(response, channel, function), exception -> {
logger.error("Fail to get anomaly detector job: " + detectorId, exception);
try {
channel.sendResponse(new BytesRestResponse(channel, exception));
} catch (IOException e) {
logger.error("Fail to send exception" + detectorId, e);
}
}));
client
.get(
request,
ActionListener
.wrap(response -> onGetAdJobResponseForWrite(response, listener, function, xContentRegistry), exception -> {
logger.error("Fail to get anomaly detector job: " + detectorId, exception);
listener.onFailure(exception);
})
);
} else {
function.execute();
}
}

private void onGetAdJobResponseForWrite(GetResponse response, RestChannel channel, AnomalyDetectorFunction function) {
private void onGetAdJobResponseForWrite(
GetResponse response,
ActionListener listener,
AnomalyDetectorFunction function,
NamedXContentRegistry xContentRegistry
) {
if (response.isExists()) {
String adJobId = response.getId();
if (adJobId != null) {
// check if AD job is running on the detector, if yes, we can't delete the detector
try (XContentParser parser = RestHandlerUtils.createXContentParser(channel, response.getSourceAsBytesRef())) {
try (
XContentParser parser = RestHandlerUtils
.createXContentParserFromRegistry(xContentRegistry, response.getSourceAsBytesRef())
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation);
AnomalyDetectorJob adJob = AnomalyDetectorJob.parse(parser);
if (adJob.isEnabled()) {
channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, "Detector job is running: " + adJobId));
listener.onFailure(new ElasticsearchStatusException("Detector job is running: " + adJobId, RestStatus.BAD_REQUEST));
return;
}
} catch (IOException e) {
String message = "Failed to parse anomaly detector job " + adJobId;
logger.error(message, e);
channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, message));
listener.onFailure(new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST));
}
}
}
Expand Down
Loading