Skip to content

Removes types from bulk API #42194

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ public RestNoopBulkAction(Settings settings, RestController controller) {
controller.registerHandler(PUT, "/_noop_bulk", this);
controller.registerHandler(POST, "/{index}/_noop_bulk", this);
controller.registerHandler(PUT, "/{index}/_noop_bulk", this);
controller.registerHandler(POST, "/{index}/{type}/_noop_bulk", this);
controller.registerHandler(PUT, "/{index}/{type}/_noop_bulk", this);
}

@Override
Expand All @@ -65,7 +63,6 @@ public String getName() {
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
BulkRequest bulkRequest = Requests.bulkRequest();
String defaultIndex = request.param("index");
String defaultType = request.param("type");
String defaultRouting = request.param("routing");
String defaultPipeline = request.param("pipeline");

Expand All @@ -75,7 +72,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
}
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
bulkRequest.setRefreshPolicy(request.param("refresh"));
bulkRequest.add(request.requiredContent(), defaultIndex, defaultType, defaultRouting,
bulkRequest.add(request.requiredContent(), defaultIndex, defaultRouting,
null, defaultPipeline, true, request.getXContentType());

// short circuit the call to the transport layer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.client;

import com.carrotsearch.randomizedtesting.generators.RandomPicks;

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
Expand All @@ -37,10 +38,7 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.rest.action.document.RestBulkAction;
import org.elasticsearch.search.SearchHit;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;

import java.io.IOException;
import java.util.Arrays;
Expand All @@ -51,16 +49,11 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.fieldFromSource;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasId;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasIndex;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasProperty;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasType;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
Expand All @@ -77,12 +70,6 @@ private static BulkProcessor.Builder initBulkProcessorBuilder(BulkProcessor.List
bulkListener), listener);
}

private static BulkProcessor.Builder initBulkProcessorBuilderUsingTypes(BulkProcessor.Listener listener) {
return BulkProcessor.builder(
(request, bulkListener) -> highLevelClient().bulkAsync(request, expectWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE),
bulkListener), listener);
}

public void testThatBulkProcessorCountIsCorrect() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
Expand Down Expand Up @@ -291,7 +278,6 @@ public void testBulkProcessorConcurrentRequestsReadOnlyIndex() throws Exception
assertMultiGetResponse(highLevelClient().mget(multiGetRequest, RequestOptions.DEFAULT), testDocs);
}

@SuppressWarnings("unchecked")
public void testGlobalParametersAndSingleRequest() throws Exception {
createIndexWithMultipleShards("test");

Expand All @@ -302,7 +288,6 @@ public void testGlobalParametersAndSingleRequest() throws Exception {
// tag::bulk-processor-mix-parameters
try (BulkProcessor processor = initBulkProcessorBuilder(listener)
.setGlobalIndex("tweets")
.setGlobalType("_doc")
.setGlobalRouting("routing")
.setGlobalPipeline("pipeline_id")
.build()) {
Expand All @@ -326,150 +311,29 @@ public void testGlobalParametersAndSingleRequest() throws Exception {
assertThat(blogs, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));
}

@SuppressWarnings("unchecked")
public void testGlobalParametersAndBulkProcessor() throws Exception {
createIndexWithMultipleShards("test");

createFieldAddingPipleine("pipeline_id", "fieldNameXYZ", "valueXYZ");
final String customType = "testType";
final String ignoredType = "ignoredType";

int numDocs = randomIntBetween(10, 10);
{
final CountDownLatch latch = new CountDownLatch(1);
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
//Check that untyped document additions inherit the global type
String globalType = customType;
String localType = null;
try (BulkProcessor processor = initBulkProcessorBuilderUsingTypes(listener)
//let's make sure that the bulk action limit trips, one single execution will index all the documents
.setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs)
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
.setGlobalIndex("test")
.setGlobalType(globalType)
.setGlobalRouting("routing")
.setGlobalPipeline("pipeline_id")
.build()) {

indexDocs(processor, numDocs, null, localType, "test", globalType, "pipeline_id");
latch.await();

assertThat(listener.beforeCounts.get(), equalTo(1));
assertThat(listener.afterCounts.get(), equalTo(1));
assertThat(listener.bulkFailures.size(), equalTo(0));
assertResponseItems(listener.bulkItems, numDocs, globalType);

Iterable<SearchHit> hits = searchAll(new SearchRequest("test").routing("routing"));

assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));
assertThat(hits, everyItem(Matchers.allOf(hasIndex("test"), hasType(globalType))));
assertThat(hits, containsInAnyOrder(expectedIds(numDocs)));
}

}
{
//Check that typed document additions don't inherit the global type
String globalType = ignoredType;
String localType = customType;
final CountDownLatch latch = new CountDownLatch(1);
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
try (BulkProcessor processor = initBulkProcessorBuilderUsingTypes(listener)
//let's make sure that the bulk action limit trips, one single execution will index all the documents
.setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs)
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
.setGlobalIndex("test")
.setGlobalType(globalType)
.setGlobalRouting("routing")
.setGlobalPipeline("pipeline_id")
.build()) {
indexDocs(processor, numDocs, null, localType, "test", globalType, "pipeline_id");
latch.await();

assertThat(listener.beforeCounts.get(), equalTo(1));
assertThat(listener.afterCounts.get(), equalTo(1));
assertThat(listener.bulkFailures.size(), equalTo(0));
assertResponseItems(listener.bulkItems, numDocs, localType);

Iterable<SearchHit> hits = searchAll(new SearchRequest("test").routing("routing"));

assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));
assertThat(hits, everyItem(Matchers.allOf(hasIndex("test"), hasType(localType))));
assertThat(hits, containsInAnyOrder(expectedIds(numDocs)));
}
}
{
//Check that untyped document additions and untyped global inherit the established custom type
// (the custom document type introduced to the mapping by the earlier code in this test)
String globalType = null;
String localType = null;
final CountDownLatch latch = new CountDownLatch(1);
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
try (BulkProcessor processor = initBulkProcessorBuilder(listener)
//let's make sure that the bulk action limit trips, one single execution will index all the documents
.setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs)
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
.setGlobalIndex("test")
.setGlobalType(globalType)
.setGlobalRouting("routing")
.setGlobalPipeline("pipeline_id")
.build()) {
indexDocs(processor, numDocs, null, localType, "test", globalType, "pipeline_id");
latch.await();

assertThat(listener.beforeCounts.get(), equalTo(1));
assertThat(listener.afterCounts.get(), equalTo(1));
assertThat(listener.bulkFailures.size(), equalTo(0));
assertResponseItems(listener.bulkItems, numDocs, MapperService.SINGLE_MAPPING_NAME);

Iterable<SearchHit> hits = searchAll(new SearchRequest("test").routing("routing"));

assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));
assertThat(hits, everyItem(Matchers.allOf(hasIndex("test"), hasType(customType))));
assertThat(hits, containsInAnyOrder(expectedIds(numDocs)));
}
}
}

