|
32 | 32 |
|
33 | 33 | package org.opensearch.index.reindex; |
34 | 34 |
|
35 | | -import org.opensearch.action.bulk.BulkRequestBuilder; |
36 | | -import org.opensearch.action.bulk.BulkResponse; |
37 | 35 | import org.opensearch.action.index.IndexRequestBuilder; |
38 | | -import org.opensearch.action.search.SearchResponse; |
39 | | -import org.opensearch.common.settings.Settings; |
40 | | -import org.opensearch.common.xcontent.XContentType; |
41 | | -import org.opensearch.search.SearchHit; |
42 | | -import org.opensearch.search.sort.SortOrder; |
43 | 36 |
|
44 | 37 | import java.util.ArrayList; |
45 | 38 | import java.util.Collection; |
|
48 | 41 | import java.util.Map; |
49 | 42 | import java.util.stream.Collectors; |
50 | 43 |
|
51 | | -import static org.opensearch.index.query.QueryBuilders.matchAllQuery; |
52 | 44 | import static org.opensearch.index.query.QueryBuilders.termQuery; |
53 | | -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; |
54 | 45 | import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; |
55 | 46 | import static org.hamcrest.Matchers.greaterThanOrEqualTo; |
56 | 47 | import static org.hamcrest.Matchers.hasSize; |
@@ -186,301 +177,4 @@ public void testMissingSources() { |
186 | 177 | assertThat(response, matcher().created(0).slices(hasSize(0))); |
187 | 178 | } |
188 | 179 |
|
189 | | - public void testReindexWithDerivedSource() throws Exception { |
190 | | - // Create source index with derived source setting enabled |
191 | | - String sourceIndexMapping = """ |
192 | | - { |
193 | | - "settings": { |
194 | | - "index": { |
195 | | - "number_of_shards": 1, |
196 | | - "number_of_replicas": 0, |
197 | | - "derived_source": { |
198 | | - "enabled": true |
199 | | - } |
200 | | - } |
201 | | - }, |
202 | | - "mappings": { |
203 | | - "_doc": { |
204 | | - "properties": { |
205 | | - "foo": { |
206 | | - "type": "keyword", |
207 | | - "store": true |
208 | | - }, |
209 | | - "bar": { |
210 | | - "type": "integer", |
211 | | - "store": true |
212 | | - } |
213 | | - } |
214 | | - } |
215 | | - } |
216 | | - }"""; |
217 | | - |
218 | | - // Create indices |
219 | | - assertAcked(prepareCreate("source_index").setSource(sourceIndexMapping, XContentType.JSON)); |
220 | | - assertAcked(prepareCreate("dest_index").setSource(sourceIndexMapping, XContentType.JSON)); |
221 | | - ensureGreen(); |
222 | | - |
223 | | - // Index some documents |
224 | | - int numDocs = randomIntBetween(5, 20); |
225 | | - List<IndexRequestBuilder> docs = new ArrayList<>(); |
226 | | - for (int i = 0; i < numDocs; i++) { |
227 | | - docs.add(client().prepareIndex("source_index").setId(Integer.toString(i)).setSource("foo", "value_" + i, "bar", i)); |
228 | | - } |
229 | | - indexRandom(true, docs); |
230 | | - |
231 | | - // Test 1: Basic reindex |
232 | | - ReindexRequestBuilder copy = reindex().source("source_index").destination("dest_index").refresh(true); |
233 | | - |
234 | | - BulkByScrollResponse response = copy.get(); |
235 | | - assertThat(response, matcher().created(numDocs)); |
236 | | - long expectedCount = client().prepareSearch("dest_index").setQuery(matchAllQuery()).get().getHits().getTotalHits().value(); |
237 | | - assertEquals(numDocs, expectedCount); |
238 | | - |
239 | | - // Test 2: Reindex with query filter |
240 | | - String destIndexFiltered = "dest_index_filtered"; |
241 | | - assertAcked(prepareCreate(destIndexFiltered).setSource(sourceIndexMapping, XContentType.JSON)); |
242 | | - |
243 | | - copy = reindex().source("source_index").destination(destIndexFiltered).filter(termQuery("bar", 1)).refresh(true); |
244 | | - |
245 | | - response = copy.get(); |
246 | | - expectedCount = client().prepareSearch("source_index").setQuery(termQuery("bar", 1)).get().getHits().getTotalHits().value(); |
247 | | - assertThat(response, matcher().created(expectedCount)); |
248 | | - |
249 | | - // Test 3: Reindex with slices |
250 | | - String destIndexSliced = "dest_index_sliced"; |
251 | | - assertAcked(prepareCreate(destIndexSliced).setSource(sourceIndexMapping, XContentType.JSON)); |
252 | | - |
253 | | - int slices = randomSlices(); |
254 | | - int expectedSlices = expectedSliceStatuses(slices, "source_index"); |
255 | | - |
256 | | - copy = reindex().source("source_index").destination(destIndexSliced).setSlices(slices).refresh(true); |
257 | | - |
258 | | - response = copy.get(); |
259 | | - assertThat(response, matcher().created(numDocs).slices(hasSize(expectedSlices))); |
260 | | - |
261 | | - // Test 4: Reindex with maxDocs |
262 | | - String destIndexMaxDocs = "dest_index_maxdocs"; |
263 | | - assertAcked(prepareCreate(destIndexMaxDocs).setSource(sourceIndexMapping, XContentType.JSON)); |
264 | | - |
265 | | - int maxDocs = numDocs / 2; |
266 | | - copy = reindex().source("source_index").destination(destIndexMaxDocs).maxDocs(maxDocs).refresh(true); |
267 | | - |
268 | | - response = copy.get(); |
269 | | - assertThat(response, matcher().created(maxDocs)); |
270 | | - expectedCount = client().prepareSearch(destIndexMaxDocs).setQuery(matchAllQuery()).get().getHits().getTotalHits().value(); |
271 | | - assertEquals(maxDocs, expectedCount); |
272 | | - |
273 | | - // Test 5: Multiple source indices |
274 | | - String sourceIndex2 = "source_index_2"; |
275 | | - assertAcked(prepareCreate(sourceIndex2).setSource(sourceIndexMapping, XContentType.JSON)); |
276 | | - |
277 | | - int numDocs2 = randomIntBetween(5, 20); |
278 | | - List<IndexRequestBuilder> docs2 = new ArrayList<>(); |
279 | | - for (int i = 0; i < numDocs2; i++) { |
280 | | - docs2.add( |
281 | | - client().prepareIndex(sourceIndex2).setId(Integer.toString(i + numDocs)).setSource("foo", "value2_" + i, "bar", i + numDocs) |
282 | | - ); |
283 | | - } |
284 | | - indexRandom(true, docs2); |
285 | | - |
286 | | - String destIndexMulti = "dest_index_multi"; |
287 | | - assertAcked(prepareCreate(destIndexMulti).setSource(sourceIndexMapping, XContentType.JSON)); |
288 | | - |
289 | | - copy = reindex().source("source_index", "source_index_2").destination(destIndexMulti).refresh(true); |
290 | | - |
291 | | - response = copy.get(); |
292 | | - assertThat(response, matcher().created(numDocs + numDocs2)); |
293 | | - expectedCount = client().prepareSearch(destIndexMulti).setQuery(matchAllQuery()).get().getHits().getTotalHits().value(); |
294 | | - assertEquals(numDocs + numDocs2, expectedCount); |
295 | | - } |
296 | | - |
297 | | - public void testReindexFromDerivedSourceToNormalIndex() throws Exception { |
298 | | - // Create source index with derived source enabled |
299 | | - String sourceMapping = """ |
300 | | - { |
301 | | - "properties": { |
302 | | - "text_field": { |
303 | | - "type": "text", |
304 | | - "store": true |
305 | | - }, |
306 | | - "keyword_field": { |
307 | | - "type": "keyword" |
308 | | - }, |
309 | | - "numeric_field": { |
310 | | - "type": "long", |
311 | | - "doc_values": true |
312 | | - }, |
313 | | - "date_field": { |
314 | | - "type": "date", |
315 | | - "store": true |
316 | | - } |
317 | | - } |
318 | | - }"""; |
319 | | - |
320 | | - // Create destination index with normal settings |
321 | | - String destMapping = """ |
322 | | - { |
323 | | - "properties": { |
324 | | - "text_field": { |
325 | | - "type": "text" |
326 | | - }, |
327 | | - "keyword_field": { |
328 | | - "type": "keyword" |
329 | | - }, |
330 | | - "numeric_field": { |
331 | | - "type": "long" |
332 | | - }, |
333 | | - "date_field": { |
334 | | - "type": "date" |
335 | | - } |
336 | | - } |
337 | | - }"""; |
338 | | - |
339 | | - // Create source index |
340 | | - assertAcked( |
341 | | - prepareCreate("source_index").setSettings( |
342 | | - Settings.builder().put("index.number_of_shards", 2).put("index.derived_source.enabled", true) |
343 | | - ).setMapping(sourceMapping) |
344 | | - ); |
345 | | - |
346 | | - // Create destination index |
347 | | - assertAcked(prepareCreate("dest_index").setMapping(destMapping)); |
348 | | - |
349 | | - // Index test documents |
350 | | - int numDocs = randomIntBetween(100, 200); |
351 | | - final List<IndexRequestBuilder> docs = new ArrayList<>(); |
352 | | - for (int i = 0; i < numDocs; i++) { |
353 | | - docs.add( |
354 | | - client().prepareIndex("source_index") |
355 | | - .setId(Integer.toString(i)) |
356 | | - .setSource( |
357 | | - "text_field", |
358 | | - "text value " + i, |
359 | | - "keyword_field", |
360 | | - "key_" + i, |
361 | | - "numeric_field", |
362 | | - i, |
363 | | - "date_field", |
364 | | - System.currentTimeMillis() |
365 | | - ) |
366 | | - ); |
367 | | - } |
368 | | - indexRandom(true, docs); |
369 | | - refresh("source_index"); |
370 | | - |
371 | | - // Test 1: Basic reindex without slices |
372 | | - ReindexRequestBuilder reindex = reindex().source("source_index").destination("dest_index").refresh(true); |
373 | | - BulkByScrollResponse response = reindex.get(); |
374 | | - assertThat(response, matcher().created(numDocs)); |
375 | | - verifyReindexedContent("dest_index", numDocs); |
376 | | - |
377 | | - // Test 2: Reindex with query filter |
378 | | - String destFilteredIndex = "dest_filtered_index"; |
379 | | - assertAcked(prepareCreate(destFilteredIndex).setMapping(destMapping)); |
380 | | - reindex = reindex().source("source_index").destination(destFilteredIndex).filter(termQuery("keyword_field", "key_1")).refresh(true); |
381 | | - response = reindex.get(); |
382 | | - assertThat(response, matcher().created(1)); |
383 | | - verifyReindexedContent(destFilteredIndex, 1); |
384 | | - |
385 | | - // Test 3: Reindex with slices |
386 | | - String destSlicedIndex = "dest_sliced_index"; |
387 | | - assertAcked(prepareCreate(destSlicedIndex).setMapping(destMapping)); |
388 | | - int slices = randomSlices(); |
389 | | - int expectedSlices = expectedSliceStatuses(slices, "source_index"); |
390 | | - |
391 | | - reindex = reindex().source("source_index").destination(destSlicedIndex).setSlices(slices).refresh(true); |
392 | | - response = reindex.get(); |
393 | | - assertThat(response, matcher().created(numDocs).slices(hasSize(expectedSlices))); |
394 | | - verifyReindexedContent(destSlicedIndex, numDocs); |
395 | | - |
396 | | - // Test 4: Reindex with field transformation |
397 | | - String destTransformedIndex = "dest_transformed_index"; |
398 | | - String transformedMapping = """ |
399 | | - { |
400 | | - "properties": { |
401 | | - "new_text_field": { |
402 | | - "type": "text" |
403 | | - }, |
404 | | - "new_keyword_field": { |
405 | | - "type": "keyword" |
406 | | - }, |
407 | | - "modified_numeric": { |
408 | | - "type": "long" |
409 | | - }, |
410 | | - "date_field": { |
411 | | - "type": "date" |
412 | | - } |
413 | | - } |
414 | | - }"""; |
415 | | - assertAcked(prepareCreate(destTransformedIndex).setMapping(transformedMapping)); |
416 | | - |
417 | | - // First reindex the documents |
418 | | - reindex = reindex().source("source_index").destination(destTransformedIndex).refresh(true); |
419 | | - response = reindex.get(); |
420 | | - assertThat(response, matcher().created(numDocs)); |
421 | | - |
422 | | - // Then transform using bulk update |
423 | | - BulkRequestBuilder bulkRequest = client().prepareBulk(); |
424 | | - SearchResponse searchResponse = client().prepareSearch(destTransformedIndex).setQuery(matchAllQuery()).setSize(numDocs).get(); |
425 | | - |
426 | | - for (SearchHit hit : searchResponse.getHits()) { |
427 | | - Map<String, Object> source = hit.getSourceAsMap(); |
428 | | - Map<String, Object> newSource = new HashMap<>(); |
429 | | - |
430 | | - // Transform fields |
431 | | - newSource.put("new_text_field", source.get("text_field")); |
432 | | - newSource.put("new_keyword_field", source.get("keyword_field")); |
433 | | - newSource.put("modified_numeric", ((Number) source.get("numeric_field")).longValue() + 1000); |
434 | | - newSource.put("date_field", source.get("date_field")); |
435 | | - |
436 | | - bulkRequest.add(client().prepareIndex(destTransformedIndex).setId(hit.getId()).setSource(newSource)); |
437 | | - } |
438 | | - |
439 | | - BulkResponse bulkResponse = bulkRequest.get(); |
440 | | - assertFalse(bulkResponse.hasFailures()); |
441 | | - refresh(destTransformedIndex); |
442 | | - verifyTransformedContent(destTransformedIndex, numDocs); |
443 | | - } |
444 | | - |
445 | | - private void verifyReindexedContent(String indexName, int expectedCount) { |
446 | | - refresh(indexName); |
447 | | - SearchResponse searchResponse = client().prepareSearch(indexName) |
448 | | - .setQuery(matchAllQuery()) |
449 | | - .setSize(expectedCount) |
450 | | - .addSort("numeric_field", SortOrder.ASC) |
451 | | - .get(); |
452 | | - |
453 | | - assertHitCount(searchResponse, expectedCount); |
454 | | - |
455 | | - for (SearchHit hit : searchResponse.getHits()) { |
456 | | - Map<String, Object> source = hit.getSourceAsMap(); |
457 | | - int id = Integer.parseInt(hit.getId()); |
458 | | - |
459 | | - assertEquals("text value " + id, source.get("text_field")); |
460 | | - assertEquals("key_" + id, source.get("keyword_field")); |
461 | | - assertEquals(id, ((Number) source.get("numeric_field")).intValue()); |
462 | | - assertNotNull(source.get("date_field")); |
463 | | - } |
464 | | - } |
465 | | - |
466 | | - private void verifyTransformedContent(String indexName, int expectedCount) { |
467 | | - refresh(indexName); |
468 | | - SearchResponse searchResponse = client().prepareSearch(indexName) |
469 | | - .setQuery(matchAllQuery()) |
470 | | - .setSize(expectedCount) |
471 | | - .addSort("modified_numeric", SortOrder.ASC) |
472 | | - .get(); |
473 | | - |
474 | | - assertHitCount(searchResponse, expectedCount); |
475 | | - |
476 | | - for (SearchHit hit : searchResponse.getHits()) { |
477 | | - Map<String, Object> source = hit.getSourceAsMap(); |
478 | | - int id = Integer.parseInt(hit.getId()); |
479 | | - |
480 | | - assertEquals("text value " + id, source.get("new_text_field")); |
481 | | - assertEquals("key_" + id, source.get("new_keyword_field")); |
482 | | - assertEquals(id + 1000, ((Number) source.get("modified_numeric")).longValue()); |
483 | | - assertNotNull(source.get("date_field")); |
484 | | - } |
485 | | - } |
486 | 180 | } |
0 commit comments