Skip to content

Remove types from BulkRequest #46983

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
94d7e14
Remove types from bulk request
romseygeek Sep 23, 2019
b6f94f7
Merge remote-tracking branch 'origin/master' into types-removal/bulk-…
romseygeek Sep 23, 2019
1f538a3
tests
romseygeek Sep 23, 2019
748d1f6
tests
romseygeek Sep 23, 2019
9ba9c59
tests
romseygeek Sep 23, 2019
e63d109
tests
romseygeek Sep 24, 2019
f03a740
compilation; remove type from BulkResponse.Failure
romseygeek Sep 24, 2019
e8d9158
tests
romseygeek Sep 25, 2019
daee751
bulk monitoring
romseygeek Sep 25, 2019
842ecfd
imports
romseygeek Sep 25, 2019
d47c63a
Merge remote-tracking branch 'origin/master' into types-removal/bulk-…
romseygeek Sep 25, 2019
45c6f5e
bulk tests
romseygeek Sep 25, 2019
100019d
tests
romseygeek Sep 25, 2019
fdabe48
monitoring again
romseygeek Sep 25, 2019
9779d9b
sake
romseygeek Sep 25, 2019
4345c6a
yaml test
romseygeek Sep 26, 2019
946c313
assertions
romseygeek Sep 26, 2019
5c2950f
yaml test
romseygeek Sep 26, 2019
ca03935
wtf
romseygeek Sep 26, 2019
7863b64
Merge branch 'master' into types-removal/bulk-request-action
romseygeek Sep 26, 2019
9baf0f5
relax assertion
romseygeek Sep 26, 2019
3b13ca2
Merge remote-tracking branch 'romseygeek/types-removal/bulk-request-a…
romseygeek Sep 26, 2019
bcb95b1
Merge remote-tracking branch 'origin/master' into types-removal/bulk-…
romseygeek Sep 27, 2019
72977c4
Re-enable assertion
romseygeek Sep 27, 2019
38aecb9
Revert "Re-enable assertion"
romseygeek Sep 30, 2019
f324d6b
Merge branch 'master' into types-removal/bulk-request-action
elasticmachine Sep 30, 2019
c9a0027
Merge remote-tracking branch 'origin/master' into types-removal/bulk-…
romseygeek Oct 2, 2019
e9c6fe5
Merge remote-tracking branch 'romseygeek/types-removal/bulk-request-a…
romseygeek Oct 2, 2019
4b4a475
Merge branch 'master' into types-removal/bulk-request-action
elasticmachine Oct 7, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.rest.action.document.RestBulkAction;
import org.elasticsearch.search.SearchHit;
import org.hamcrest.Matcher;

Expand Down Expand Up @@ -74,12 +72,6 @@ private static BulkProcessor.Builder initBulkProcessorBuilder(BulkProcessor.List
bulkListener), listener);
}

private static BulkProcessor.Builder initBulkProcessorBuilderUsingTypes(BulkProcessor.Listener listener) {
return BulkProcessor.builder(
(request, bulkListener) -> highLevelClient().bulkAsync(request, expectWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE),
bulkListener), listener);
}