@SuppressWarnings("unchecked")
private Matcher<SearchHit>[] expectedIds(int numDocs) {
return IntStream.rangeClosed(1, numDocs)
.boxed()
.map(n -> hasId(n.toString()))
.<Matcher<SearchHit>>toArray(Matcher[]::new);
}

private MultiGetRequest indexDocs(BulkProcessor processor, int numDocs, String localIndex, String localType,
String globalIndex, String globalType, String globalPipeline) throws Exception {
private MultiGetRequest indexDocs(BulkProcessor processor, int numDocs, String localIndex, String globalIndex, String globalPipeline)
throws Exception {
MultiGetRequest multiGetRequest = new MultiGetRequest();
for (int i = 1; i <= numDocs; i++) {
if (randomBoolean()) {
processor.add(new IndexRequest(localIndex, localType, Integer.toString(i))
processor.add(new IndexRequest(localIndex).id(Integer.toString(i))
.source(XContentType.JSON, "field", randomRealisticUnicodeOfLengthBetween(1, 30)));
} else {
BytesArray data = bytesBulkRequest(localIndex, localType, i);
processor.add(data, globalIndex, globalType, globalPipeline, XContentType.JSON);

if (localType != null) {
// If the payload contains types, parsing it into a bulk request results in a warning.
assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE);
}
BytesArray data = bytesBulkRequest(localIndex, i);
processor.add(data, globalIndex, globalPipeline, XContentType.JSON);
}
multiGetRequest.add(localIndex, Integer.toString(i));
}
return multiGetRequest;
}

private static BytesArray bytesBulkRequest(String localIndex, String localType, int id) throws IOException {
private static BytesArray bytesBulkRequest(String localIndex, int id) throws IOException {
XContentBuilder action = jsonBuilder().startObject().startObject("index");

if (localIndex != null) {
action.field("_index", localIndex);
}

if (localType != null) {
action.field("_type", localType);
}

action.field("_id", Integer.toString(id));
action.endObject().endObject();

Expand All @@ -483,7 +347,7 @@ private static BytesArray bytesBulkRequest(String localIndex, String localType,
}

private MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) throws Exception {
return indexDocs(processor, numDocs, "test", null, null, null, null);
return indexDocs(processor, numDocs, "test", null, null);
}

private static void assertResponseItems(List<BulkItemResponse> bulkItemResponses, int numDocs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.action.document.RestBulkAction;
import org.elasticsearch.search.SearchHit;

import java.io.IOException;
Expand All @@ -33,7 +32,6 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasId;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasIndex;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasProperty;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasType;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.emptyIterable;
Expand All @@ -44,7 +42,6 @@

public class BulkRequestWithGlobalParametersIT extends ESRestHighLevelClientTestCase {

@SuppressWarnings("unchecked")
public void testGlobalPipelineOnBulkRequest() throws IOException {
createFieldAddingPipleine("xyz", "fieldNameXYZ", "valueXYZ");

Expand Down Expand Up @@ -83,7 +80,6 @@ public void testPipelineOnRequestOverridesGlobalPipeline() throws IOException {
assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldXYZ"), nullValue())));
}

@SuppressWarnings("unchecked")
public void testMixPipelineOnRequestAndGlobal() throws IOException {
createFieldAddingPipleine("globalId", "fieldXYZ", "valueXYZ");
createFieldAddingPipleine("perIndexId", "someNewField", "someValue");
Expand All @@ -110,7 +106,7 @@ public void testMixPipelineOnRequestAndGlobal() throws IOException {
}

public void testGlobalIndex() throws IOException {
BulkRequest request = new BulkRequest("global_index", null);
BulkRequest request = new BulkRequest("global_index");
request.add(new IndexRequest().id("1")
.source(XContentType.JSON, "field", "bulk1"));
request.add(new IndexRequest().id("2")
Expand All @@ -122,9 +118,8 @@ public void testGlobalIndex() throws IOException {
assertThat(hits, everyItem(hasIndex("global_index")));
}

@SuppressWarnings("unchecked")
public void testIndexGlobalAndPerRequest() throws IOException {
BulkRequest request = new BulkRequest("global_index", null);
BulkRequest request = new BulkRequest("global_index");
request.add(new IndexRequest("local_index").id("1")
.source(XContentType.JSON, "field", "bulk1"));
request.add(new IndexRequest().id("2") // will take global index
Expand All @@ -140,38 +135,20 @@ public void testIndexGlobalAndPerRequest() throws IOException {
.and(hasIndex("global_index"))));
}

public void testGlobalType() throws IOException {
BulkRequest request = new BulkRequest(null, "global_type");
request.add(new IndexRequest("index").id("1")
.source(XContentType.JSON, "field", "bulk1"));
request.add(new IndexRequest("index").id("2")
.source(XContentType.JSON, "field", "bulk2"));

bulkWithTypes(request);

Iterable<SearchHit> hits = searchAll("index");
assertThat(hits, everyItem(hasType("global_type")));
}

@SuppressWarnings("unchecked")
public void testTypeGlobalAndPerRequest() throws IOException {
BulkRequest request = new BulkRequest(null, "global_type");
request.add(new IndexRequest("index1", "local_type", "1")
BulkRequest request = new BulkRequest(null);
request.add(new IndexRequest("index1").id("1")
.source(XContentType.JSON, "field", "bulk1"));
request.add(new IndexRequest("index2").id("2") // will take global type
request.add(new IndexRequest("index2").id("2")
.source(XContentType.JSON, "field", "bulk2"));

bulkWithTypes(request);
bulk(request);

Iterable<SearchHit> hits = searchAll("index1", "index2");
assertThat(hits, containsInAnyOrder(
both(hasId("1"))
.and(hasType("local_type")),
both(hasId("2"))
.and(hasType("global_type"))));
hasId("1"), hasId("2")));
}

@SuppressWarnings("unchecked")
public void testGlobalRouting() throws IOException {
createIndexWithMultipleShards("index");
BulkRequest request = new BulkRequest(null);
Expand All @@ -189,7 +166,6 @@ public void testGlobalRouting() throws IOException {
assertThat(hits, containsInAnyOrder(hasId("1"), hasId("2")));
}

@SuppressWarnings("unchecked")
public void testMixLocalAndGlobalRouting() throws IOException {
BulkRequest request = new BulkRequest(null);
request.routing("globalRouting");
Expand Down Expand Up @@ -217,13 +193,6 @@ public void testGlobalIndexNoTypes() throws IOException {
Iterable<SearchHit> hits = searchAll("global_index");
assertThat(hits, everyItem(hasIndex("global_index")));
}

private BulkResponse bulkWithTypes(BulkRequest request) throws IOException {
BulkResponse bulkResponse = execute(request, highLevelClient()::bulk, highLevelClient()::bulkAsync,
expectWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE));
assertFalse(bulkResponse.hasFailures());
return bulkResponse;
}

private BulkResponse bulk(BulkRequest request) throws IOException {
BulkResponse bulkResponse = execute(request, highLevelClient()::bulk, highLevelClient()::bulkAsync, RequestOptions.DEFAULT);
Expand Down
Loading