diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java index ba49287d..c908776c 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java @@ -113,6 +113,10 @@ import com.amazon.opendistroforelasticsearch.ad.transport.RCFPollingTransportAction; import com.amazon.opendistroforelasticsearch.ad.transport.RCFResultAction; import com.amazon.opendistroforelasticsearch.ad.transport.RCFResultTransportAction; +import com.amazon.opendistroforelasticsearch.ad.transport.SearchAnomalyDetectorAction; +import com.amazon.opendistroforelasticsearch.ad.transport.SearchAnomalyDetectorTransportAction; +import com.amazon.opendistroforelasticsearch.ad.transport.SearchAnomalyResultAction; +import com.amazon.opendistroforelasticsearch.ad.transport.SearchAnomalyResultTransportAction; import com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorAction; import com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorTransportAction; import com.amazon.opendistroforelasticsearch.ad.transport.ThresholdResultAction; @@ -460,7 +464,9 @@ public List getNamedXContent() { new ActionHandler<>(CronAction.INSTANCE, CronTransportAction.class), new ActionHandler<>(ADStatsNodesAction.INSTANCE, ADStatsNodesTransportAction.class), new ActionHandler<>(ProfileAction.INSTANCE, ProfileTransportAction.class), - new ActionHandler<>(RCFPollingAction.INSTANCE, RCFPollingTransportAction.class) + new ActionHandler<>(RCFPollingAction.INSTANCE, RCFPollingTransportAction.class), + new ActionHandler<>(SearchAnomalyDetectorAction.INSTANCE, SearchAnomalyDetectorTransportAction.class), + new ActionHandler<>(SearchAnomalyResultAction.INSTANCE, SearchAnomalyResultTransportAction.class) ); } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/AbstractSearchAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/AbstractSearchAction.java index 08c3e4b3..cb4d67fd 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/AbstractSearchAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/AbstractSearchAction.java @@ -25,6 +25,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionType; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.node.NodeClient; @@ -57,13 +58,15 @@ public abstract class AbstractSearchAction extends B private final String index; private final Class clazz; private final String urlPath; + private final ActionType actionType; private final Logger logger = LogManager.getLogger(AbstractSearchAction.class); - public AbstractSearchAction(String urlPath, String index, Class clazz) { + public AbstractSearchAction(String urlPath, String index, Class clazz, ActionType actionType) { this.index = index; this.clazz = clazz; this.urlPath = urlPath; + this.actionType = actionType; } @Override @@ -76,10 +79,10 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli searchSourceBuilder.fetchSource(getSourceContext(request)); searchSourceBuilder.seqNoAndPrimaryTerm(true).version(true); SearchRequest searchRequest = new SearchRequest().source(searchSourceBuilder).indices(this.index); - return channel -> client.search(searchRequest, search(channel, this.clazz)); + return channel -> client.execute(actionType, searchRequest, search(channel)); } - private RestResponseListener search(RestChannel channel, Class clazz) { + private RestResponseListener search(RestChannel channel) { return new RestResponseListener(channel) { @Override public RestResponse buildResponse(SearchResponse response) throws Exception { diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestSearchAnomalyDetectorAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestSearchAnomalyDetectorAction.java index f71f4705..9c9b8a1b 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestSearchAnomalyDetectorAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestSearchAnomalyDetectorAction.java @@ -19,6 +19,7 @@ import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorPlugin; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; +import com.amazon.opendistroforelasticsearch.ad.transport.SearchAnomalyDetectorAction; /** * This class consists of the REST handler to search anomaly detectors. @@ -29,7 +30,7 @@ public class RestSearchAnomalyDetectorAction extends AbstractSearchAction { + public static final SearchAnomalyDetectorAction INSTANCE = new SearchAnomalyDetectorAction(); + public static final String NAME = "cluster:admin/opendistro/ad/detector/search"; + + private SearchAnomalyDetectorAction() { + super(NAME, SearchResponse::new); + } +} diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchAnomalyDetectorTransportAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchAnomalyDetectorTransportAction.java new file mode 100644 index 00000000..687fc447 --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchAnomalyDetectorTransportAction.java @@ -0,0 +1,52 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.transport; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.transport.TransportService; + +public class SearchAnomalyDetectorTransportAction extends HandledTransportAction { + + private final Client client; + + @Inject + public SearchAnomalyDetectorTransportAction(TransportService transportService, ActionFilters actionFilters, Client client) { + super(SearchAnomalyDetectorAction.NAME, transportService, actionFilters, SearchRequest::new); + this.client = client; + } + + @Override + protected void doExecute(Task task, SearchRequest request, ActionListener listener) { + client.search(request, new ActionListener() { + @Override + public void onResponse(SearchResponse searchResponse) { + listener.onResponse(searchResponse); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + } +} diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchAnomalyResultAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchAnomalyResultAction.java new file mode 100644 index 00000000..cd8b83c0 --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchAnomalyResultAction.java @@ -0,0 +1,28 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.transport; + +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.search.SearchResponse; + +public class SearchAnomalyResultAction extends ActionType { + public static final SearchAnomalyResultAction INSTANCE = new SearchAnomalyResultAction(); + public static final String NAME = "cluster:admin/opendistro/ad/result/search"; + + private SearchAnomalyResultAction() { + super(NAME, SearchResponse::new); + } +} diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchAnomalyResultTransportAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchAnomalyResultTransportAction.java new file mode 100644 index 00000000..5a777895 --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchAnomalyResultTransportAction.java @@ -0,0 +1,52 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.transport; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.transport.TransportService; + +public class SearchAnomalyResultTransportAction extends HandledTransportAction { + + private final Client client; + + @Inject + public SearchAnomalyResultTransportAction(TransportService transportService, ActionFilters actionFilters, Client client) { + super(SearchAnomalyResultAction.NAME, transportService, actionFilters, SearchRequest::new); + this.client = client; + } + + @Override + protected void doExecute(Task task, SearchRequest request, ActionListener listener) { + client.search(request, new ActionListener() { + @Override + public void onResponse(SearchResponse searchResponse) { + listener.onResponse(searchResponse); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + } +} diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchAnomalyDetectorActionTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchAnomalyDetectorActionTests.java new file mode 100644 index 00000000..d547d3d7 --- /dev/null +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchAnomalyDetectorActionTests.java @@ -0,0 +1,81 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.transport; + +import static org.mockito.Mockito.mock; + +import org.apache.lucene.index.IndexNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.transport.TransportService; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class SearchAnomalyDetectorActionTests extends ESIntegTestCase { + private SearchAnomalyDetectorTransportAction action; + private Task task; + private ActionListener response; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + + action = new SearchAnomalyDetectorTransportAction(mock(TransportService.class), mock(ActionFilters.class), client()); + task = mock(Task.class); + response = new ActionListener() { + @Override + public void onResponse(SearchResponse searchResponse) { + Assert.assertEquals(searchResponse.getSuccessfulShards(), 5); + } + + @Override + public void onFailure(Exception e) { + Assert.assertFalse(IndexNotFoundException.class == e.getClass()); + } + }; + } + + @Test + public void testSearchResponse() { + // Will call response.onResponse as Index exists + Settings indexSettings = Settings.builder().put("number_of_shards", 5).put("number_of_replicas", 1).build(); + CreateIndexRequest indexRequest = new CreateIndexRequest("my-test-index", indexSettings); + client().admin().indices().create(indexRequest).actionGet(); + SearchRequest searchRequest = new SearchRequest("my-test-index"); + action.doExecute(task, searchRequest, response); + } + + @Test + public void testSearchDetectorAction() { + Assert.assertNotNull(SearchAnomalyDetectorAction.INSTANCE.name()); + Assert.assertEquals(SearchAnomalyDetectorAction.INSTANCE.name(), SearchAnomalyDetectorAction.NAME); + } + + @Test + public void testNoIndex() { + // No Index, will call response.onFailure + SearchRequest searchRequest = new SearchRequest("my-test-index"); + action.doExecute(task, searchRequest, response); + } +} diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchAnomalyResultActionTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchAnomalyResultActionTests.java new file mode 100644 index 00000000..9e2e042a --- /dev/null +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchAnomalyResultActionTests.java @@ -0,0 +1,80 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.transport; + +import static org.mockito.Mockito.mock; + +import org.apache.lucene.index.IndexNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.transport.TransportService; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class SearchAnomalyResultActionTests extends ESIntegTestCase { + private SearchAnomalyResultTransportAction action; + private Task task; + private ActionListener response; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + action = new SearchAnomalyResultTransportAction(mock(TransportService.class), mock(ActionFilters.class), client()); + task = mock(Task.class); + response = new ActionListener() { + @Override + public void onResponse(SearchResponse searchResponse) { + Assert.assertEquals(searchResponse.getSuccessfulShards(), 5); + } + + @Override + public void onFailure(Exception e) { + Assert.assertFalse(IndexNotFoundException.class == e.getClass()); + } + }; + } + + @Test + public void testSearchResponse() { + // Will call response.onResponse as Index exists + Settings indexSettings = Settings.builder().put("number_of_shards", 5).put("number_of_replicas", 1).build(); + CreateIndexRequest indexRequest = new CreateIndexRequest("my-test-index", indexSettings); + client().admin().indices().create(indexRequest).actionGet(); + SearchRequest searchRequest = new SearchRequest("my-test-index"); + action.doExecute(task, searchRequest, response); + } + + @Test + public void testSearchResultAction() { + Assert.assertNotNull(SearchAnomalyResultAction.INSTANCE.name()); + Assert.assertEquals(SearchAnomalyResultAction.INSTANCE.name(), SearchAnomalyResultAction.NAME); + } + + @Test + public void testNoIndex() { + // No Index, will call response.onFailure + SearchRequest searchRequest = new SearchRequest("my-test-index"); + action.doExecute(task, searchRequest, response); + } +}