2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -10,8 +10,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add a mapper for context aware segments grouping criteria ([#19233](https://github.com/opensearch-project/OpenSearch/pull/19233))
- Return full error for GRPC error response ([#19568](https://github.com/opensearch-project/OpenSearch/pull/19568))
- Add pluggable gRPC interceptors with explicit ordering([#19005](https://github.com/opensearch-project/OpenSearch/pull/19005))

- Add metrics for the merged segment warmer feature ([#18929](https://github.com/opensearch-project/OpenSearch/pull/18929))
- Add pointer based lag metric in pull-based ingestion ([#19635](https://github.com/opensearch-project/OpenSearch/pull/19635))

### Changed
- Faster `terms` query creation for `keyword` field with index and docValues enabled ([#19350](https://github.com/opensearch-project/OpenSearch/pull/19350))
@@ -62,7 +62,6 @@ public FilePartitionConsumer(FileSourceConfig config, int shardId) {
public List<ReadResult<FileOffset, FileMessage>> readNext(FileOffset offset, boolean includeStart, long maxMessages, int timeoutMillis)
throws TimeoutException {
long startLine = includeStart ? offset.getLine() : offset.getLine() + 1;
lastReadLine = startLine;
return readFromFile(startLine, maxMessages);
}

@@ -155,6 +154,22 @@ public int getShardId() {
return shardId;
}

@Override
public long getPointerBasedLag(IngestionShardPointer expectedStartPointer) {
if (!shardFile.exists()) {
return 0;
}

FileOffset latestOffset = (FileOffset) latestPointer();
if (lastReadLine < 0) {
// Haven't read anything yet, use the expected start pointer
long startLine = ((FileOffset) expectedStartPointer).getLine();
return Math.max(0, latestOffset.getLine() - startLine);
}
// return lag as number of remaining lines after lastReadLine
return latestOffset.getLine() - lastReadLine - 1;
}
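// Worked example (illustration only, not part of this change), assuming latestPointer()
// reports the next line to be written: for a 10-line shard file latestPointer() is line 10,
// so with lastReadLine = 6 the lag is 10 - 6 - 1 = 3 (lines 7, 8 and 9 are still unread);
// before the first read, with an expected start pointer of line 0, lag = max(0, 10 - 0) = 10.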

@Override
public void close() throws IOException {
if (reader != null) {
@@ -9,6 +9,8 @@
package org.opensearch.plugin.ingestion.fs;

import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.stats.IndexStats;
import org.opensearch.action.admin.indices.stats.ShardStats;
import org.opensearch.action.admin.indices.streamingingestion.pause.PauseIngestionResponse;
import org.opensearch.action.admin.indices.streamingingestion.resume.ResumeIngestionRequest;
import org.opensearch.action.admin.indices.streamingingestion.resume.ResumeIngestionResponse;
@@ -17,6 +19,7 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.query.RangeQueryBuilder;
import org.opensearch.indices.pollingingest.PollingIngestStats;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchSingleNodeTestCase;
import org.opensearch.transport.client.Requests;
@@ -31,6 +34,8 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

public class FileBasedIngestionSingleNodeTests extends OpenSearchSingleNodeTestCase {
private Path ingestionDir;
@@ -237,4 +242,167 @@ public void testFileIngestionFromProvidedPointer() throws Exception {
// cleanup the test index
client().admin().indices().delete(new DeleteIndexRequest(index)).actionGet();
}

public void testPointerBasedLag() throws Exception {
String mappings = """
{
"properties": {
"name": { "type": "text" },
"age": { "type": "integer" }
}
}
""";

// Create index with empty file (no messages)
Path streamDir = ingestionDir.resolve(stream);
Path shardFile = streamDir.resolve("0.ndjson");
Files.write(shardFile, new byte[0]); // Empty file

createIndexWithMappingSource(
index,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("ingestion_source.type", "FILE")
.put("ingestion_source.pointer.init.reset", "earliest")
.put("ingestion_source.pointer_based_lag_update_interval", "3s")
.put("ingestion_source.param.stream", stream)
.put("ingestion_source.param.base_directory", ingestionDir.toString())
.put("index.replication.type", "SEGMENT")
.build(),
mappings
);
ensureGreen(index);
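// Note: ingestion_source.pointer_based_lag_update_interval is set to 3s above, so pointer based
// lag is recomputed roughly every 3 seconds and the waitForState checks below may take a few
// seconds to observe an updated value.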

// Lag should be 0 since there are no messages
waitForState(() -> {
PollingIngestStats stats = getPollingIngestStats(index);
return stats != null && stats.getConsumerStats().pointerBasedLag() == 0L;
});

// Add messages to the file
try (
BufferedWriter writer = Files.newBufferedWriter(
shardFile,
StandardCharsets.UTF_8,
StandardOpenOption.WRITE,
StandardOpenOption.TRUNCATE_EXISTING
)
) {
writer.write("{\"_id\":\"1\",\"_version\":\"1\",\"_op_type\":\"index\",\"_source\":{\"name\":\"alice\", \"age\": 30}}\n");
writer.write("{\"_id\":\"2\",\"_version\":\"1\",\"_op_type\":\"index\",\"_source\":{\"name\":\"bob\", \"age\": 35}}\n");
writer.flush();
}

try (FileChannel channel = FileChannel.open(shardFile, StandardOpenOption.READ)) {
channel.force(true);
}

// Wait for messages to be processed
waitForState(() -> {
SearchResponse response = client().prepareSearch(index).setQuery(new RangeQueryBuilder("age").gte(0)).get();
return response.getHits().getTotalHits().value() == 2;
});

// Lag should be 0 after all messages are consumed
waitForState(() -> {
PollingIngestStats stats = getPollingIngestStats(index);
return stats != null && stats.getConsumerStats().pointerBasedLag() == 0L;
});

// cleanup
client().admin().indices().delete(new DeleteIndexRequest(index)).actionGet();
}

public void testPointerBasedLagAfterPause() throws Exception {
String mappings = """
{
"properties": {
"name": { "type": "text" },
"age": { "type": "integer" }
}
}
""";

createIndexWithMappingSource(
index,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("ingestion_source.type", "FILE")
.put("ingestion_source.pointer.init.reset", "earliest")
.put("ingestion_source.pointer_based_lag_update_interval", "3s")
.put("ingestion_source.param.stream", stream)
.put("ingestion_source.param.base_directory", ingestionDir.toString())
.put("index.replication.type", "SEGMENT")
.build(),
mappings
);
ensureGreen(index);

// Wait for initial messages to be processed
waitForState(() -> {
SearchResponse response = client().prepareSearch(index).setQuery(new RangeQueryBuilder("age").gte(0)).get();
return response.getHits().getTotalHits().value() == 2;
});

// Pause ingestion
PauseIngestionResponse pauseResponse = client().admin().indices().pauseIngestion(Requests.pauseIngestionRequest(index)).get();
assertTrue(pauseResponse.isAcknowledged());
assertTrue(pauseResponse.isShardsAcknowledged());

// Wait for pause to take effect
waitForState(() -> {
GetIngestionStateResponse ingestionState = client().admin()
.indices()
.getIngestionState(Requests.getIngestionStateRequest(index))
.get();
return ingestionState.getFailedShards() == 0
&& Arrays.stream(ingestionState.getShardStates())
.allMatch(state -> state.isPollerPaused() && state.getPollerState().equalsIgnoreCase("paused"));
});

// Add more messages to the file while paused
Path streamDir = ingestionDir.resolve(stream);
Path shardFile = streamDir.resolve("0.ndjson");
try (BufferedWriter writer = Files.newBufferedWriter(shardFile, StandardCharsets.UTF_8, StandardOpenOption.APPEND)) {
writer.write("{\"_id\":\"3\",\"_version\":\"1\",\"_op_type\":\"index\",\"_source\":{\"name\":\"charlie\", \"age\": 40}}\n");
writer.write("{\"_id\":\"4\",\"_version\":\"1\",\"_op_type\":\"index\",\"_source\":{\"name\":\"diana\", \"age\": 45}}\n");
writer.write("{\"_id\":\"5\",\"_version\":\"1\",\"_op_type\":\"index\",\"_source\":{\"name\":\"eve\", \"age\": 50}}\n");
writer.flush();
}

try (FileChannel channel = FileChannel.open(shardFile, StandardOpenOption.READ)) {
channel.force(true);
}

// Wait for lag to be calculated (lag is updated every 3 seconds in this test);
// 3 documents were appended while paused, so the expected lag is 3
waitForState(() -> {
PollingIngestStats stats = getPollingIngestStats(index);
return stats != null && stats.getConsumerStats().pointerBasedLag() == 3L;
});

// cleanup
client().admin().indices().delete(new DeleteIndexRequest(index)).actionGet();
}

/**
* Helper method to get polling ingest stats for the index
*/
private PollingIngestStats getPollingIngestStats(String indexName) {
IndexStats indexStats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName);
ShardStats[] shards = indexStats.getShards();
if (shards.length > 0) {
return shards[0].getPollingIngestStats();
}
return null;
}

private void waitForState(Callable<Boolean> checkState) throws Exception {
assertBusy(() -> {
if (checkState.call() == false) {
fail("Provided state requirements not met");
}
}, 1, TimeUnit.MINUTES);
}
}
@@ -567,6 +567,66 @@ public void testResetPollerInAllActiveIngestion() throws Exception {
});
}

public void testAllActiveOffsetBasedLag() throws Exception {
// Create all-active pull-based index
internalCluster().startClusterManagerOnlyNode();
final String nodeA = internalCluster().startDataOnlyNode();
final String nodeB = internalCluster().startDataOnlyNode();

createIndex(
indexName,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put("ingestion_source.type", "kafka")
.put("ingestion_source.param.topic", topicName)
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
.put("ingestion_source.pointer.init.reset", "earliest")
.put("ingestion_source.pointer_based_lag_update_interval", "3s")
.put("ingestion_source.all_active", true)
.build(),
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
);

ensureGreen(indexName);
// no messages published, expect 0 lag
assertTrue(validateOffsetBasedLagForPrimaryAndReplica(0));

// pause ingestion
PauseIngestionResponse pauseResponse = pauseIngestion(indexName);
assertTrue(pauseResponse.isAcknowledged());
assertTrue(pauseResponse.isShardsAcknowledged());
waitForState(() -> {
GetIngestionStateResponse ingestionState = getIngestionState(indexName);
return ingestionState.getShardStates().length == 2
&& ingestionState.getFailedShards() == 0
&& Arrays.stream(ingestionState.getShardStates())
.allMatch(state -> state.isPollerPaused() && state.getPollerState().equalsIgnoreCase("paused"));
});

// produce 10 messages in paused state and validate lag
for (int i = 0; i < 10; i++) {
produceData(Integer.toString(i), "name" + i, "30");
}
waitForState(() -> validateOffsetBasedLagForPrimaryAndReplica(10));

// resume ingestion
ResumeIngestionResponse resumeResponse = resumeIngestion(indexName);
assertTrue(resumeResponse.isAcknowledged());
assertTrue(resumeResponse.isShardsAcknowledged());
waitForState(() -> {
GetIngestionStateResponse ingestionState = getIngestionState(indexName);
return ingestionState.getShardStates().length == 2
&& Arrays.stream(ingestionState.getShardStates())
.allMatch(
state -> state.isPollerPaused() == false
&& (state.getPollerState().equalsIgnoreCase("polling") || state.getPollerState().equalsIgnoreCase("processing"))
);
});
waitForSearchableDocs(10, List.of(nodeA, nodeB));
waitForState(() -> validateOffsetBasedLagForPrimaryAndReplica(0));
}

// returns PollingIngestStats for single primary and single replica
private Map<String, PollingIngestStats> getPollingIngestStatsForPrimaryAndReplica(String indexName) {
IndexStats indexStats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName);
@@ -583,4 +643,14 @@ private Map<String, PollingIngestStats> getPollingIngestStatsForPrimaryAndReplic

return shardTypeToStats;
}

private boolean validateOffsetBasedLagForPrimaryAndReplica(long expectedLag) {
boolean valid = true;
Map<String, PollingIngestStats> shardTypeToStats = getPollingIngestStatsForPrimaryAndReplica(indexName);
valid &= shardTypeToStats.get("primary") != null
&& shardTypeToStats.get("primary").getConsumerStats().pointerBasedLag() == expectedLag;
valid &= shardTypeToStats.get("replica") != null
&& shardTypeToStats.get("replica").getConsumerStats().pointerBasedLag() == expectedLag;
return valid;
}
}
@@ -138,7 +138,7 @@ protected static Consumer<byte[], byte[]> createConsumer(String clientId, KafkaS
* @param includeStart whether to include the start pointer in the read
* @param maxMessages this setting is not honored for Kafka at this stage. maxMessages is instead set at consumer initialization.
* @param timeoutMillis the maximum time to wait for messages
* @return
* @return the next read result
* @throws TimeoutException
*/
@Override
@@ -158,7 +158,7 @@ public List<ReadResult<KafkaOffset, KafkaMessage>> readNext(
* Read the next batch of messages from Kafka.
* @param maxMessages this setting is not honored for Kafka at this stage. maxMessages is instead set at consumer initialization.
* @param timeoutMillis the maximum time to wait for messages
* @return
* @return the next read result
* @throws TimeoutException
*/
@Override
@@ -255,6 +255,35 @@ public int getShardId() {
return topicPartition.partition();
}

/**
* Compute Kafka offset based lag as the difference between latest available offset and last consumed offset.
* Note: This method is not thread-safe and should only be called from the poller thread to avoid multi-threaded
* access to KafkaConsumer.
*
* @param expectedStartPointer the pointer where ingestion would start if no messages have been consumed yet
* @return offset based lag. -1 is returned if errors are encountered.
*/
@Override
public long getPointerBasedLag(IngestionShardPointer expectedStartPointer) {
try {
// Get the end offset for the partition
long endOffset = consumer.endOffsets(Collections.singletonList(topicPartition)).getOrDefault(topicPartition, 0L);

if (lastFetchedOffset < 0) {
// Haven't fetched anything yet, use the expected start pointer.
// Set lag as 0 in case expectedStartPointer is beyond endOffset.
long startOffset = ((KafkaOffset) expectedStartPointer).getOffset();
return Math.max(0, endOffset - startOffset);
}

// Calculate lag as difference between latest and last consumed offset
return endOffset - lastFetchedOffset - 1;
} catch (Exception e) {
logger.warn("Failed to calculate pointer based lag for partition {}: {}", topicPartition.partition(), e.getMessage());
return -1;
}
}
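// Worked example (illustration only, not part of this change): endOffsets() reports the next
// offset to be written to the partition, so with an end offset of 10 and lastFetchedOffset = 6
// the lag is 10 - 6 - 1 = 3 pending messages; before the first fetch, with an expected start
// offset of 0, the lag is max(0, 10 - 0) = 10.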

@Override
public void close() throws IOException {
consumer.close();