Skip to content

Commit

Permalink
Fix bulk upsert ignores the default_pipeline and final_pipeline when …
Browse files Browse the repository at this point in the history
…the auto-created index matches the index template (opensearch-project#12891)

* Fix bulk upsert ignores the default_pipeline and final_pipeline when auto-created index matches with the index template

Signed-off-by: Gao Binlong <gbinlong@amazon.com>

* Modify changelog & comment

Signed-off-by: Gao Binlong <gbinlong@amazon.com>

* Use new approach

Signed-off-by: Gao Binlong <gbinlong@amazon.com>

* Fix test failure

Signed-off-by: Gao Binlong <gbinlong@amazon.com>

---------

Signed-off-by: Gao Binlong <gbinlong@amazon.com>
  • Loading branch information
gaobinlong authored Jul 16, 2024
1 parent 54af34e commit 8ae728c
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Refactoring FilterPath.parse by using an iterative approach ([#14200](https://github.com/opensearch-project/OpenSearch/pull/14200))
- Refactoring Grok.validatePatternBank by using an iterative approach ([#14206](https://github.com/opensearch-project/OpenSearch/pull/14206))
- Update help output for _cat ([#14722](https://github.com/opensearch-project/OpenSearch/pull/14722))
- Fix bulk upsert ignores the default_pipeline and final_pipeline when auto-created index matches the index template ([#12891](https://github.com/opensearch-project/OpenSearch/pull/12891))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ teardown:
ingest.delete_pipeline:
id: "pipeline2"
ignore: 404
- do:
indices.delete_index_template:
name: test_index_template_for_bulk
ignore: 404

---
"Test bulk request without default pipeline":
Expand Down Expand Up @@ -168,6 +172,40 @@ teardown:
id: test_id3
- match: { _source: {"f1": "v2", "f2": 47, "field1": "value1"}}

# related issue: https://github.com/opensearch-project/OpenSearch/issues/12888
---
"Test bulk upsert honors default_pipeline and final_pipeline when the auto-created index matches with the index template":
- skip:
version: " - 2.99.99"
reason: "fixed in 3.0.0"
- do:
indices.put_index_template:
name: test_for_bulk_upsert_index_template
body:
index_patterns: test_bulk_upsert_*
template:
settings:
number_of_shards: 1
number_of_replicas: 0
default_pipeline: pipeline1
final_pipeline: pipeline2

- do:
bulk:
refresh: true
body:
- '{"update": {"_index": "test_bulk_upsert_index", "_id": "test_id3"}}'
- '{"upsert": {"f1": "v2", "f2": 47}, "doc": {"x": 1}}'

- match: { errors: false }
- match: { items.0.update.result: created }

- do:
get:
index: test_bulk_upsert_index
id: test_id3
- match: { _source: {"f1": "v2", "f2": 47, "field1": "value1", "field2": "value2"}}

---
"Test bulk API with batch enabled happy case":
- skip:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ public IndexRequest doc() {

private IndexRequest safeDoc() {
if (doc == null) {
doc = new IndexRequest();
doc = new IndexRequest(index);
}
return doc;
}
Expand Down Expand Up @@ -803,7 +803,7 @@ public IndexRequest upsertRequest() {

private IndexRequest safeUpsertRequest() {
if (upsertRequest == null) {
upsertRequest = new IndexRequest();
upsertRequest = new IndexRequest(index);
}
return upsertRequest;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ public void testFromXContent() throws Exception {
assertThat(params, notNullValue());
assertThat(params.size(), equalTo(1));
assertThat(params.get("param1").toString(), equalTo("value1"));
assertThat(request.upsertRequest().index(), equalTo("test"));
Map<String, Object> upsertDoc = XContentHelper.convertToMap(
request.upsertRequest().source(),
true,
Expand Down Expand Up @@ -304,6 +305,7 @@ public void testFromXContent() throws Exception {
)
);
Map<String, Object> doc = request.doc().sourceAsMap();
assertThat(request.doc().index(), equalTo("test"));
assertThat(doc.get("field1").toString(), equalTo("value1"));
assertThat(((Map<String, Object>) doc.get("compound")).get("field2").toString(), equalTo("value2"));
}
Expand Down Expand Up @@ -662,7 +664,7 @@ public void testToString() throws IOException {
request.toString(),
equalTo(
"update {[test][1], doc_as_upsert[false], "
+ "doc[index {[null][null], source[{\"body\":\"bar\"}]}], scripted_upsert[false], detect_noop[true]}"
+ "doc[index {[test][null], source[{\"body\":\"bar\"}]}], scripted_upsert[false], detect_noop[true]}"
)
);
}
Expand Down
14 changes: 14 additions & 0 deletions server/src/test/java/org/opensearch/ingest/IngestServiceTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -1605,6 +1605,13 @@ public void testResolveRequiredOrDefaultPipelineDefaultPipeline() {
assertThat(result, is(true));
assertThat(indexRequest.isPipelineResolved(), is(true));
assertThat(indexRequest.getPipeline(), equalTo("default-pipeline"));

// index name matches with ITMD for bulk upsert
UpdateRequest updateRequest = new UpdateRequest("idx", "id1").upsert(emptyMap()).script(mockScript("1"));
result = IngestService.resolvePipelines(updateRequest, TransportBulkAction.getIndexWriteRequest(updateRequest), metadata);
assertThat(result, is(true));
assertThat(updateRequest.upsertRequest().isPipelineResolved(), is(true));
assertThat(updateRequest.upsertRequest().getPipeline(), equalTo("default-pipeline"));
}

public void testResolveFinalPipeline() {
Expand Down Expand Up @@ -1642,6 +1649,13 @@ public void testResolveFinalPipeline() {
assertThat(indexRequest.isPipelineResolved(), is(true));
assertThat(indexRequest.getPipeline(), equalTo("_none"));
assertThat(indexRequest.getFinalPipeline(), equalTo("final-pipeline"));

// index name matches with ITMD for bulk upsert:
UpdateRequest updateRequest = new UpdateRequest("idx", "id1").upsert(emptyMap()).script(mockScript("1"));
result = IngestService.resolvePipelines(updateRequest, TransportBulkAction.getIndexWriteRequest(updateRequest), metadata);
assertThat(result, is(true));
assertThat(updateRequest.upsertRequest().isPipelineResolved(), is(true));
assertThat(updateRequest.upsertRequest().getFinalPipeline(), equalTo("final-pipeline"));
}

public void testResolveRequestOrDefaultPipelineAndFinalPipeline() {
Expand Down

0 comments on commit 8ae728c

Please sign in to comment.