Skip to content

Commit

Permalink
Create and delete pit rest layer changes
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <bharath78910@gmail.com>
  • Loading branch information
bharath-techie committed Aug 2, 2022
1 parent bea5d1a commit 49b3f01
Show file tree
Hide file tree
Showing 14 changed files with 867 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
import org.opensearch.action.get.MultiGetRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchScrollRequest;
Expand Down Expand Up @@ -92,6 +94,7 @@
import org.opensearch.index.reindex.ReindexRequest;
import org.opensearch.index.reindex.UpdateByQueryRequest;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.rest.action.search.RestCreatePitAction;
import org.opensearch.rest.action.search.RestSearchAction;
import org.opensearch.script.mustache.MultiSearchTemplateRequest;
import org.opensearch.script.mustache.SearchTemplateRequest;
Expand Down Expand Up @@ -433,9 +436,16 @@ static void addSearchRequestParams(Params params, SearchRequest searchRequest) {
params.putParam(RestSearchAction.TYPED_KEYS_PARAM, "true");
params.withRouting(searchRequest.routing());
params.withPreference(searchRequest.preference());
if (searchRequest.pointInTimeBuilder() == null) {
params.withIndicesOptions(searchRequest.indicesOptions());
}
params.withIndicesOptions(searchRequest.indicesOptions());
params.withSearchType(searchRequest.searchType().name().toLowerCase(Locale.ROOT));
params.putParam("ccs_minimize_roundtrips", Boolean.toString(searchRequest.isCcsMinimizeRoundtrips()));
if (searchRequest.pointInTimeBuilder() != null) {
params.putParam("ccs_minimize_roundtrips", "false");
} else {
params.putParam("ccs_minimize_roundtrips", Boolean.toString(searchRequest.isCcsMinimizeRoundtrips()));
}
if (searchRequest.getPreFilterShardSize() != null) {
params.putParam("pre_filter_shard_size", Integer.toString(searchRequest.getPreFilterShardSize()));
}
Expand Down Expand Up @@ -464,6 +474,28 @@ static Request clearScroll(ClearScrollRequest clearScrollRequest) throws IOExcep
return request;
}

