Skip to content

Commit 8c9b90a

Browse files
sohaibiftikharnik9000
authored andcommitted
HLREST: add update by query API (#32760)
Adds update by query to the high level rest client.
1 parent bc8dcd6 commit 8c9b90a

File tree

19 files changed

+691
-81
lines changed

19 files changed

+691
-81
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,9 @@
107107
import org.elasticsearch.common.xcontent.XContentType;
108108
import org.elasticsearch.index.VersionType;
109109
import org.elasticsearch.index.rankeval.RankEvalRequest;
110+
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
110111
import org.elasticsearch.index.reindex.ReindexRequest;
112+
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
111113
import org.elasticsearch.protocol.xpack.XPackInfoRequest;
112114
import org.elasticsearch.protocol.xpack.XPackUsageRequest;
113115
import org.elasticsearch.protocol.xpack.license.DeleteLicenseRequest;
@@ -848,6 +850,33 @@ static Request reindex(ReindexRequest reindexRequest) throws IOException {
848850
return request;
849851
}
850852

853+
static Request updateByQuery(UpdateByQueryRequest updateByQueryRequest) throws IOException {
854+
String endpoint =
855+
endpoint(updateByQueryRequest.indices(), updateByQueryRequest.getDocTypes(), "_update_by_query");
856+
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
857+
Params params = new Params(request)
858+
.withRouting(updateByQueryRequest.getRouting())
859+
.withPipeline(updateByQueryRequest.getPipeline())
860+
.withRefresh(updateByQueryRequest.isRefresh())
861+
.withTimeout(updateByQueryRequest.getTimeout())
862+
.withWaitForActiveShards(updateByQueryRequest.getWaitForActiveShards(), ActiveShardCount.DEFAULT)
863+
.withIndicesOptions(updateByQueryRequest.indicesOptions());
864+
if (updateByQueryRequest.isAbortOnVersionConflict() == false) {
865+
params.putParam("conflicts", "proceed");
866+
}
867+
if (updateByQueryRequest.getBatchSize() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE) {
868+
params.putParam("scroll_size", Integer.toString(updateByQueryRequest.getBatchSize()));
869+
}
870+
if (updateByQueryRequest.getScrollTime() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_TIMEOUT) {
871+
params.putParam("scroll", updateByQueryRequest.getScrollTime());
872+
}
873+
if (updateByQueryRequest.getSize() > 0) {
874+
params.putParam("size", Integer.toString(updateByQueryRequest.getSize()));
875+
}
876+
request.setEntity(createEntity(updateByQueryRequest, REQUEST_BODY_CONTENT_TYPE));
877+
return request;
878+
}
879+
851880
static Request rollover(RolloverRequest rolloverRequest) throws IOException {
852881
String endpoint = new EndpointBuilder().addPathPart(rolloverRequest.getAlias()).addPathPartAsIs("_rollover")
853882
.addPathPart(rolloverRequest.getNewIndexName()).build();

client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import org.elasticsearch.index.rankeval.RankEvalResponse;
6868
import org.elasticsearch.index.reindex.BulkByScrollResponse;
6969
import org.elasticsearch.index.reindex.ReindexRequest;
70+
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
7071
import org.elasticsearch.plugins.spi.NamedXContentProvider;
7172
import org.elasticsearch.rest.BytesRestResponse;
7273
import org.elasticsearch.rest.RestStatus;
@@ -444,6 +445,35 @@ public final void reindexAsync(ReindexRequest reindexRequest, RequestOptions opt
444445
);
445446
}
446447

448+
/**
449+
* Executes a update by query request.
450+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html">
451+
* Update By Query API on elastic.co</a>
452+
* @param updateByQueryRequest the request
453+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
454+
* @return the response
455+
* @throws IOException in case there is a problem sending the request or parsing back the response
456+
*/
457+
public final BulkByScrollResponse updateByQuery(UpdateByQueryRequest updateByQueryRequest, RequestOptions options) throws IOException {
458+
return performRequestAndParseEntity(
459+
updateByQueryRequest, RequestConverters::updateByQuery, options, BulkByScrollResponse::fromXContent, emptySet()
460+
);
461+
}
462+
463+
/**
464+
* Asynchronously executes an update by query request.
465+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html">
466+
* Update By Query API on elastic.co</a>
467+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
468+
* @param listener the listener to be notified upon request completion
469+
*/
470+
public final void updateByQueryAsync(UpdateByQueryRequest reindexRequest, RequestOptions options,
471+
ActionListener<BulkByScrollResponse> listener) {
472+
performRequestAsyncAndParseEntity(
473+
reindexRequest, RequestConverters::updateByQuery, options, BulkByScrollResponse::fromXContent, listener, emptySet()
474+
);
475+
}
476+
447477
/**
448478
* Pings the remote Elasticsearch cluster and returns true if the ping succeeded, false otherwise
449479
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized

client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.elasticsearch.index.query.IdsQueryBuilder;
5252
import org.elasticsearch.index.reindex.BulkByScrollResponse;
5353
import org.elasticsearch.index.reindex.ReindexRequest;
54+
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
5455
import org.elasticsearch.rest.RestStatus;
5556
import org.elasticsearch.script.Script;
5657
import org.elasticsearch.script.ScriptType;
@@ -756,6 +757,72 @@ public void testReindex() throws IOException {
756757
}
757758
}
758759

760+
public void testUpdateByQuery() throws IOException {
761+
final String sourceIndex = "source1";
762+
{
763+
// Prepare
764+
Settings settings = Settings.builder()
765+
.put("number_of_shards", 1)
766+
.put("number_of_replicas", 0)
767+
.build();
768+
createIndex(sourceIndex, settings);
769+
assertEquals(
770+
RestStatus.OK,
771+
highLevelClient().bulk(
772+
new BulkRequest()
773+
.add(new IndexRequest(sourceIndex, "type", "1")
774+
.source(Collections.singletonMap("foo", 1), XContentType.JSON))
775+
.add(new IndexRequest(sourceIndex, "type", "2")
776+
.source(Collections.singletonMap("foo", 2), XContentType.JSON))
777+
.setRefreshPolicy(RefreshPolicy.IMMEDIATE),
778+
RequestOptions.DEFAULT
779+
).status()
780+
);
781+
}
782+
{
783+
// test1: create one doc in dest
784+
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
785+
updateByQueryRequest.indices(sourceIndex);
786+
updateByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1").types("type"));
787+
updateByQueryRequest.setRefresh(true);
788+
BulkByScrollResponse bulkResponse =
789+
execute(updateByQueryRequest, highLevelClient()::updateByQuery, highLevelClient()::updateByQueryAsync);
790+
assertEquals(1, bulkResponse.getTotal());
791+
assertEquals(1, bulkResponse.getUpdated());
792+
assertEquals(0, bulkResponse.getNoops());
793+
assertEquals(0, bulkResponse.getVersionConflicts());
794+
assertEquals(1, bulkResponse.getBatches());
795+
assertTrue(bulkResponse.getTook().getMillis() > 0);
796+
assertEquals(1, bulkResponse.getBatches());
797+
assertEquals(0, bulkResponse.getBulkFailures().size());
798+
assertEquals(0, bulkResponse.getSearchFailures().size());
799+
}
800+
{
801+
// test2: update using script
802+
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
803+
updateByQueryRequest.indices(sourceIndex);
804+
updateByQueryRequest.setScript(new Script("if (ctx._source.foo == 2) ctx._source.foo++;"));
805+
updateByQueryRequest.setRefresh(true);
806+
BulkByScrollResponse bulkResponse =
807+
execute(updateByQueryRequest, highLevelClient()::updateByQuery, highLevelClient()::updateByQueryAsync);
808+
assertEquals(2, bulkResponse.getTotal());
809+
assertEquals(2, bulkResponse.getUpdated());
810+
assertEquals(0, bulkResponse.getDeleted());
811+
assertEquals(0, bulkResponse.getNoops());
812+
assertEquals(0, bulkResponse.getVersionConflicts());
813+
assertEquals(1, bulkResponse.getBatches());
814+
assertTrue(bulkResponse.getTook().getMillis() > 0);
815+
assertEquals(1, bulkResponse.getBatches());
816+
assertEquals(0, bulkResponse.getBulkFailures().size());
817+
assertEquals(0, bulkResponse.getSearchFailures().size());
818+
assertEquals(
819+
3,
820+
(int) (highLevelClient().get(new GetRequest(sourceIndex, "type", "2"), RequestOptions.DEFAULT)
821+
.getSourceAsMap().get("foo"))
822+
);
823+
}
824+
}
825+
759826
public void testBulkProcessorIntegration() throws IOException {
760827
int nbItems = randomIntBetween(10, 100);
761828
boolean[] errors = new boolean[nbItems];

client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@
130130
import org.elasticsearch.index.rankeval.RestRankEvalAction;
131131
import org.elasticsearch.index.reindex.ReindexRequest;
132132
import org.elasticsearch.index.reindex.RemoteInfo;
133+
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
133134
import org.elasticsearch.protocol.xpack.XPackInfoRequest;
134135
import org.elasticsearch.protocol.xpack.migration.IndexUpgradeInfoRequest;
135136
import org.elasticsearch.protocol.xpack.watcher.DeleteWatchRequest;
@@ -138,6 +139,7 @@
138139
import org.elasticsearch.protocol.xpack.watcher.PutWatchRequest;
139140
import org.elasticsearch.repositories.fs.FsRepository;
140141
import org.elasticsearch.rest.action.search.RestSearchAction;
142+
import org.elasticsearch.script.Script;
141143
import org.elasticsearch.script.ScriptType;
142144
import org.elasticsearch.script.mustache.MultiSearchTemplateRequest;
143145
import org.elasticsearch.script.mustache.SearchTemplateRequest;
@@ -480,6 +482,60 @@ public void testReindex() throws IOException {
480482
assertToXContentBody(reindexRequest, request.getEntity());
481483
}
482484

485+
public void testUpdateByQuery() throws IOException {
486+
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
487+
updateByQueryRequest.indices(randomIndicesNames(1, 5));
488+
Map<String, String> expectedParams = new HashMap<>();
489+
if (randomBoolean()) {
490+
updateByQueryRequest.setDocTypes(generateRandomStringArray(5, 5, false, false));
491+
}
492+
if (randomBoolean()) {
493+
int batchSize = randomInt(100);
494+
updateByQueryRequest.setBatchSize(batchSize);
495+
expectedParams.put("scroll_size", Integer.toString(batchSize));
496+
}
497+
if (randomBoolean()) {
498+
updateByQueryRequest.setPipeline("my_pipeline");
499+
expectedParams.put("pipeline", "my_pipeline");
500+
}
501+
if (randomBoolean()) {
502+
updateByQueryRequest.setRouting("=cat");
503+
expectedParams.put("routing", "=cat");
504+
}
505+
if (randomBoolean()) {
506+
int size = randomIntBetween(100, 1000);
507+
updateByQueryRequest.setSize(size);
508+
expectedParams.put("size", Integer.toString(size));
509+
}
510+
if (randomBoolean()) {
511+
updateByQueryRequest.setAbortOnVersionConflict(false);
512+
expectedParams.put("conflicts", "proceed");
513+
}
514+
if (randomBoolean()) {
515+
String ts = randomTimeValue();
516+
updateByQueryRequest.setScroll(TimeValue.parseTimeValue(ts, "scroll"));
517+
expectedParams.put("scroll", ts);
518+
}
519+
if (randomBoolean()) {
520+
updateByQueryRequest.setQuery(new TermQueryBuilder("foo", "fooval"));
521+
}
522+
if (randomBoolean()) {
523+
updateByQueryRequest.setScript(new Script("ctx._source.last = \"lastname\""));
524+
}
525+
setRandomIndicesOptions(updateByQueryRequest::setIndicesOptions, updateByQueryRequest::indicesOptions, expectedParams);
526+
setRandomTimeout(updateByQueryRequest::setTimeout, ReplicationRequest.DEFAULT_TIMEOUT, expectedParams);
527+
Request request = RequestConverters.updateByQuery(updateByQueryRequest);
528+
StringJoiner joiner = new StringJoiner("/", "/", "");
529+
joiner.add(String.join(",", updateByQueryRequest.indices()));
530+
if (updateByQueryRequest.getDocTypes().length > 0)
531+
joiner.add(String.join(",", updateByQueryRequest.getDocTypes()));
532+
joiner.add("_update_by_query");
533+
assertEquals(joiner.toString(), request.getEndpoint());
534+
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
535+
assertEquals(expectedParams, request.getParameters());
536+
assertToXContentBody(updateByQueryRequest, request.getEntity());
537+
}
538+
483539
public void testPutMapping() throws IOException {
484540
PutMappingRequest putMappingRequest = new PutMappingRequest();
485541

client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -682,8 +682,7 @@ public void testApiNamingConventions() throws Exception {
682682
"render_search_template",
683683
"scripts_painless_execute",
684684
"tasks.get",
685-
"termvectors",
686-
"update_by_query"
685+
"termvectors"
687686
};
688687
//These API are not required for high-level client feature completeness
689688
String[] notRequiredApi = new String[] {

client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.elasticsearch.action.index.IndexRequest;
4040
import org.elasticsearch.action.index.IndexResponse;
4141
import org.elasticsearch.action.support.ActiveShardCount;
42+
import org.elasticsearch.action.support.IndicesOptions;
4243
import org.elasticsearch.action.support.WriteRequest;
4344
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
4445
import org.elasticsearch.action.support.replication.ReplicationResponse;
@@ -67,6 +68,7 @@
6768
import org.elasticsearch.index.reindex.ReindexRequest;
6869
import org.elasticsearch.index.reindex.RemoteInfo;
6970
import org.elasticsearch.index.reindex.ScrollableHitSource;
71+
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
7072
import org.elasticsearch.rest.RestStatus;
7173
import org.elasticsearch.script.Script;
7274
import org.elasticsearch.script.ScriptType;
@@ -909,6 +911,125 @@ public void onFailure(Exception e) {
909911
}
910912
}
911913

914+
public void testUpdateByQuery() throws Exception {
915+
RestHighLevelClient client = highLevelClient();
916+
{
917+
String mapping =
918+
"\"doc\": {\n" +
919+
" \"properties\": {\n" +
920+
" \"user\": {\n" +
921+
" \"type\": \"text\"\n" +
922+
" },\n" +
923+
" \"field1\": {\n" +
924+
" \"type\": \"integer\"\n" +
925+
" },\n" +
926+
" \"field2\": {\n" +
927+
" \"type\": \"integer\"\n" +
928+
" }\n" +
929+
" }\n" +
930+
" }";
931+
createIndex("source1", Settings.EMPTY, mapping);
932+
createIndex("source2", Settings.EMPTY, mapping);
933+
createPipeline("my_pipeline");
934+
}
935+
{
936+
// tag::update-by-query-request
937+
UpdateByQueryRequest request = new UpdateByQueryRequest("source1", "source2"); // <1>
938+
// end::update-by-query-request
939+
// tag::update-by-query-request-conflicts
940+
request.setConflicts("proceed"); // <1>
941+
// end::update-by-query-request-conflicts
942+
// tag::update-by-query-request-typeOrQuery
943+
request.setDocTypes("doc"); // <1>
944+
request.setQuery(new TermQueryBuilder("user", "kimchy")); // <2>
945+
// end::update-by-query-request-typeOrQuery
946+
// tag::update-by-query-request-size
947+
request.setSize(10); // <1>
948+
// end::update-by-query-request-size
949+
// tag::update-by-query-request-scrollSize
950+
request.setBatchSize(100); // <1>
951+
// end::update-by-query-request-scrollSize
952+
// tag::update-by-query-request-pipeline
953+
request.setPipeline("my_pipeline"); // <1>
954+
// end::update-by-query-request-pipeline
955+
// tag::update-by-query-request-script
956+
request.setScript(
957+
new Script(
958+
ScriptType.INLINE, "painless",
959+
"if (ctx._source.user == 'kimchy') {ctx._source.likes++;}",
960+
Collections.emptyMap())); // <1>
961+
// end::update-by-query-request-script
962+
// tag::update-by-query-request-timeout
963+
request.setTimeout(TimeValue.timeValueMinutes(2)); // <1>
964+
// end::update-by-query-request-timeout
965+
// tag::update-by-query-request-refresh
966+
request.setRefresh(true); // <1>
967+
// end::update-by-query-request-refresh
968+
// tag::update-by-query-request-slices
969+
request.setSlices(2); // <1>
970+
// end::update-by-query-request-slices
971+
// tag::update-by-query-request-scroll
972+
request.setScroll(TimeValue.timeValueMinutes(10)); // <1>
973+
// end::update-by-query-request-scroll
974+
// tag::update-by-query-request-routing
975+
request.setRouting("=cat"); // <1>
976+
// end::update-by-query-request-routing
977+
// tag::update-by-query-request-indicesOptions
978+
request.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); // <1>
979+
// end::update-by-query-request-indicesOptions
980+
981+
// tag::update-by-query-execute
982+
BulkByScrollResponse bulkResponse = client.updateByQuery(request, RequestOptions.DEFAULT);
983+
// end::update-by-query-execute
984+
assertSame(0, bulkResponse.getSearchFailures().size());
985+
assertSame(0, bulkResponse.getBulkFailures().size());
986+
// tag::update-by-query-response
987+
TimeValue timeTaken = bulkResponse.getTook(); // <1>
988+
boolean timedOut = bulkResponse.isTimedOut(); // <2>
989+
long totalDocs = bulkResponse.getTotal(); // <3>
990+
long updatedDocs = bulkResponse.getUpdated(); // <4>
991+
long deletedDocs = bulkResponse.getDeleted(); // <5>
992+
long batches = bulkResponse.getBatches(); // <6>
993+
long noops = bulkResponse.getNoops(); // <7>
994+
long versionConflicts = bulkResponse.getVersionConflicts(); // <8>
995+
long bulkRetries = bulkResponse.getBulkRetries(); // <9>
996+
long searchRetries = bulkResponse.getSearchRetries(); // <10>
997+
TimeValue throttledMillis = bulkResponse.getStatus().getThrottled(); // <11>
998+
TimeValue throttledUntilMillis = bulkResponse.getStatus().getThrottledUntil(); // <12>
999+
List<ScrollableHitSource.SearchFailure> searchFailures = bulkResponse.getSearchFailures(); // <13>
1000+
List<BulkItemResponse.Failure> bulkFailures = bulkResponse.getBulkFailures(); // <14>
1001+
// end::update-by-query-response
1002+
}
1003+
{
1004+
UpdateByQueryRequest request = new UpdateByQueryRequest();
1005+
request.indices("source1");
1006+
1007+
// tag::update-by-query-execute-listener
1008+
ActionListener<BulkByScrollResponse> listener = new ActionListener<BulkByScrollResponse>() {
1009+
@Override
1010+
public void onResponse(BulkByScrollResponse bulkResponse) {
1011+
// <1>
1012+
}
1013+
1014+
@Override
1015+
public void onFailure(Exception e) {
1016+
// <2>
1017+
}
1018+
};
1019+
// end::update-by-query-execute-listener
1020+
1021+
// Replace the empty listener by a blocking listener in test
1022+
final CountDownLatch latch = new CountDownLatch(1);
1023+
listener = new LatchedActionListener<>(listener, latch);
1024+
1025+
// tag::update-by-query-execute-async
1026+
client.updateByQueryAsync(request, RequestOptions.DEFAULT, listener); // <1>
1027+
// end::update-by-query-execute-async
1028+
1029+
assertTrue(latch.await(30L, TimeUnit.SECONDS));
1030+
}
1031+
}
1032+
9121033
public void testGet() throws Exception {
9131034
RestHighLevelClient client = highLevelClient();
9141035
{

0 commit comments

Comments
 (0)