public void testThatBulkProcessorCountIsCorrect() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
Expand Down Expand Up @@ -170,7 +162,6 @@ public void testBulkProcessorConcurrentRequests() throws Exception {
for (BulkItemResponse bulkItemResponse : listener.bulkItems) {
assertThat(bulkItemResponse.getFailureMessage(), bulkItemResponse.isFailed(), equalTo(false));
assertThat(bulkItemResponse.getIndex(), equalTo("test"));
assertThat(bulkItemResponse.getType(), equalTo("_doc"));
//with concurrent requests > 1 we can't rely on the order of the bulk requests
assertThat(Integer.valueOf(bulkItemResponse.getId()), both(greaterThan(0)).and(lessThanOrEqualTo(numDocs)));
//we do want to check that we don't get duplicate ids back
Expand Down Expand Up @@ -269,7 +260,6 @@ public void testBulkProcessorConcurrentRequestsReadOnlyIndex() throws Exception
Set<String> readOnlyIds = new HashSet<>();
for (BulkItemResponse bulkItemResponse : listener.bulkItems) {
assertThat(bulkItemResponse.getIndex(), either(equalTo("test")).or(equalTo("test-ro")));
assertThat(bulkItemResponse.getType(), equalTo("_doc"));
if (bulkItemResponse.getIndex().equals("test")) {
assertThat(bulkItemResponse.isFailed(), equalTo(false));
//with concurrent requests > 1 we can't rely on the order of the bulk requests
Expand Down Expand Up @@ -298,7 +288,6 @@ public void testGlobalParametersAndSingleRequest() throws Exception {
// tag::bulk-processor-mix-parameters
try (BulkProcessor processor = initBulkProcessorBuilder(listener)
.setGlobalIndex("tweets")
.setGlobalType("_doc")
.setGlobalRouting("routing")
.setGlobalPipeline("pipeline_id")
.build()) {
Expand Down Expand Up @@ -326,33 +315,29 @@ public void testGlobalParametersAndBulkProcessor() throws Exception {
createIndexWithMultipleShards("test");

createFieldAddingPipleine("pipeline_id", "fieldNameXYZ", "valueXYZ");
final String customType = "testType";
final String ignoredType = "ignoredType";

int numDocs = randomIntBetween(10, 10);
{
final CountDownLatch latch = new CountDownLatch(1);
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
//Check that untyped document additions inherit the global type
String globalType = customType;
String localType = null;
try (BulkProcessor processor = initBulkProcessorBuilderUsingTypes(listener)
try (BulkProcessor processor = initBulkProcessorBuilder(listener)
//let's make sure that the bulk action limit trips, one single execution will index all the documents
.setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs)
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
.setGlobalIndex("test")
.setGlobalType(globalType)
.setGlobalRouting("routing")
.setGlobalPipeline("pipeline_id")
.build()) {

indexDocs(processor, numDocs, null, localType, "test", globalType, "pipeline_id");
indexDocs(processor, numDocs, null, localType, "test", "pipeline_id");
latch.await();

assertThat(listener.beforeCounts.get(), equalTo(1));
assertThat(listener.afterCounts.get(), equalTo(1));
assertThat(listener.bulkFailures.size(), equalTo(0));
assertResponseItems(listener.bulkItems, numDocs, globalType);
assertResponseItems(listener.bulkItems, numDocs);

Iterable<SearchHit> hits = searchAll(new SearchRequest("test").routing("routing"));

Expand All @@ -361,65 +346,6 @@ public void testGlobalParametersAndBulkProcessor() throws Exception {
}

}
{
//Check that typed document additions don't inherit the global type
String globalType = ignoredType;
String localType = customType;
final CountDownLatch latch = new CountDownLatch(1);
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
try (BulkProcessor processor = initBulkProcessorBuilderUsingTypes(listener)
//let's make sure that the bulk action limit trips, one single execution will index all the documents
.setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs)
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
.setGlobalIndex("test")
.setGlobalType(globalType)
.setGlobalRouting("routing")
.setGlobalPipeline("pipeline_id")
.build()) {
indexDocs(processor, numDocs, null, localType, "test", globalType, "pipeline_id");
latch.await();

assertThat(listener.beforeCounts.get(), equalTo(1));
assertThat(listener.afterCounts.get(), equalTo(1));
assertThat(listener.bulkFailures.size(), equalTo(0));
assertResponseItems(listener.bulkItems, numDocs, localType);

Iterable<SearchHit> hits = searchAll(new SearchRequest("test").routing("routing"));

assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));
assertThat(hits, containsInAnyOrder(expectedIds(numDocs)));
}
}
{
//Check that untyped document additions and untyped global inherit the established custom type
// (the custom document type introduced to the mapping by the earlier code in this test)
String globalType = null;
String localType = null;
final CountDownLatch latch = new CountDownLatch(1);
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
try (BulkProcessor processor = initBulkProcessorBuilder(listener)
//let's make sure that the bulk action limit trips, one single execution will index all the documents
.setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs)
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
.setGlobalIndex("test")
.setGlobalType(globalType)
.setGlobalRouting("routing")
.setGlobalPipeline("pipeline_id")
.build()) {
indexDocs(processor, numDocs, null, localType, "test", globalType, "pipeline_id");
latch.await();

assertThat(listener.beforeCounts.get(), equalTo(1));
assertThat(listener.afterCounts.get(), equalTo(1));
assertThat(listener.bulkFailures.size(), equalTo(0));
assertResponseItems(listener.bulkItems, numDocs, MapperService.SINGLE_MAPPING_NAME);

Iterable<SearchHit> hits = searchAll(new SearchRequest("test").routing("routing"));

assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));
assertThat(hits, containsInAnyOrder(expectedIds(numDocs)));
}
}
}

@SuppressWarnings("unchecked")
Expand All @@ -431,20 +357,15 @@ private Matcher<SearchHit>[] expectedIds(int numDocs) {
}

private MultiGetRequest indexDocs(BulkProcessor processor, int numDocs, String localIndex, String localType,
String globalIndex, String globalType, String globalPipeline) throws Exception {
String globalIndex, String globalPipeline) throws Exception {
MultiGetRequest multiGetRequest = new MultiGetRequest();
for (int i = 1; i <= numDocs; i++) {
if (randomBoolean()) {
processor.add(new IndexRequest(localIndex, localType, Integer.toString(i))
.source(XContentType.JSON, "field", randomRealisticUnicodeOfLengthBetween(1, 30)));
} else {
BytesArray data = bytesBulkRequest(localIndex, localType, i);
processor.add(data, globalIndex, globalType, globalPipeline, XContentType.JSON);

if (localType != null) {
// If the payload contains types, parsing it into a bulk request results in a warning.
assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE);
}
processor.add(data, globalIndex, globalPipeline, XContentType.JSON);
}
multiGetRequest.add(localIndex, Integer.toString(i));
}
Expand Down Expand Up @@ -475,19 +396,14 @@ private static BytesArray bytesBulkRequest(String localIndex, String localType,
}

private MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) throws Exception {
return indexDocs(processor, numDocs, "test", null, null, null, null);
return indexDocs(processor, numDocs, "test", null, null, null);
}

