Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Introduced internal API for retrieving metadata about requested indices from transport actions ([#18523](https://github.com/opensearch-project/OpenSearch/pull/18523))
- Add cluster defaults for merge autoThrottle, maxMergeThreads, and maxMergeCount; Add segment size filter to the merged segment warmer ([#19629](https://github.com/opensearch-project/OpenSearch/pull/19629))
- Add SMILE/CBOR/YAML document format support to Bulk GRPC endpoint ([#19744](https://github.com/opensearch-project/OpenSearch/pull/19744))
- Support pull-based ingestion message mappers and raw payload support ([#19765](https://github.com/opensearch-project/OpenSearch/pull/19765)]

### Changed
- Faster `terms` query creation for `keyword` field with index and docValues enabled ([#19350](https://github.com/opensearch-project/OpenSearch/pull/19350))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -653,4 +653,122 @@ private boolean validateOffsetBasedLagForPrimaryAndReplica(long expectedLag) {
&& shardTypeToStats.get("replica").getConsumerStats().pointerBasedLag() == expectedLag;
return valid;
}

public void testRawPayloadMapperIngestion() throws Exception {
// Start cluster
internalCluster().startClusterManagerOnlyNode();
final String nodeA = internalCluster().startDataOnlyNode();

// Publish 2 valid messages
String validMessage1 = "{\"name\":\"alice\",\"age\":30}";
String validMessage2 = "{\"name\":\"bob\",\"age\":25}";
produceData(validMessage1);
produceData(validMessage2);

// Create index with raw_payload mapper
createIndex(
indexName,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("ingestion_source.type", "kafka")
.put("ingestion_source.param.topic", topicName)
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
.put("ingestion_source.pointer.init.reset", "earliest")
.put("ingestion_source.mapper_type", "raw_payload")
.put("ingestion_source.error_strategy", "drop")
.put("ingestion_source.all_active", true)
.build(),
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
);

ensureGreen(indexName);

// Wait for both messages to be indexed
waitForSearchableDocs(2, List.of(nodeA));

// Verify stats show 2 processed messages
waitForState(() -> {
PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
.getPollingIngestStats();
return stats != null
&& stats.getMessageProcessorStats().totalProcessedCount() == 2L
&& stats.getConsumerStats().totalPolledCount() == 2L
&& stats.getConsumerStats().totalPollerMessageFailureCount() == 0L
&& stats.getConsumerStats().totalPollerMessageDroppedCount() == 0L
&& stats.getMessageProcessorStats().totalInvalidMessageCount() == 0L;
});

// Validate document content
SearchResponse searchResponse = client().prepareSearch(indexName).get();
assertEquals(2, searchResponse.getHits().getHits().length);
for (int i = 0; i < searchResponse.getHits().getHits().length; i++) {
Map<String, Object> source = searchResponse.getHits().getHits()[i].getSourceAsMap();
assertTrue(source.containsKey("name"));
assertTrue(source.containsKey("age"));
}

// Publish invalid JSON message
String invalidJsonMessage = "{ invalid json";
produceData(invalidJsonMessage);

// Wait for consumer to encounter the error and drop it
waitForState(() -> {
PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
.getPollingIngestStats();
return stats != null
&& stats.getConsumerStats().totalPolledCount() == 3L
&& stats.getConsumerStats().totalPollerMessageFailureCount() == 1L
&& stats.getConsumerStats().totalPollerMessageDroppedCount() == 1L
&& stats.getMessageProcessorStats().totalProcessedCount() == 2L;
});

// Publish message with invalid content that will fail at processor level
String invalidFieldTypeMessage = "{\"name\":123,\"age\":\"not a number\"}";
produceData(invalidFieldTypeMessage);

// Wait for processor to encounter the error
waitForState(() -> {
PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
.getPollingIngestStats();
return stats != null
&& stats.getConsumerStats().totalPolledCount() == 4L
&& stats.getConsumerStats().totalPollerMessageFailureCount() == 1L
&& stats.getMessageProcessorStats().totalProcessedCount() == 3L
&& stats.getMessageProcessorStats().totalFailedCount() == 1L
&& stats.getMessageProcessorStats().totalFailuresDroppedCount() == 1L;
});

// Pause ingestion, reset to offset 0, and resume
pauseIngestion(indexName);
waitForState(() -> {
GetIngestionStateResponse ingestionState = getIngestionState(indexName);
return ingestionState.getShardStates().length == 1
&& ingestionState.getFailedShards() == 0
&& ingestionState.getShardStates()[0].isPollerPaused()
&& ingestionState.getShardStates()[0].getPollerState().equalsIgnoreCase("paused");
});

// Resume with reset to offset 0 (will re-process the 2 valid messages)
resumeIngestion(indexName, 0, ResumeIngestionRequest.ResetSettings.ResetMode.OFFSET, "0");
waitForState(() -> {
GetIngestionStateResponse ingestionState = getIngestionState(indexName);
return ingestionState.getShardStates().length == 1
&& ingestionState.getShardStates()[0].isPollerPaused() == false
&& (ingestionState.getShardStates()[0].getPollerState().equalsIgnoreCase("polling")
|| ingestionState.getShardStates()[0].getPollerState().equalsIgnoreCase("processing"));
});

// Wait for the 3 messages to be processed by the processor after reset (1 will be dropped by the poller)
waitForState(() -> {
PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
.getPollingIngestStats();
return stats != null && stats.getMessageProcessorStats().totalProcessedCount() == 3L;
});

// Verify still only 2 documents (no duplicates must be indexed)
RangeQueryBuilder query = new RangeQueryBuilder("age").gte(0);
SearchResponse response = client().prepareSearch(indexName).setQuery(query).get();
assertThat(response.getHits().getTotalHits().value(), is(2L));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.indices.pollingingest.IngestionErrorStrategy;
import org.opensearch.indices.pollingingest.StreamPoller;
import org.opensearch.indices.pollingingest.mappers.IngestionMessageMapper;
import org.opensearch.indices.replication.SegmentReplicationSource;
import org.opensearch.indices.replication.common.ReplicationType;

Expand Down Expand Up @@ -923,6 +924,18 @@ public Iterator<Setting<?>> settings() {
Property.Final
);

/**
* Defines how the incoming ingestion message payload is mapped to the internal message format.
*/
public static final String SETTING_INGESTION_SOURCE_MAPPER_TYPE = "index.ingestion_source.mapper_type";
public static final Setting<IngestionMessageMapper.MapperType> INGESTION_SOURCE_MAPPER_TYPE_SETTING = new Setting<>(
SETTING_INGESTION_SOURCE_MAPPER_TYPE,
IngestionMessageMapper.MapperType.DEFAULT.getName(),
IngestionMessageMapper.MapperType::fromString,
Property.IndexScope,
Property.Final
);

/**
* Defines if all-active pull-based ingestion is enabled. In this mode, replicas will directly consume from the
* streaming source and process the updates. In the default document replication mode, this setting must be enabled.
Expand Down Expand Up @@ -1227,6 +1240,7 @@ public IngestionSource getIngestionSource() {
final int blockingQueueSize = INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING.get(settings);
final boolean allActiveIngestionEnabled = INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING.get(settings);
final TimeValue pointerBasedLagUpdateInterval = INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL_SETTING.get(settings);
final IngestionMessageMapper.MapperType mapperType = INGESTION_SOURCE_MAPPER_TYPE_SETTING.get(settings);

return new IngestionSource.Builder(ingestionSourceType).setParams(ingestionSourceParams)
.setPointerInitReset(pointerInitReset)
Expand All @@ -1237,6 +1251,7 @@ public IngestionSource getIngestionSource() {
.setBlockingQueueSize(blockingQueueSize)
.setAllActiveIngestion(allActiveIngestionEnabled)
.setPointerBasedLagUpdateInterval(pointerBasedLagUpdateInterval)
.setMapperType(mapperType)
.build();
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.indices.pollingingest.IngestionErrorStrategy;
import org.opensearch.indices.pollingingest.StreamPoller;
import org.opensearch.indices.pollingingest.mappers.IngestionMessageMapper;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING;
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING;
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_MAPPER_TYPE_SETTING;
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_MAX_POLL_SIZE;
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING;
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL_SETTING;
Expand All @@ -40,6 +42,7 @@ public class IngestionSource {
private int blockingQueueSize;
private final boolean allActiveIngestion;
private final TimeValue pointerBasedLagUpdateInterval;
private final IngestionMessageMapper.MapperType mapperType;

private IngestionSource(
String type,
Expand All @@ -51,7 +54,8 @@ private IngestionSource(
int numProcessorThreads,
int blockingQueueSize,
boolean allActiveIngestion,
TimeValue pointerBasedLagUpdateInterval
TimeValue pointerBasedLagUpdateInterval,
IngestionMessageMapper.MapperType mapperType
) {
this.type = type;
this.pointerInitReset = pointerInitReset;
Expand All @@ -63,6 +67,7 @@ private IngestionSource(
this.blockingQueueSize = blockingQueueSize;
this.allActiveIngestion = allActiveIngestion;
this.pointerBasedLagUpdateInterval = pointerBasedLagUpdateInterval;
this.mapperType = mapperType;
}

public String getType() {
Expand Down Expand Up @@ -105,6 +110,10 @@ public TimeValue getPointerBasedLagUpdateInterval() {
return pointerBasedLagUpdateInterval;
}

public IngestionMessageMapper.MapperType getMapperType() {
return mapperType;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -119,7 +128,8 @@ public boolean equals(Object o) {
&& Objects.equals(numProcessorThreads, ingestionSource.numProcessorThreads)
&& Objects.equals(blockingQueueSize, ingestionSource.blockingQueueSize)
&& Objects.equals(allActiveIngestion, ingestionSource.allActiveIngestion)
&& Objects.equals(pointerBasedLagUpdateInterval, ingestionSource.pointerBasedLagUpdateInterval);
&& Objects.equals(pointerBasedLagUpdateInterval, ingestionSource.pointerBasedLagUpdateInterval)
&& Objects.equals(mapperType, ingestionSource.mapperType);
}

@Override
Expand All @@ -134,7 +144,8 @@ public int hashCode() {
numProcessorThreads,
blockingQueueSize,
allActiveIngestion,
pointerBasedLagUpdateInterval
pointerBasedLagUpdateInterval,
mapperType
);
}

Expand Down Expand Up @@ -164,6 +175,9 @@ public String toString() {
+ allActiveIngestion
+ ", pointerBasedLagUpdateInterval="
+ pointerBasedLagUpdateInterval
+ ", mapperType='"
+ mapperType
+ '\''
+ '}';
}

Expand Down Expand Up @@ -225,6 +239,7 @@ public static class Builder {
private TimeValue pointerBasedLagUpdateInterval = INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL_SETTING.getDefault(
Settings.EMPTY
);
private IngestionMessageMapper.MapperType mapperType = INGESTION_SOURCE_MAPPER_TYPE_SETTING.getDefault(Settings.EMPTY);

public Builder(String type) {
this.type = type;
Expand All @@ -239,6 +254,7 @@ public Builder(IngestionSource ingestionSource) {
this.blockingQueueSize = ingestionSource.blockingQueueSize;
this.allActiveIngestion = ingestionSource.allActiveIngestion;
this.pointerBasedLagUpdateInterval = ingestionSource.pointerBasedLagUpdateInterval;
this.mapperType = ingestionSource.mapperType;
}

public Builder setPointerInitReset(PointerInitReset pointerInitReset) {
Expand Down Expand Up @@ -291,6 +307,11 @@ public Builder setPointerBasedLagUpdateInterval(TimeValue pointerBasedLagUpdateI
return this;
}

public Builder setMapperType(IngestionMessageMapper.MapperType mapperType) {
this.mapperType = mapperType;
return this;
}

public IngestionSource build() {
return new IngestionSource(
type,
Expand All @@ -302,7 +323,8 @@ public IngestionSource build() {
numProcessorThreads,
blockingQueueSize,
allActiveIngestion,
pointerBasedLagUpdateInterval
pointerBasedLagUpdateInterval,
mapperType
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexMetadata.INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING,
IndexMetadata.INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING,
IndexMetadata.INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL_SETTING,
IndexMetadata.INGESTION_SOURCE_MAPPER_TYPE_SETTING,

// Settings for search replica
IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ private void initializeStreamPoller(
.numProcessorThreads(ingestionSource.getNumProcessorThreads())
.blockingQueueSize(ingestionSource.getBlockingQueueSize())
.pointerBasedLagUpdateInterval(ingestionSource.getPointerBasedLagUpdateInterval().millis())
.mapperType(ingestionSource.getMapperType())
.build();
registerStreamPollerListener();

Expand Down
Loading
Loading