From 3a9145c92ff4c01ec02e290556eb5d9d3e7b9262 Mon Sep 17 00:00:00 2001 From: Liyun Xiu Date: Thu, 27 Jun 2024 10:56:04 +0800 Subject: [PATCH] Rename functions and refactor tests Signed-off-by: Liyun Xiu --- CHANGELOG.md | 1 + .../ingest/AbstractBatchingProcessor.java | 31 ++++--- .../AbstractBatchingProcessorTests.java | 84 ++++++++----------- 3 files changed, 54 insertions(+), 62 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cafe9c20e7ff4..5855e220a6631 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Remote Store] Rate limiter for remote store low priority uploads ([#14374](https://github.com/opensearch-project/OpenSearch/pull/14374/)) - Apply the date histogram rewrite optimization to range aggregation ([#13865](https://github.com/opensearch-project/OpenSearch/pull/13865)) - [Writable Warm] Add composite directory implementation and integrate it with FileCache ([12782](https://github.com/opensearch-project/OpenSearch/pull/12782)) +- Add batching supported processor base type AbstractBatchingProcessor ([#14554](https://github.com/opensearch-project/OpenSearch/pull/14554)) - Fix race condition while parsing derived fields from search definition ([14445](https://github.com/opensearch-project/OpenSearch/pull/14445)) ### Dependencies diff --git a/server/src/main/java/org/opensearch/ingest/AbstractBatchingProcessor.java b/server/src/main/java/org/opensearch/ingest/AbstractBatchingProcessor.java index f15c3f590edc6..c0a6f9ec94ce3 100644 --- a/server/src/main/java/org/opensearch/ingest/AbstractBatchingProcessor.java +++ b/server/src/main/java/org/opensearch/ingest/AbstractBatchingProcessor.java @@ -15,6 +15,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; +import static org.opensearch.ingest.ConfigurationUtils.newConfigurationException; + /** * Abstract base class for batch processors. * @@ -37,7 +39,7 @@ protected AbstractBatchingProcessor(String tag, String description, int batchSiz * @param ingestDocumentWrappers {@link List} of {@link IngestDocumentWrapper} to be processed. * @param handler {@link Consumer} to be called with the results of the processing. */ - public abstract void internalBatchExecute( + protected abstract void subBatchExecute( List ingestDocumentWrappers, Consumer> handler ); @@ -49,15 +51,9 @@ public void batchExecute(List ingestDocumentWrappers, Con return; } - // if batch size is 1, use default implementation in Processor to handle documents one at a time. - if (this.batchSize == 1) { - super.batchExecute(ingestDocumentWrappers, handler); - return; - } - // if batch size is larger than document size, send one batch if (this.batchSize >= ingestDocumentWrappers.size()) { - internalBatchExecute(ingestDocumentWrappers, handler); + subBatchExecute(ingestDocumentWrappers, handler); return; } @@ -67,7 +63,7 @@ public void batchExecute(List ingestDocumentWrappers, Con AtomicInteger counter = new AtomicInteger(size); List allResults = Collections.synchronizedList(new ArrayList<>()); for (List batch : batches) { - this.internalBatchExecute(batch, batchResults -> { + this.subBatchExecute(batch, batchResults -> { allResults.addAll(batchResults); if (counter.addAndGet(-batchResults.size()) == 0) { handler.accept(allResults); @@ -114,7 +110,15 @@ public AbstractBatchingProcessor create( String description, Map config ) throws Exception { - int batchSize = ConfigurationUtils.readIntProperty(processorType, tag, config, BATCH_SIZE_FIELD, DEFAULT_BATCH_SIZE); + int batchSize = ConfigurationUtils.readIntProperty(this.processorType, tag, config, BATCH_SIZE_FIELD, DEFAULT_BATCH_SIZE); + if (batchSize < 1) { + throw newConfigurationException( + this.processorType, + tag, + BATCH_SIZE_FIELD, + BATCH_SIZE_FIELD + " must be a positive integer" + ); + } return newProcessor(tag, description, batchSize, config); } @@ -127,6 +131,11 @@ public AbstractBatchingProcessor create( * @param config configuration of the processor * @return a new batch processor instance */ - protected abstract AbstractBatchingProcessor newProcessor(String tag, String description, int batchSize, Map config); + protected abstract AbstractBatchingProcessor newProcessor( + String tag, + String description, + int batchSize, + Map config + ); } } diff --git a/server/src/test/java/org/opensearch/ingest/AbstractBatchingProcessorTests.java b/server/src/test/java/org/opensearch/ingest/AbstractBatchingProcessorTests.java index 10ee5d54a7ed8..b6637bc430f8c 100644 --- a/server/src/test/java/org/opensearch/ingest/AbstractBatchingProcessorTests.java +++ b/server/src/test/java/org/opensearch/ingest/AbstractBatchingProcessorTests.java @@ -9,7 +9,6 @@ package org.opensearch.ingest; import org.opensearch.test.OpenSearchTestCase; -import org.junit.Before; import java.util.ArrayList; import java.util.Arrays; @@ -17,32 +16,17 @@ import java.util.List; import java.util.function.Consumer; -import org.mockito.ArgumentCaptor; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyList; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - public class AbstractBatchingProcessorTests extends OpenSearchTestCase { - private static final String DESCRIPTION = "description"; - private static final String TAG = "tag"; - - @Before - public void setup() {} - - public void testBatchExecute_emptyInput() throws Exception { - DummyProcessor processor = spy(new DummyProcessor(TAG, DESCRIPTION, 3)); - Consumer> handler = (results) -> { assertTrue(results.isEmpty()); }; + public void testBatchExecute_emptyInput() { + DummyProcessor processor = new DummyProcessor(3); + Consumer> handler = (results) -> assertTrue(results.isEmpty()); processor.batchExecute(Collections.emptyList(), handler); - verify(processor, never()).internalBatchExecute(anyList(), any()); + assertTrue(processor.getSubBatches().isEmpty()); } - public void testBatchExecute_singleBatchSize() throws Exception { - DummyProcessor processor = spy(new DummyProcessor(TAG, DESCRIPTION, 3)); + public void testBatchExecute_singleBatchSize() { + DummyProcessor processor = new DummyProcessor(3); List wrapperList = Arrays.asList( IngestDocumentPreparer.createIngestDocumentWrapper(1), IngestDocumentPreparer.createIngestDocumentWrapper(2), @@ -51,11 +35,12 @@ public void testBatchExecute_singleBatchSize() throws Exception { List resultList = new ArrayList<>(); processor.batchExecute(wrapperList, resultList::addAll); assertEquals(wrapperList, resultList); - verify(processor, times(1)).internalBatchExecute(anyList(), any()); + assertEquals(1, processor.getSubBatches().size()); + assertEquals(wrapperList, processor.getSubBatches().get(0)); } - public void testBatchExecute_multipleBatches() throws Exception { - DummyProcessor processor = spy(new DummyProcessor(TAG, DESCRIPTION, 2)); + public void testBatchExecute_multipleBatches() { + DummyProcessor processor = new DummyProcessor(2); List wrapperList = Arrays.asList( IngestDocumentPreparer.createIngestDocumentWrapper(1), IngestDocumentPreparer.createIngestDocumentWrapper(2), @@ -66,17 +51,16 @@ public void testBatchExecute_multipleBatches() throws Exception { List resultList = new ArrayList<>(); processor.batchExecute(wrapperList, resultList::addAll); assertEquals(wrapperList, resultList); - ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(List.class); - verify(processor, times(3)).internalBatchExecute(argumentCaptor.capture(), any()); - assertEquals(wrapperList.subList(0, 2), argumentCaptor.getAllValues().get(0)); - assertEquals(wrapperList.subList(2, 4), argumentCaptor.getAllValues().get(1)); - assertEquals(wrapperList.subList(4, 5), argumentCaptor.getAllValues().get(2)); + assertEquals(3, processor.getSubBatches().size()); + assertEquals(wrapperList.subList(0, 2), processor.getSubBatches().get(0)); + assertEquals(wrapperList.subList(2, 4), processor.getSubBatches().get(1)); + assertEquals(wrapperList.subList(4, 5), processor.getSubBatches().get(2)); } - public void testBatchExecute_randomBatches() throws Exception { + public void testBatchExecute_randomBatches() { int batchSize = randomIntBetween(2, 32); int docCount = randomIntBetween(2, 32); - DummyProcessor processor = spy(new DummyProcessor(TAG, DESCRIPTION, batchSize)); + DummyProcessor processor = new DummyProcessor(batchSize); List wrapperList = new ArrayList<>(); for (int i = 0; i < docCount; ++i) { wrapperList.add(IngestDocumentPreparer.createIngestDocumentWrapper(i)); @@ -84,14 +68,11 @@ public void testBatchExecute_randomBatches() throws Exception { List resultList = new ArrayList<>(); processor.batchExecute(wrapperList, resultList::addAll); assertEquals(wrapperList, resultList); - verify(processor, times(docCount / batchSize + (docCount % batchSize == 0 ? 0 : 1))).internalBatchExecute( - anyList(), - any() - ); + assertEquals(docCount / batchSize + (docCount % batchSize == 0 ? 0 : 1), processor.getSubBatches().size()); } - public void testBatchExecute_defaultBatchSize() throws Exception { - DummyProcessor processor = spy(new DummyProcessor(TAG, DESCRIPTION, 1)); + public void testBatchExecute_defaultBatchSize() { + DummyProcessor processor = new DummyProcessor(1); List wrapperList = Arrays.asList( IngestDocumentPreparer.createIngestDocumentWrapper(1), IngestDocumentPreparer.createIngestDocumentWrapper(2), @@ -99,26 +80,27 @@ public void testBatchExecute_defaultBatchSize() throws Exception { ); List resultList = new ArrayList<>(); processor.batchExecute(wrapperList, resultList::addAll); - for (int i = 0; i < wrapperList.size(); ++i) { - assertEquals(wrapperList.get(i).getSlot(), resultList.get(i).getSlot()); - assertEquals(wrapperList.get(i).getIngestDocument(), resultList.get(i).getIngestDocument()); - assertEquals(wrapperList.get(i).getException(), resultList.get(i).getException()); - } - verify(processor, never()).internalBatchExecute(anyList(), any()); - verify(processor, times(3)).execute(any(IngestDocument.class)); + assertEquals(wrapperList, resultList); + assertEquals(3, processor.getSubBatches().size()); + assertEquals(wrapperList.subList(0, 1), processor.getSubBatches().get(0)); + assertEquals(wrapperList.subList(1, 2), processor.getSubBatches().get(1)); + assertEquals(wrapperList.subList(2, 3), processor.getSubBatches().get(2)); } static class DummyProcessor extends AbstractBatchingProcessor { + private List> subBatches = new ArrayList<>(); + + public List> getSubBatches() { + return subBatches; + } - protected DummyProcessor(String tag, String description, int batchSize) { - super(tag, description, batchSize); + protected DummyProcessor(int batchSize) { + super("tag", "description", batchSize); } @Override - public void internalBatchExecute( - List ingestDocumentWrappers, - Consumer> handler - ) { + public void subBatchExecute(List ingestDocumentWrappers, Consumer> handler) { + subBatches.add(ingestDocumentWrappers); handler.accept(ingestDocumentWrappers); }