Skip to content

Commit

Permalink
Add UT for factory
Browse files Browse the repository at this point in the history
Signed-off-by: Liyun Xiu <xiliyun@amazon.com>
  • Loading branch information
chishui authored and andrross committed Jun 28, 2024
1 parent 33b2d37 commit 5aa5e5a
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,7 @@ public AbstractBatchingProcessor create(
) throws Exception {
int batchSize = ConfigurationUtils.readIntProperty(this.processorType, tag, config, BATCH_SIZE_FIELD, DEFAULT_BATCH_SIZE);
if (batchSize < 1) {
throw newConfigurationException(
this.processorType,
tag,
BATCH_SIZE_FIELD,
BATCH_SIZE_FIELD + " must be a positive integer"
);
throw newConfigurationException(this.processorType, tag, BATCH_SIZE_FIELD, "batch size must be a positive integer");
}
return newProcessor(tag, description, batchSize, config);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@

package org.opensearch.ingest;

import org.opensearch.OpenSearchParseException;
import org.opensearch.test.OpenSearchTestCase;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class AbstractBatchingProcessorTests extends OpenSearchTestCase {
Expand Down Expand Up @@ -87,6 +90,29 @@ public void testBatchExecute_defaultBatchSize() {
assertEquals(wrapperList.subList(2, 3), processor.getSubBatches().get(2));
}

public void testFactory_invalidBatchSize() {
Map<String, Object> config = new HashMap<>();
config.put("batch_size", 0);
DummyProcessor.DummyProcessorFactory factory = new DummyProcessor.DummyProcessorFactory("DummyProcessor");
OpenSearchParseException exception = assertThrows(OpenSearchParseException.class, () -> factory.create(config));
assertEquals("[batch_size] batch size must be a positive integer", exception.getMessage());
}

public void testFactory_defaultBatchSize() throws Exception {
Map<String, Object> config = new HashMap<>();
DummyProcessor.DummyProcessorFactory factory = new DummyProcessor.DummyProcessorFactory("DummyProcessor");
DummyProcessor processor = (DummyProcessor) factory.create(config);
assertEquals(1, processor.batchSize);
}

public void testFactory_callNewProcessor() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put("batch_size", 3);
DummyProcessor.DummyProcessorFactory factory = new DummyProcessor.DummyProcessorFactory("DummyProcessor");
DummyProcessor processor = (DummyProcessor) factory.create(config);
assertEquals(3, processor.batchSize);
}

static class DummyProcessor extends AbstractBatchingProcessor {
private List<List<IngestDocumentWrapper>> subBatches = new ArrayList<>();

Expand All @@ -113,5 +139,22 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
public String getType() {
return null;
}

public static class DummyProcessorFactory extends Factory {

protected DummyProcessorFactory(String processorType) {
super(processorType);
}

public AbstractBatchingProcessor create(Map<String, Object> config) throws Exception {
final Map<String, org.opensearch.ingest.Processor.Factory> processorFactories = new HashMap<>();
return super.create(processorFactories, "tag", "description", config);
}

@Override
protected AbstractBatchingProcessor newProcessor(String tag, String description, int batchSize, Map<String, Object> config) {
return new DummyProcessor(batchSize);
}
}
}
}

0 comments on commit 5aa5e5a

Please sign in to comment.