From 16dd9d0eaf517fb30a412a53022e55173fe89af5 Mon Sep 17 00:00:00 2001 From: Yupeng Fu Date: Tue, 31 Dec 2024 11:44:11 -0800 Subject: [PATCH] use a blocking queue to pass polled messages to the processor for processing --- .../index/engine/IngestionEngine.java | 24 +-- .../indices/ingest/DefaultStreamPoller.java | 67 +++++-- .../indices/ingest/MessageProcessor.java | 114 ------------ .../ingest/MessageProcessorRunnable.java | 167 ++++++++++++++++++ .../index/engine/FakeIngestionSource.java | 3 +- .../index/engine/IngestionEngineTests.java | 3 +- .../ingest/DefaultStreamPollerTests.java | 44 ++--- .../indices/ingest/MessageProcessorTests.java | 15 +- 8 files changed, 266 insertions(+), 171 deletions(-) delete mode 100644 server/src/main/java/org/opensearch/indices/ingest/MessageProcessor.java create mode 100644 server/src/main/java/org/opensearch/indices/ingest/MessageProcessorRunnable.java diff --git a/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java b/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java index 06af82022f200..0e464150f8198 100644 --- a/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java @@ -56,14 +56,21 @@ import org.opensearch.index.translog.TranslogManager; import org.opensearch.index.translog.TranslogStats; import org.opensearch.indices.ingest.DefaultStreamPoller; -import org.opensearch.indices.ingest.MessageProcessor; import org.opensearch.indices.ingest.StreamPoller; import org.opensearch.search.suggest.completion.CompletionStats; import org.opensearch.threadpool.ThreadPool; import java.io.Closeable; import java.io.IOException; -import java.util.*; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -149,8 +156,9 @@ public Translog.Operation next() { logger.info("created ingestion consumer for shard [{}]", engineConfig.getShardId()); Map commitData = commitDataAsMap(); - StreamPoller.ResetState resetState = - StreamPoller.ResetState.valueOf(ingestionSource.getPointerInitReset().toUpperCase(Locale.ROOT)); + StreamPoller.ResetState resetState = StreamPoller.ResetState.valueOf( + ingestionSource.getPointerInitReset().toUpperCase(Locale.ROOT) + ); IngestionShardPointer startPointer = null; Set persistedPointers = new HashSet<>(); if (commitData.containsKey(StreamPoller.BATCH_START)) { @@ -167,13 +175,7 @@ public Translog.Operation next() { resetState = StreamPoller.ResetState.NONE; } - streamPoller = new DefaultStreamPoller( - startPointer, - persistedPointers, - ingestionShardConsumer, - new MessageProcessor(this), - resetState - ); + streamPoller = new DefaultStreamPoller(startPointer, persistedPointers, ingestionShardConsumer, this, resetState); streamPoller.start(); success = true; } catch (IOException | TranslogCorruptedException e) { diff --git a/server/src/main/java/org/opensearch/indices/ingest/DefaultStreamPoller.java b/server/src/main/java/org/opensearch/indices/ingest/DefaultStreamPoller.java index e16c3834e507b..d1fd6083c9d75 100644 --- a/server/src/main/java/org/opensearch/indices/ingest/DefaultStreamPoller.java +++ b/server/src/main/java/org/opensearch/indices/ingest/DefaultStreamPoller.java @@ -14,10 +14,14 @@ import org.opensearch.index.IngestionShardConsumer; import org.opensearch.index.IngestionShardPointer; import org.opensearch.index.Message; +import org.opensearch.index.engine.IngestionEngine; import java.util.List; import java.util.Locale; +import java.util.Objects; import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -42,7 +46,7 @@ public class DefaultStreamPoller implements StreamPoller { private ExecutorService consumerThread; - private MessageProcessor processor; + private ExecutorService processorThread; // start of the batch, inclusive private IngestionShardPointer batchStartPointer; @@ -51,6 +55,10 @@ public class DefaultStreamPoller implements StreamPoller { private Set persistedPointers; + private BlockingQueue> blockingQueue; + + private MessageProcessorRunnable processorRunnable; + // A pointer to the max persisted pointer for optimizing the check @Nullable private IngestionShardPointer maxPersistedPointer; @@ -59,20 +67,46 @@ public DefaultStreamPoller( IngestionShardPointer startPointer, Set persistedPointers, IngestionShardConsumer consumer, - MessageProcessor processor, + IngestionEngine ingestionEngine, + ResetState resetState + ) { + this( + startPointer, + persistedPointers, + consumer, + new MessageProcessorRunnable(new ArrayBlockingQueue<>(100), ingestionEngine), + resetState + ); + } + + DefaultStreamPoller( + IngestionShardPointer startPointer, + Set persistedPointers, + IngestionShardConsumer consumer, + MessageProcessorRunnable processorRunnable, ResetState resetState ) { - this.consumer = consumer; - this.processor = processor; + this.consumer = Objects.requireNonNull(consumer); this.resetState = resetState; batchStartPointer = startPointer; this.persistedPointers = persistedPointers; if (!this.persistedPointers.isEmpty()) { maxPersistedPointer = this.persistedPointers.stream().max(IngestionShardPointer::compareTo).get(); } - this.consumerThread = Executors.newSingleThreadExecutor(r -> new Thread( + this.processorRunnable = processorRunnable; + blockingQueue = processorRunnable.getBlockingQueue(); + this.consumerThread = Executors.newSingleThreadExecutor( + r -> new Thread( + r, + String.format(Locale.ROOT, "stream-poller-consumer-%d-%d", consumer.getShardId(), System.currentTimeMillis()) + ) + ); + + // TODO: allow multiple threads for processing the messages in parallel + this.processorThread = Executors.newSingleThreadExecutor( + r -> new Thread( r, - String.format(Locale.ROOT, "stream-poller-%d-%d", consumer.getShardId(), System.currentTimeMillis()) + String.format(Locale.ROOT, "stream-poller-processor-%d-%d", consumer.getShardId(), System.currentTimeMillis()) ) ); } @@ -83,7 +117,8 @@ public void start() { throw new RuntimeException("poller is closed!"); } started = true; - consumerThread.submit(this::startPoll).isDone(); + consumerThread.submit(this::startPoll); + processorThread.submit(processorRunnable); } /** @@ -126,7 +161,7 @@ protected void startPoll() { // TODO: make sleep time configurable Thread.sleep(100); } catch (Throwable e) { - logger.error("Error in pausing the poller of shard {}", consumer.getShardId(), e); + logger.error("Error in pausing the poller of shard {}: {}", consumer.getShardId(), e); } continue; } @@ -146,16 +181,15 @@ protected void startPoll() { state = State.PROCESSING; // process the records - // TODO: separate threads for processing the messages in parallel for (IngestionShardConsumer.ReadResult result : results) { // check if the message is already processed if (isProcessed(result.getPointer())) { logger.info("Skipping message with pointer {} as it is already processed", result.getPointer().asString()); continue; } - processor.process(result.getMessage(), result.getPointer()); + blockingQueue.put(result); logger.debug( - "Processed message {} with pointer {}", + "Put message {} with pointer {} to the blocking queue", String.valueOf(result.getMessage().getPayload()), result.getPointer().asString() ); @@ -164,7 +198,7 @@ protected void startPoll() { batchStartPointer = consumer.nextPointer(); } catch (Throwable e) { // TODO better error handling - logger.error("Error in polling the shard {}", consumer.getShardId(), e); + logger.error("Error in polling the shard {}: {}", consumer.getShardId(), e); } } } @@ -210,10 +244,6 @@ public void close() { logger.info("consumer thread not started"); return; } - if (consumerThread.isShutdown()) { - logger.info("consumer thread already closed"); - return; - } long startTime = System.currentTimeMillis(); // Record the start time long timeout = 5000; while (state != State.CLOSED) { @@ -225,10 +255,13 @@ public void close() { try { Thread.sleep(100); } catch (Throwable e) { - logger.error("Error in closing the poller of shard {}", consumer.getShardId(), e); + logger.error("Error in closing the poller of shard {}: {}", consumer.getShardId(), e); } } + blockingQueue.clear(); consumerThread.shutdown(); + // interrupts the processor + processorThread.shutdownNow(); logger.info("closed the poller of shard {}", consumer.getShardId()); } diff --git a/server/src/main/java/org/opensearch/indices/ingest/MessageProcessor.java b/server/src/main/java/org/opensearch/indices/ingest/MessageProcessor.java deleted file mode 100644 index 613f12b49a1ac..0000000000000 --- a/server/src/main/java/org/opensearch/indices/ingest/MessageProcessor.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.indices.ingest; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.lucene.document.StoredField; -import org.apache.lucene.index.Term; -import org.opensearch.common.lucene.uid.Versions; -import org.opensearch.core.common.bytes.BytesArray; -import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.xcontent.MediaTypeRegistry; -import org.opensearch.index.IngestionShardPointer; -import org.opensearch.index.Message; -import org.opensearch.index.VersionType; -import org.opensearch.index.engine.Engine; -import org.opensearch.index.engine.IngestionEngine; -import org.opensearch.index.mapper.ParseContext; -import org.opensearch.index.mapper.ParsedDocument; -import org.opensearch.index.mapper.SourceToParse; - -import java.io.IOException; - -import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; - -/** - * A class to process messages from the ingestion stream. It extracts the payload from the message and creates an - * engine operation. - */ -public class MessageProcessor { - private static final Logger logger = LogManager.getLogger(MessageProcessor.class); - - private final IngestionEngine engine; - - /** - * Constructor. - * - * @param engine the ingestion engine - */ - public MessageProcessor(IngestionEngine engine) { - this.engine = engine; - } - - /** - * Process the message and create an engine operation. It also records the offset in the document as (1) a point - * field used for range search, (2) a stored field for retrieval. - * - * @param message the message to process - * @param pointer the pointer to the message - */ - public void process(Message message, IngestionShardPointer pointer) { - byte[] payload = (byte[]) message.getPayload(); - - Engine.Operation operation = getOperation(payload, pointer); - try { - switch (operation.operationType()) { - case INDEX: - engine.index((Engine.Index) operation); - break; - case DELETE: - engine.delete((Engine.Delete) operation); - break; - default: - throw new IllegalArgumentException("Invalid operation: " + operation); - } - } catch (IOException e) { - logger.error("Failed to process operation {} from message {}", operation, message, e); - throw new RuntimeException(e); - } - } - - /** - * Visible for testing. Get the engine operation from the message. - * @param payload the payload of the message - * @param pointer the pointer to the message - * @return the engine operation - */ - protected Engine.Operation getOperation(byte[] payload, IngestionShardPointer pointer) { - // TODO: get id from the message - String id = "null"; - BytesReference source = new BytesArray(payload); - SourceToParse sourceToParse = new SourceToParse("index", id, source, MediaTypeRegistry.xContentType(source), null); - ParsedDocument doc = engine.getDocumentMapperForType().getDocumentMapper().parse(sourceToParse); - for (ParseContext.Document document : doc.docs()) { - // set the offset as the offset field - document.add(pointer.asPointField(IngestionShardPointer.OFFSET_FIELD)); - // store the offset as string in stored field - document.add(new StoredField(IngestionShardPointer.OFFSET_FIELD, pointer.asString())); - } - // TODO: support delete - Engine.Index index = new Engine.Index( - new Term("_id", id), - doc, - 0, - 1, - Versions.MATCH_ANY, - VersionType.INTERNAL, - Engine.Operation.Origin.PRIMARY, - System.nanoTime(), - System.currentTimeMillis(), - false, - UNASSIGNED_SEQ_NO, - 0 - ); - - return index; - } -} diff --git a/server/src/main/java/org/opensearch/indices/ingest/MessageProcessorRunnable.java b/server/src/main/java/org/opensearch/indices/ingest/MessageProcessorRunnable.java new file mode 100644 index 0000000000000..70d5e357a61e9 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/ingest/MessageProcessorRunnable.java @@ -0,0 +1,167 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.ingest; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.document.StoredField; +import org.apache.lucene.index.Term; +import org.opensearch.common.lucene.uid.Versions; +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.xcontent.MediaTypeRegistry; +import org.opensearch.index.IngestionShardConsumer; +import org.opensearch.index.IngestionShardPointer; +import org.opensearch.index.Message; +import org.opensearch.index.VersionType; +import org.opensearch.index.engine.Engine; +import org.opensearch.index.engine.IngestionEngine; +import org.opensearch.index.mapper.ParseContext; +import org.opensearch.index.mapper.ParsedDocument; +import org.opensearch.index.mapper.SourceToParse; + +import java.io.IOException; +import java.util.Objects; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; + +/** + * A class to process messages from the ingestion stream. It extracts the payload from the message and creates an + * engine operation. + */ +public class MessageProcessorRunnable implements Runnable { + private static final Logger logger = LogManager.getLogger(MessageProcessorRunnable.class); + + private final BlockingQueue> blockingQueue; + private final MessageProcessor messageProcessor; + + /** + * Constructor. + * + * @param blockingQueue the blocking queue to poll messages from + * @param engine the ingestion engine + */ + public MessageProcessorRunnable( + BlockingQueue> blockingQueue, + IngestionEngine engine + ) { + this(blockingQueue, new MessageProcessor(engine)); + } + + /** + * Constructor visible for testing. + * @param blockingQueue the blocking queue to poll messages from + * @param messageProcessor the message processor + */ + MessageProcessorRunnable( + BlockingQueue> blockingQueue, + MessageProcessor messageProcessor + ) { + this.blockingQueue = Objects.requireNonNull(blockingQueue); + this.messageProcessor = messageProcessor; + } + + static class MessageProcessor { + private final IngestionEngine engine; + + MessageProcessor(IngestionEngine engine) { + this.engine = engine; + } + + /** + * Visible for testing. Process the message and create an engine operation. + * + * Process the message and create an engine operation. It also records the offset in the document as (1) a point + * field used for range search, (2) a stored field for retrieval. + * + * @param message the message to process + * @param pointer the pointer to the message + */ + protected void process(Message message, IngestionShardPointer pointer) { + byte[] payload = (byte[]) message.getPayload(); + + Engine.Operation operation = getOperation(payload, pointer); + try { + switch (operation.operationType()) { + case INDEX: + engine.index((Engine.Index) operation); + break; + case DELETE: + engine.delete((Engine.Delete) operation); + break; + default: + throw new IllegalArgumentException("Invalid operation: " + operation); + } + } catch (IOException e) { + logger.error("Failed to process operation {} from message {}: {}", operation, message, e); + throw new RuntimeException(e); + } + } + + /** + * Visible for testing. Get the engine operation from the message. + * @param payload the payload of the message + * @param pointer the pointer to the message + * @return the engine operation + */ + protected Engine.Operation getOperation(byte[] payload, IngestionShardPointer pointer) { + // TODO: get id from the message + String id = "null"; + BytesReference source = new BytesArray(payload); + SourceToParse sourceToParse = new SourceToParse("index", id, source, MediaTypeRegistry.xContentType(source), null); + ParsedDocument doc = engine.getDocumentMapperForType().getDocumentMapper().parse(sourceToParse); + for (ParseContext.Document document : doc.docs()) { + // set the offset as the offset field + document.add(pointer.asPointField(IngestionShardPointer.OFFSET_FIELD)); + // store the offset as string in stored field + document.add(new StoredField(IngestionShardPointer.OFFSET_FIELD, pointer.asString())); + } + // TODO: support delete + Engine.Index index = new Engine.Index( + new Term("_id", id), + doc, + 0, + 1, + Versions.MATCH_ANY, + VersionType.INTERNAL, + Engine.Operation.Origin.PRIMARY, + System.nanoTime(), + System.currentTimeMillis(), + false, + UNASSIGNED_SEQ_NO, + 0 + ); + + return index; + } + } + + BlockingQueue> getBlockingQueue() { + return blockingQueue; + } + + @Override + public void run() { + while (!(Thread.currentThread().isInterrupted())) { + IngestionShardConsumer.ReadResult result = null; + try { + result = blockingQueue.poll(1000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + // TODO: add metric + logger.debug("ConcurrentSiaStreamsPoller poll interruptedException", e); + Thread.currentThread().interrupt(); // Restore interrupt status + } + if (result != null) { + messageProcessor.process(result.getMessage(), result.getPointer()); + } + } + } +} diff --git a/server/src/test/java/org/opensearch/index/engine/FakeIngestionSource.java b/server/src/test/java/org/opensearch/index/engine/FakeIngestionSource.java index 83478d67d020a..de03dcd313c29 100644 --- a/server/src/test/java/org/opensearch/index/engine/FakeIngestionSource.java +++ b/server/src/test/java/org/opensearch/index/engine/FakeIngestionSource.java @@ -18,6 +18,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -117,7 +118,7 @@ public byte[] getPayload() { @Override public String toString() { - return new String(payload); + return new String(payload, StandardCharsets.UTF_8); } } diff --git a/server/src/test/java/org/opensearch/index/engine/IngestionEngineTests.java b/server/src/test/java/org/opensearch/index/engine/IngestionEngineTests.java index a273cc30f3fb0..e2d901478d197 100644 --- a/server/src/test/java/org/opensearch/index/engine/IngestionEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/IngestionEngineTests.java @@ -27,6 +27,7 @@ import org.junit.Before; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -59,7 +60,7 @@ public void setUp() throws Exception { } private void publishData(String message) { - messages.add(message.getBytes()); + messages.add(message.getBytes(StandardCharsets.UTF_8)); } protected IndexSettings newIndexSettings() { diff --git a/server/src/test/java/org/opensearch/indices/ingest/DefaultStreamPollerTests.java b/server/src/test/java/org/opensearch/indices/ingest/DefaultStreamPollerTests.java index faa5d7722bf63..40c2f3caecdfb 100644 --- a/server/src/test/java/org/opensearch/indices/ingest/DefaultStreamPollerTests.java +++ b/server/src/test/java/org/opensearch/indices/ingest/DefaultStreamPollerTests.java @@ -8,16 +8,18 @@ package org.opensearch.indices.ingest; -import org.junit.After; -import org.junit.Before; import org.opensearch.index.IngestionShardPointer; import org.opensearch.index.engine.FakeIngestionSource; import org.opensearch.test.OpenSearchTestCase; +import org.junit.After; +import org.junit.Before; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -28,24 +30,27 @@ public class DefaultStreamPollerTests extends OpenSearchTestCase { private DefaultStreamPoller poller; private FakeIngestionSource.FakeIngestionConsumer fakeConsumer; - private MessageProcessor mockProcessor; + private MessageProcessorRunnable processorRunnable; + private MessageProcessorRunnable.MessageProcessor processor; private List messages; private Set persistedPointers; @Before public void setUp() throws Exception { super.setUp(); - messages = new ArrayList<>();; - messages.add("{\"name\":\"bob\", \"age\": 24}".getBytes()); - messages.add("{\"name\":\"alice\", \"age\": 21}".getBytes()); + messages = new ArrayList<>(); + ; + messages.add("{\"name\":\"bob\", \"age\": 24}".getBytes(StandardCharsets.UTF_8)); + messages.add("{\"name\":\"alice\", \"age\": 21}".getBytes(StandardCharsets.UTF_8)); fakeConsumer = new FakeIngestionSource.FakeIngestionConsumer(messages, 0); - mockProcessor = mock(MessageProcessor.class); + processor = mock(MessageProcessorRunnable.MessageProcessor.class); + processorRunnable = new MessageProcessorRunnable(new ArrayBlockingQueue<>(5), processor); persistedPointers = new HashSet<>(); poller = new DefaultStreamPoller( new FakeIngestionSource.FakeIngestionShardPointer(0), persistedPointers, fakeConsumer, - mockProcessor, + processorRunnable, StreamPoller.ResetState.NONE ); } @@ -65,31 +70,31 @@ public void testPauseAndResume() throws InterruptedException { assertEquals(DefaultStreamPoller.State.PAUSED, poller.getState()); assertTrue(poller.isPaused()); // no messages are processed - verify(mockProcessor, never()).process(any(),any()); + verify(processor, never()).process(any(), any()); poller.resume(); Thread.sleep(100); // Allow some time for the poller to run assertFalse(poller.isPaused()); // 2 messages are processed - verify(mockProcessor, times(2)).process(any(),any()); + verify(processor, times(2)).process(any(), any()); } - public void testSkipProcessed() throws InterruptedException { - messages.add("{\"name\":\"cathy\", \"age\": 21}".getBytes()); - messages.add("{\"name\":\"danny\", \"age\": 31}".getBytes()); + public void testSkipProcessed() throws InterruptedException { + messages.add("{\"name\":\"cathy\", \"age\": 21}".getBytes(StandardCharsets.UTF_8)); + messages.add("{\"name\":\"danny\", \"age\": 31}".getBytes(StandardCharsets.UTF_8)); persistedPointers.add(new FakeIngestionSource.FakeIngestionShardPointer(1)); persistedPointers.add(new FakeIngestionSource.FakeIngestionShardPointer(2)); poller = new DefaultStreamPoller( new FakeIngestionSource.FakeIngestionShardPointer(0), persistedPointers, fakeConsumer, - mockProcessor, + processorRunnable, StreamPoller.ResetState.NONE ); poller.start(); Thread.sleep(200); // Allow some time for the poller to run // 2 messages are processed, 2 messages are skipped - verify(mockProcessor, times(2)).process(any(),any()); + verify(processor, times(2)).process(any(), any()); assertEquals(new FakeIngestionSource.FakeIngestionShardPointer(2), poller.getMaxPersistedPointer()); } @@ -106,13 +111,12 @@ public void testClose() throws InterruptedException { assertEquals(DefaultStreamPoller.State.CLOSED, poller.getState()); } - public void testResetStateEarliest() throws InterruptedException { poller = new DefaultStreamPoller( new FakeIngestionSource.FakeIngestionShardPointer(1), persistedPointers, fakeConsumer, - mockProcessor, + processorRunnable, StreamPoller.ResetState.EARLIEST ); @@ -120,7 +124,7 @@ public void testResetStateEarliest() throws InterruptedException { Thread.sleep(100); // Allow some time for the poller to run // 2 messages are processed - verify(mockProcessor, times(2)).process(any(),any()); + verify(processor, times(2)).process(any(), any()); } public void testResetStateLatest() throws InterruptedException { @@ -128,14 +132,14 @@ public void testResetStateLatest() throws InterruptedException { new FakeIngestionSource.FakeIngestionShardPointer(0), persistedPointers, fakeConsumer, - mockProcessor, + processorRunnable, StreamPoller.ResetState.LATEST ); poller.start(); Thread.sleep(100); // Allow some time for the poller to run // no messages processed - verify(mockProcessor, never()).process(any(),any()); + verify(processor, never()).process(any(), any()); // reset to the latest assertEquals(new FakeIngestionSource.FakeIngestionShardPointer(2), poller.getBatchStartPointer()); } diff --git a/server/src/test/java/org/opensearch/indices/ingest/MessageProcessorTests.java b/server/src/test/java/org/opensearch/indices/ingest/MessageProcessorTests.java index beb99564704b8..7dd0a788a56d0 100644 --- a/server/src/test/java/org/opensearch/indices/ingest/MessageProcessorTests.java +++ b/server/src/test/java/org/opensearch/indices/ingest/MessageProcessorTests.java @@ -8,9 +8,6 @@ package org.opensearch.indices.ingest; -import org.junit.Before; -import org.mockito.ArgumentCaptor; -import org.opensearch.index.Message; import org.opensearch.index.engine.Engine; import org.opensearch.index.engine.FakeIngestionSource; import org.opensearch.index.engine.IngestionEngine; @@ -20,9 +17,13 @@ import org.opensearch.index.mapper.ParsedDocument; import org.opensearch.index.mapper.SourceToParse; import org.opensearch.test.OpenSearchTestCase; +import org.junit.Before; +import java.nio.charset.StandardCharsets; import java.util.List; +import org.mockito.ArgumentCaptor; + import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -32,7 +33,7 @@ public class MessageProcessorTests extends OpenSearchTestCase { private IngestionEngine ingestionEngine; private DocumentMapper documentMapper; private DocumentMapperForType documentMapperForType; - private MessageProcessor processor; + private MessageProcessorRunnable.MessageProcessor processor; @Before public void setUp() throws Exception { @@ -42,16 +43,16 @@ public void setUp() throws Exception { when(ingestionEngine.getDocumentMapperForType()).thenReturn(documentMapperForType); documentMapper = mock(DocumentMapper.class); when(documentMapperForType.getDocumentMapper()).thenReturn(documentMapper); - processor = new MessageProcessor(ingestionEngine); + processor = new MessageProcessorRunnable.MessageProcessor(ingestionEngine); } public void testGetOperation() { - byte[] payload = "{\"name\":\"bob\", \"age\": 24}".getBytes(); + byte[] payload = "{\"name\":\"bob\", \"age\": 24}".getBytes(StandardCharsets.UTF_8); FakeIngestionSource.FakeIngestionShardPointer pointer = new FakeIngestionSource.FakeIngestionShardPointer(0); ParsedDocument parsedDocument = mock(ParsedDocument.class); when(documentMapper.parse(any())).thenReturn(parsedDocument); - when(parsedDocument.docs()).thenReturn(List.of(new ParseContext.Document[]{new ParseContext.Document()})); + when(parsedDocument.docs()).thenReturn(List.of(new ParseContext.Document[] { new ParseContext.Document() })); Engine.Operation operation = processor.getOperation(payload, pointer);