private static void assertResponseItems(List<BulkItemResponse> bulkItemResponses, int numDocs) {
assertResponseItems(bulkItemResponses, numDocs, MapperService.SINGLE_MAPPING_NAME);
}

private static void assertResponseItems(List<BulkItemResponse> bulkItemResponses, int numDocs, String expectedType) {
assertThat(bulkItemResponses.size(), is(numDocs));
int i = 1;
for (BulkItemResponse bulkItemResponse : bulkItemResponses) {
assertThat(bulkItemResponse.getIndex(), equalTo("test"));
assertThat(bulkItemResponse.getType(), equalTo(expectedType));
assertThat(bulkItemResponse.getId(), equalTo(Integer.toString(i++)));
assertThat("item " + i + " failed with cause: " + bulkItemResponse.getFailureMessage(),
bulkItemResponse.isFailed(), equalTo(false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void testMixPipelineOnRequestAndGlobal() throws IOException {
}

public void testGlobalIndex() throws IOException {
BulkRequest request = new BulkRequest("global_index", null);
BulkRequest request = new BulkRequest("global_index");
request.add(new IndexRequest().id("1")
.source(XContentType.JSON, "field", "bulk1"));
request.add(new IndexRequest().id("2")
Expand All @@ -120,7 +120,7 @@ public void testGlobalIndex() throws IOException {

@SuppressWarnings("unchecked")
public void testIndexGlobalAndPerRequest() throws IOException {
BulkRequest request = new BulkRequest("global_index", null);
BulkRequest request = new BulkRequest("global_index");
request.add(new IndexRequest("local_index").id("1")
.source(XContentType.JSON, "field", "bulk1"));
request.add(new IndexRequest().id("2") // will take global index
Expand Down Expand Up @@ -168,19 +168,6 @@ public void testMixLocalAndGlobalRouting() throws IOException {
assertThat(hits, containsInAnyOrder(hasId("1"), hasId("2")));
}

public void testGlobalIndexNoTypes() throws IOException {
BulkRequest request = new BulkRequest("global_index");
request.add(new IndexRequest().id("1")
.source(XContentType.JSON, "field", "bulk1"));
request.add(new IndexRequest().id("2")
.source(XContentType.JSON, "field", "bulk2"));

bulk(request);

Iterable<SearchHit> hits = searchAll("global_index");
assertThat(hits, everyItem(hasIndex("global_index")));
}

private BulkResponse bulk(BulkRequest request) throws IOException {
BulkResponse bulkResponse = execute(request, highLevelClient()::bulk, highLevelClient()::bulkAsync, RequestOptions.DEFAULT);
assertFalse(bulkResponse.hasFailures());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.document.RestBulkAction;
import org.elasticsearch.rest.action.document.RestDeleteAction;
import org.elasticsearch.rest.action.document.RestIndexAction;
import org.elasticsearch.rest.action.document.RestUpdateAction;
Expand Down Expand Up @@ -401,20 +400,6 @@ public void testMultiGet() throws IOException {
}
}

public void testMultiGetWithTypes() throws IOException {
BulkRequest bulk = new BulkRequest();
bulk.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
bulk.add(new IndexRequest("index", "type", "id1")
.source("{\"field\":\"value1\"}", XContentType.JSON));
bulk.add(new IndexRequest("index", "type", "id2")
.source("{\"field\":\"value2\"}", XContentType.JSON));

highLevelClient().bulk(bulk, expectWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE));
MultiGetRequest multiGetRequest = new MultiGetRequest();
multiGetRequest.add("index", "id1");
multiGetRequest.add("index", "id2");
}

public void testIndex() throws IOException {
final XContentType xContentType = randomFrom(XContentType.values());
{
Expand Down Expand Up @@ -897,7 +882,6 @@ private void validateBulkResponses(int nbItems, boolean[] errors, BulkResponse b

assertEquals(i, bulkItemResponse.getItemId());
assertEquals("index", bulkItemResponse.getIndex());
assertEquals("_doc", bulkItemResponse.getType());
assertEquals(String.valueOf(i), bulkItemResponse.getId());

DocWriteRequest.OpType requestOpType = bulkRequest.requests().get(i).opType();
Expand Down
3 changes: 0 additions & 3 deletions docs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,6 @@ buildRestTests.setups['library'] = '''
- do:
bulk:
index: library
type: book
refresh: true
body: |
{"index":{"_id": "Leviathan Wakes"}}
Expand Down Expand Up @@ -923,7 +922,6 @@ buildRestTests.setups['farequote_data'] = buildRestTests.setups['farequote_index
- do:
bulk:
index: farequote
type: metric
refresh: true
body: |
{"index": {"_id":"1"}}
Expand Down Expand Up @@ -983,7 +981,6 @@ buildRestTests.setups['server_metrics_data'] = buildRestTests.setups['server_met
- do:
bulk:
index: server-metrics
type: metric
refresh: true
body: |
{"index": {"_id":"1177"}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ PUT /products
}
}

POST /products/_doc/_bulk?refresh
POST /products/_bulk?refresh
{"index":{"_id":0}}
{"genre": "rock", "product": "Product A"}
{"index":{"_id":1}}
Expand Down
2 changes: 1 addition & 1 deletion docs/reference/sql/getting-started.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ an index with some data to experiment with:

[source,console]
--------------------------------------------------
PUT /library/book/_bulk?refresh
PUT /library/_bulk?refresh
{"index":{"_id": "Leviathan Wakes"}}
{"name": "Leviathan Wakes", "author": "James S.A. Corey", "release_date": "2011-06-02", "page_count": 561}
{"index":{"_id": "Hyperion"}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ setup:
bulk:
refresh: true
body:
- '{"index": {"_index": "test-0", "_type": "_doc"}}'
- '{"index": {"_index": "test-0"}}'
- '{"ip": "10.0.0.1", "integer": 38, "float": 12.5713, "name": "Ruth", "bool": true}'
- '{"index": {"_index": "test-0", "_type": "_doc"}}'
- '{"index": {"_index": "test-0"}}'
- '{"ip": "10.0.0.2", "integer": 42, "float": 15.3393, "name": "Jackie", "surname": "Bowling", "bool": false}'
- '{"index": {"_index": "test-1", "_type": "_doc"}}'
- '{"index": {"_index": "test-1"}}'
- '{"ip": "10.0.0.3", "integer": 29, "float": 19.0517, "name": "Stephanie", "bool": true}'
- '{"index": {"_index": "test-1", "_type": "_doc"}}'
- '{"index": {"_index": "test-1"}}'
- '{"ip": "10.0.0.4", "integer": 19, "float": 19.3717, "surname": "Hamilton", "bool": true}'
- '{"index": {"_index": "test-2", "_type": "_doc"}}'
- '{"index": {"_index": "test-2"}}'
- '{"ip": "10.0.0.5", "integer": 0, "float": 17.3349, "name": "Natalie", "bool": false}'

---
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ public void testBulkResponseSetsLotsOfStatus() throws Exception {
if (rarely()) {
versionConflicts++;
responses[i] = new BulkItemResponse(i, randomFrom(DocWriteRequest.OpType.values()),
new Failure(shardId.getIndexName(), "type", "id" + i,
new Failure(shardId.getIndexName(), "id" + i,
new VersionConflictEngineException(shardId, "id", "test")));
continue;
}
Expand Down Expand Up @@ -399,7 +399,7 @@ public void testSearchTimeoutsAbortRequest() throws Exception {
* Mimicks bulk indexing failures.
*/
public void testBulkFailuresAbortRequest() throws Exception {
Failure failure = new Failure("index", "type", "id", new RuntimeException("test"));
Failure failure = new Failure("index", "id", new RuntimeException("test"));
DummyAsyncBulkByScrollAction action = new DummyAsyncBulkByScrollAction();
BulkResponse bulkResponse = new BulkResponse(new BulkItemResponse[]
{new BulkItemResponse(0, DocWriteRequest.OpType.CREATE, failure)}, randomLong());
Expand Down Expand Up @@ -902,7 +902,7 @@ void doExecute(ActionType<Response> action, Request request, ActionListener<Resp
}
if (i == toReject) {
responses[i] = new BulkItemResponse(i, item.opType(),
new Failure(response.getIndex(), response.getType(), response.getId(), new EsRejectedExecutionException()));
new Failure(response.getIndex(), response.getId(), new EsRejectedExecutionException()));
} else {
responses[i] = new BulkItemResponse(i, item.opType(), response);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void testMergeConstructor() {
BulkByScrollTask.Status status = new BulkByScrollTask.Status(i, 0, 0, 0, 0, 0, 0, 0, 0, 0, timeValueMillis(0), 0f,
thisReasonCancelled, timeValueMillis(0));
List<BulkItemResponse.Failure> bulkFailures = frequently() ? emptyList()
: IntStream.range(0, between(1, 3)).mapToObj(j -> new BulkItemResponse.Failure("idx", "type", "id", new Exception()))
: IntStream.range(0, between(1, 3)).mapToObj(j -> new BulkItemResponse.Failure("idx", "id", new Exception()))
.collect(Collectors.toList());
allBulkFailures.addAll(bulkFailures);
List<SearchFailure> searchFailures = frequently() ? emptyList()
Expand Down
Loading