Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add composite directory factory ([#17988](https://github.com/opensearch-project/OpenSearch/pull/17988))
- [Rule based auto-tagging] Add refresh based synchronization service for `Rule`s ([#18128](https://github.com/opensearch-project/OpenSearch/pull/18128))
- Add pull-based ingestion error metrics and make internal queue size configurable ([#18088](https://github.com/opensearch-project/OpenSearch/pull/18088))
- [Derive Source] Adding support for the derived source feature and implementing it for various types of field mappers ([#17759](https://github.com/opensearch-project/OpenSearch/pull/17759))
- [Derive Source] Adding integration of the derived source feature across different paths ([#18054](https://github.com/opensearch-project/OpenSearch/pull/18054))
- Adding support for the derived source feature and implementing it for various types of field mappers ([#17759](https://github.com/opensearch-project/OpenSearch/pull/17759))
- [Security Manager Replacement] Enhance Java Agent to intercept newByteChannel ([#17989](https://github.com/opensearch-project/OpenSearch/pull/17989))
- Enabled Async Shard Batch Fetch by default ([#18139](https://github.com/opensearch-project/OpenSearch/pull/18139))
- Allow to get the search request from the QueryCoordinatorContext ([#17818](https://github.com/opensearch-project/OpenSearch/pull/17818))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,7 @@

package org.opensearch.index.reindex;

import org.opensearch.action.bulk.BulkRequestBuilder;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.search.SearchHit;
import org.opensearch.search.sort.SortOrder;

import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -48,9 +41,7 @@
import java.util.Map;
import java.util.stream.Collectors;

import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
import static org.opensearch.index.query.QueryBuilders.termQuery;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
Expand Down Expand Up @@ -186,301 +177,4 @@ public void testMissingSources() {
assertThat(response, matcher().created(0).slices(hasSize(0)));
}

/**
 * Runs reindex between indices that are all created from the same body, which sets
 * {@code derived_source.enabled} to {@code true}. Five scenarios are covered in sequence:
 * a plain copy, a copy restricted by a term query, a sliced copy, a copy capped with
 * {@code maxDocs}, and a copy that reads from two source indices at once.
 */
public void testReindexWithDerivedSource() throws Exception {
    // Settings and mappings reused verbatim for every index created below.
    String derivedSourceIndexBody = """
    {
    "settings": {
    "index": {
    "number_of_shards": 1,
    "number_of_replicas": 0,
    "derived_source": {
    "enabled": true
    }
    }
    },
    "mappings": {
    "_doc": {
    "properties": {
    "foo": {
    "type": "keyword",
    "store": true
    },
    "bar": {
    "type": "integer",
    "store": true
    }
    }
    }
    }
    }""";

    assertAcked(prepareCreate("source_index").setSource(derivedSourceIndexBody, XContentType.JSON));
    assertAcked(prepareCreate("dest_index").setSource(derivedSourceIndexBody, XContentType.JSON));
    ensureGreen();

    // Seed the first source index; document ids and the "bar" value are both the loop counter.
    int numDocs = randomIntBetween(5, 20);
    List<IndexRequestBuilder> firstBatch = new ArrayList<>();
    for (int docId = 0; docId < numDocs; docId++) {
        IndexRequestBuilder indexRequest = client().prepareIndex("source_index").setId(Integer.toString(docId));
        firstBatch.add(indexRequest.setSource("foo", "value_" + docId, "bar", docId));
    }
    indexRandom(true, firstBatch);

    // Scenario 1: plain copy, every document is expected to be created in the destination.
    BulkByScrollResponse plainCopy = reindex().source("source_index").destination("dest_index").refresh(true).get();
    assertThat(plainCopy, matcher().created(numDocs));
    long plainCopyHits = client().prepareSearch("dest_index").setQuery(matchAllQuery()).get().getHits().getTotalHits().value();
    assertEquals(numDocs, plainCopyHits);

    // Scenario 2: copy restricted by a term query; the expected number of created documents
    // is whatever the same query matches on the source index.
    assertAcked(prepareCreate("dest_index_filtered").setSource(derivedSourceIndexBody, XContentType.JSON));
    BulkByScrollResponse filteredCopy = reindex().source("source_index")
        .destination("dest_index_filtered")
        .filter(termQuery("bar", 1))
        .refresh(true)
        .get();
    long matchingInSource = client().prepareSearch("source_index")
        .setQuery(termQuery("bar", 1))
        .get()
        .getHits()
        .getTotalHits()
        .value();
    assertThat(filteredCopy, matcher().created(matchingInSource));

    // Scenario 3: sliced copy; both the created count and the number of slice statuses are checked.
    assertAcked(prepareCreate("dest_index_sliced").setSource(derivedSourceIndexBody, XContentType.JSON));
    int requestedSlices = randomSlices();
    int sliceStatusCount = expectedSliceStatuses(requestedSlices, "source_index");
    BulkByScrollResponse slicedCopy = reindex().source("source_index")
        .destination("dest_index_sliced")
        .setSlices(requestedSlices)
        .refresh(true)
        .get();
    assertThat(slicedCopy, matcher().created(numDocs).slices(hasSize(sliceStatusCount)));

    // Scenario 4: copy capped at half of the indexed documents (integer division).
    String cappedDest = "dest_index_maxdocs";
    assertAcked(prepareCreate(cappedDest).setSource(derivedSourceIndexBody, XContentType.JSON));
    int docCap = numDocs / 2;
    BulkByScrollResponse cappedCopy = reindex().source("source_index").destination(cappedDest).maxDocs(docCap).refresh(true).get();
    assertThat(cappedCopy, matcher().created(docCap));
    long cappedHits = client().prepareSearch(cappedDest).setQuery(matchAllQuery()).get().getHits().getTotalHits().value();
    assertEquals(docCap, cappedHits);

    // Scenario 5: two source indices; ids in the second index continue after the first batch.
    String secondSource = "source_index_2";
    assertAcked(prepareCreate(secondSource).setSource(derivedSourceIndexBody, XContentType.JSON));
    int numDocs2 = randomIntBetween(5, 20);
    List<IndexRequestBuilder> secondBatch = new ArrayList<>();
    for (int offset = 0; offset < numDocs2; offset++) {
        IndexRequestBuilder indexRequest = client().prepareIndex(secondSource).setId(Integer.toString(offset + numDocs));
        secondBatch.add(indexRequest.setSource("foo", "value2_" + offset, "bar", offset + numDocs));
    }
    indexRandom(true, secondBatch);

    String combinedDest = "dest_index_multi";
    assertAcked(prepareCreate(combinedDest).setSource(derivedSourceIndexBody, XContentType.JSON));
    BulkByScrollResponse combinedCopy = reindex().source("source_index", "source_index_2")
        .destination(combinedDest)
        .refresh(true)
        .get();
    assertThat(combinedCopy, matcher().created(numDocs + numDocs2));
    long combinedHits = client().prepareSearch(combinedDest).setQuery(matchAllQuery()).get().getHits().getTotalHits().value();
    assertEquals(numDocs + numDocs2, combinedHits);
}

/**
 * Reindexes out of a two-shard index created with {@code index.derived_source.enabled} set to
 * {@code true} into destination indices created without that setting. Covers a plain copy, a
 * copy restricted by a term query, a sliced copy, and a copy whose documents are afterwards
 * rewritten under new field names through a bulk request.
 */
public void testReindexFromDerivedSourceToNormalIndex() throws Exception {
    // Mapping for the source index (created with derived source enabled below).
    String derivedSourceMapping = """
    {
    "properties": {
    "text_field": {
    "type": "text",
    "store": true
    },
    "keyword_field": {
    "type": "keyword"
    },
    "numeric_field": {
    "type": "long",
    "doc_values": true
    },
    "date_field": {
    "type": "date",
    "store": true
    }
    }
    }""";

    // Mapping for the destinations: same field names and types, no extra per-field options.
    String plainMapping = """
    {
    "properties": {
    "text_field": {
    "type": "text"
    },
    "keyword_field": {
    "type": "keyword"
    },
    "numeric_field": {
    "type": "long"
    },
    "date_field": {
    "type": "date"
    }
    }
    }""";

    assertAcked(
        prepareCreate("source_index").setSettings(
            Settings.builder().put("index.number_of_shards", 2).put("index.derived_source.enabled", true)
        ).setMapping(derivedSourceMapping)
    );
    assertAcked(prepareCreate("dest_index").setMapping(plainMapping));

    // Seed the source index; every field value except the date is derived from the document id.
    int numDocs = randomIntBetween(100, 200);
    final List<IndexRequestBuilder> seedRequests = new ArrayList<>();
    for (int docId = 0; docId < numDocs; docId++) {
        IndexRequestBuilder indexRequest = client().prepareIndex("source_index").setId(Integer.toString(docId));
        seedRequests.add(
            indexRequest.setSource(
                "text_field",
                "text value " + docId,
                "keyword_field",
                "key_" + docId,
                "numeric_field",
                docId,
                "date_field",
                System.currentTimeMillis()
            )
        );
    }
    indexRandom(true, seedRequests);
    refresh("source_index");

    // Scenario 1: plain copy without slicing.
    BulkByScrollResponse plainCopy = reindex().source("source_index").destination("dest_index").refresh(true).get();
    assertThat(plainCopy, matcher().created(numDocs));
    verifyReindexedContent("dest_index", numDocs);

    // Scenario 2: copy restricted to the single document whose keyword is "key_1".
    String filteredDest = "dest_filtered_index";
    assertAcked(prepareCreate(filteredDest).setMapping(plainMapping));
    BulkByScrollResponse filteredCopy = reindex().source("source_index")
        .destination(filteredDest)
        .filter(termQuery("keyword_field", "key_1"))
        .refresh(true)
        .get();
    assertThat(filteredCopy, matcher().created(1));
    verifyReindexedContent(filteredDest, 1);

    // Scenario 3: sliced copy; created count, slice status count and content are all checked.
    String slicedDest = "dest_sliced_index";
    assertAcked(prepareCreate(slicedDest).setMapping(plainMapping));
    int requestedSlices = randomSlices();
    int sliceStatusCount = expectedSliceStatuses(requestedSlices, "source_index");
    BulkByScrollResponse slicedCopy = reindex().source("source_index")
        .destination(slicedDest)
        .setSlices(requestedSlices)
        .refresh(true)
        .get();
    assertThat(slicedCopy, matcher().created(numDocs).slices(hasSize(sliceStatusCount)));
    verifyReindexedContent(slicedDest, numDocs);

    // Scenario 4: copy, then rewrite each copied document under renamed fields.
    String transformedDest = "dest_transformed_index";
    String renamedFieldsMapping = """
    {
    "properties": {
    "new_text_field": {
    "type": "text"
    },
    "new_keyword_field": {
    "type": "keyword"
    },
    "modified_numeric": {
    "type": "long"
    },
    "date_field": {
    "type": "date"
    }
    }
    }""";
    assertAcked(prepareCreate(transformedDest).setMapping(renamedFieldsMapping));

    BulkByScrollResponse rawCopy = reindex().source("source_index").destination(transformedDest).refresh(true).get();
    assertThat(rawCopy, matcher().created(numDocs));

    // Read back everything that was copied and queue one replacement document per hit,
    // keeping the same id so the bulk request overwrites the copied document.
    BulkRequestBuilder rewriteBatch = client().prepareBulk();
    SearchResponse copiedDocs = client().prepareSearch(transformedDest).setQuery(matchAllQuery()).setSize(numDocs).get();
    for (SearchHit copied : copiedDocs.getHits()) {
        Map<String, Object> before = copied.getSourceAsMap();
        Map<String, Object> after = new HashMap<>();

        after.put("new_text_field", before.get("text_field"));
        after.put("new_keyword_field", before.get("keyword_field"));
        // The numeric value is shifted by 1000 so the rewrite is distinguishable from the copy.
        after.put("modified_numeric", ((Number) before.get("numeric_field")).longValue() + 1000);
        after.put("date_field", before.get("date_field"));

        rewriteBatch.add(client().prepareIndex(transformedDest).setId(copied.getId()).setSource(after));
    }

    BulkResponse rewriteResult = rewriteBatch.get();
    assertFalse(rewriteResult.hasFailures());
    refresh(transformedDest);
    verifyTransformedContent(transformedDest, numDocs);
}

/**
 * Refreshes {@code indexName}, fetches up to {@code expectedCount} documents sorted by
 * {@code numeric_field}, and asserts that the hit count equals {@code expectedCount} and that
 * each hit's text, keyword and numeric fields carry the values implied by its document id.
 * The date field is only required to be present.
 *
 * @param indexName     index to inspect
 * @param expectedCount number of documents the index is expected to contain
 */
private void verifyReindexedContent(String indexName, int expectedCount) {
    refresh(indexName);

    SearchResponse allDocs = client().prepareSearch(indexName)
        .setQuery(matchAllQuery())
        .setSize(expectedCount)
        .addSort("numeric_field", SortOrder.ASC)
        .get();
    assertHitCount(allDocs, expectedCount);

    allDocs.getHits().forEach(doc -> {
        Map<String, Object> fields = doc.getSourceAsMap();
        // Document ids were assigned from the loop counter that also produced the field values.
        int docNumber = Integer.parseInt(doc.getId());

        assertEquals("text value " + docNumber, fields.get("text_field"));
        assertEquals("key_" + docNumber, fields.get("keyword_field"));
        assertEquals(docNumber, ((Number) fields.get("numeric_field")).intValue());
        assertNotNull(fields.get("date_field"));
    });
}

/**
 * Refreshes {@code indexName}, fetches up to {@code expectedCount} documents sorted by
 * {@code modified_numeric}, and asserts that the hit count equals {@code expectedCount} and
 * that each hit holds the renamed fields: the text and keyword values implied by its document
 * id, and a numeric value equal to the id plus 1000. The date field is only required to be
 * present.
 *
 * @param indexName     index to inspect
 * @param expectedCount number of documents the index is expected to contain
 */
private void verifyTransformedContent(String indexName, int expectedCount) {
    refresh(indexName);

    SearchResponse allDocs = client().prepareSearch(indexName)
        .setQuery(matchAllQuery())
        .setSize(expectedCount)
        .addSort("modified_numeric", SortOrder.ASC)
        .get();
    assertHitCount(allDocs, expectedCount);

    allDocs.getHits().forEach(doc -> {
        Map<String, Object> fields = doc.getSourceAsMap();
        int docNumber = Integer.parseInt(doc.getId());

        assertEquals("text value " + docNumber, fields.get("new_text_field"));
        assertEquals("key_" + docNumber, fields.get("new_keyword_field"));
        // Compared as long values: the expected int sum is widened to match longValue().
        assertEquals(docNumber + 1000, ((Number) fields.get("modified_numeric")).longValue());
        assertNotNull(fields.get("date_field"));
    });
}
}
Loading
Loading