static Request createPit(CreatePitRequest createPitRequest) throws IOException {
Params params = new Params();
params.putParam(RestCreatePitAction.ALLOW_PARTIAL_PIT_CREATION, Boolean.toString(createPitRequest.shouldAllowPartialPitCreation()));
params.putParam(RestCreatePitAction.KEEP_ALIVE, createPitRequest.getKeepAlive());
params.withIndicesOptions(createPitRequest.indicesOptions());
Request request = new Request(HttpPost.METHOD_NAME, endpoint(createPitRequest.indices(), "_search/point_in_time"));
request.addParameters(params.asMap());
request.setEntity(createEntity(createPitRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
}

static Request deletePit(DeletePitRequest deletePitRequest) throws IOException {
Request request = new Request(HttpDelete.METHOD_NAME, "/_search/point_in_time");
request.setEntity(createEntity(deletePitRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
}

static Request deleteAllPits(DeletePitRequest deletePitRequest) {
Request request = new Request(HttpDelete.METHOD_NAME, "/_search/point_in_time/_all");
return request;
}

static Request multiSearch(MultiSearchRequest multiSearchRequest) throws IOException {
Request request = new Request(HttpPost.METHOD_NAME, "/_msearch");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.ClearScrollResponse;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.DeletePitResponse;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.MultiSearchResponse;
import org.opensearch.action.search.SearchRequest;
Expand Down Expand Up @@ -1256,6 +1260,126 @@ public final Cancellable scrollAsync(
);
}

/**
* Create PIT context using create PIT API
*
* @param createPitRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
*/
public final CreatePitResponse createPit(CreatePitRequest createPitRequest, RequestOptions options) throws IOException {
return performRequestAndParseEntity(
createPitRequest,
RequestConverters::createPit,
options,
CreatePitResponse::fromXContent,
emptySet()
);
}

/**
* Asynchronously Create PIT context using create PIT API
*
* @param createPitRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return the response
*/
public final Cancellable createPitAsync(
CreatePitRequest createPitRequest,
RequestOptions options,
ActionListener<CreatePitResponse> listener
) {
return performRequestAsyncAndParseEntity(
createPitRequest,
RequestConverters::createPit,
options,
CreatePitResponse::fromXContent,
listener,
emptySet()
);
}

/**
* Delete point in time searches using delete PIT API
*
* @param deletePitRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
*/
public final DeletePitResponse deletePit(DeletePitRequest deletePitRequest, RequestOptions options) throws IOException {
return performRequestAndParseEntity(
deletePitRequest,
RequestConverters::deletePit,
options,
DeletePitResponse::fromXContent,
emptySet()
);
}

/**
* Asynchronously Delete point in time searches using delete PIT API
*
* @param deletePitRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return the response
*/
public final Cancellable deletePitAsync(
DeletePitRequest deletePitRequest,
RequestOptions options,
ActionListener<DeletePitResponse> listener
) {
return performRequestAsyncAndParseEntity(
deletePitRequest,
RequestConverters::deletePit,
options,
DeletePitResponse::fromXContent,
listener,
emptySet()
);
}

/**
* Delete all point in time searches using delete all PITs API
*
* @param deletePitRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
*/
public final DeletePitResponse deleteAllPits(DeletePitRequest deletePitRequest, RequestOptions options) throws IOException {
return performRequestAndParseEntity(
deletePitRequest,
RequestConverters::deleteAllPits,
options,
DeletePitResponse::fromXContent,
emptySet()
);
}

/**
* Asynchronously Delete all point in time searches using delete all PITs API
*
* @param deletePitRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return the response
*/
public final Cancellable deleteAllPitsAsync(
DeletePitRequest deletePitRequest,
RequestOptions options,
ActionListener<DeletePitResponse> listener
) {
return performRequestAsyncAndParseEntity(
deletePitRequest,
RequestConverters::deleteAllPits,
options,
DeletePitResponse::fromXContent,
listener,
emptySet()
);
}

/**
* Clears one or more scroll ids using the Clear Scroll API.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.client;

import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.junit.Before;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.action.search.DeletePitInfo;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.DeletePitResponse;
import org.opensearch.common.unit.TimeValue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* Tests point in time API with rest high level client
*/
public class PitIT extends OpenSearchRestHighLevelClientTestCase {

@Before
public void indexDocuments() throws IOException {
Request doc1 = new Request(HttpPut.METHOD_NAME, "/index/_doc/1");
doc1.setJsonEntity("{\"type\":\"type1\", \"id\":1, \"num\":10, \"num2\":50}");
client().performRequest(doc1);
Request doc2 = new Request(HttpPut.METHOD_NAME, "/index/_doc/2");
doc2.setJsonEntity("{\"type\":\"type1\", \"id\":2, \"num\":20, \"num2\":40}");
client().performRequest(doc2);
Request doc3 = new Request(HttpPut.METHOD_NAME, "/index/_doc/3");
doc3.setJsonEntity("{\"type\":\"type1\", \"id\":3, \"num\":50, \"num2\":35}");
client().performRequest(doc3);
Request doc4 = new Request(HttpPut.METHOD_NAME, "/index/_doc/4");
doc4.setJsonEntity("{\"type\":\"type2\", \"id\":4, \"num\":100, \"num2\":10}");
client().performRequest(doc4);
Request doc5 = new Request(HttpPut.METHOD_NAME, "/index/_doc/5");
doc5.setJsonEntity("{\"type\":\"type2\", \"id\":5, \"num\":100, \"num2\":10}");
client().performRequest(doc5);
client().performRequest(new Request(HttpPost.METHOD_NAME, "/_refresh"));
}

public void testCreatePit() throws IOException {
CreatePitRequest pitRequest = new CreatePitRequest(new TimeValue(1, TimeUnit.DAYS), true, "index");
CreatePitResponse pitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
assertTrue(pitResponse.getId() != null);
assertEquals(1, pitResponse.getTotalShards());
assertEquals(1, pitResponse.getSuccessfulShards());
assertEquals(0, pitResponse.getFailedShards());
assertEquals(0, pitResponse.getSkippedShards());
List<String> pitIds = new ArrayList<>();
pitIds.add(pitResponse.getId());
DeletePitRequest deletePitRequest = new DeletePitRequest(pitIds);
DeletePitResponse deletePitResponse = execute(deletePitRequest, highLevelClient()::deletePit, highLevelClient()::deletePitAsync);
assertTrue(deletePitResponse.getDeletePitResults().get(0).isSuccessful());
assertTrue(deletePitResponse.getDeletePitResults().get(0).getPitId().equals(pitResponse.getId()));
}

public void testDeleteAllPits() throws IOException {
CreatePitRequest pitRequest = new CreatePitRequest(new TimeValue(1, TimeUnit.DAYS), true, "index");
CreatePitResponse pitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
CreatePitResponse pitResponse1 = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
assertTrue(pitResponse.getId() != null);
assertTrue(pitResponse1.getId() != null);
List<String> pitIds = new ArrayList<>();
pitIds.add("_all");
DeletePitRequest deletePitRequest = new DeletePitRequest(pitIds);
DeletePitResponse deletePitResponse = execute(
deletePitRequest,
highLevelClient()::deleteAllPits,
highLevelClient()::deleteAllPitsAsync
);
for (DeletePitInfo deletePitInfo : deletePitResponse.getDeletePitResults()) {
assertTrue(deletePitInfo.isSuccessful());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.opensearch.action.get.MultiGetRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchScrollRequest;
Expand Down Expand Up @@ -131,6 +132,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.StringJoiner;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -1303,6 +1305,27 @@ public void testClearScroll() throws IOException {
assertEquals(REQUEST_BODY_CONTENT_TYPE.mediaTypeWithoutParameters(), request.getEntity().getContentType().getValue());
}

public void testCreatePit() throws IOException {
String[] indices = randomIndicesNames(0, 5);
Map<String, String> expectedParams = new HashMap<>();
expectedParams.put("keep_alive", "1d");
expectedParams.put("allow_partial_pit_creation", "true");
CreatePitRequest createPitRequest = new CreatePitRequest(new TimeValue(1, TimeUnit.DAYS), true, indices);
setRandomIndicesOptions(createPitRequest::indicesOptions, createPitRequest::indicesOptions, expectedParams);
Request request = RequestConverters.createPit(createPitRequest);
StringJoiner endpoint = new StringJoiner("/", "/", "");
String index = String.join(",", indices);
if (Strings.hasLength(index)) {
endpoint.add(index);
}
endpoint.add("_search/point_in_time");
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
assertEquals(endpoint.toString(), request.getEndpoint());
assertEquals(expectedParams, request.getParameters());
assertToXContentBody(createPitRequest, request.getEntity());
assertEquals(REQUEST_BODY_CONTENT_TYPE.mediaTypeWithoutParameters(), request.getEntity().getContentType().getValue());
}

public void testSearchTemplate() throws Exception {
// Create a random request.
String[] indices = randomIndicesNames(0, 5);
Expand Down
Loading

0 comments on commit 49b3f01

Please sign in to comment.