Skip to content

Commit 36a5c9f

Browse files
include pointer based lag metric for pull-based ingestion
Signed-off-by: Varun Bharadwaj <varunbharadwaj1995@gmail.com>
1 parent 14578f6 commit 36a5c9f

File tree

13 files changed

+429
-7
lines changed

13 files changed

+429
-7
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
99
- Add a mapper for context aware segments grouping criteria ([#19233](https://github.com/opensearch-project/OpenSearch/pull/19233))
1010
- Return full error for GRPC error response ([#19568](https://github.com/opensearch-project/OpenSearch/pull/19568))
1111
- Add pluggable gRPC interceptors with explicit ordering([#19005](https://github.com/opensearch-project/OpenSearch/pull/19005))
12-
1312
- Add metrics for the merged segment warmer feature ([#18929](https://github.com/opensearch-project/OpenSearch/pull/18929))
13+
- Add pointer based lag metric in pull-based ingestion ([#19635](https://github.com/opensearch-project/OpenSearch/pull/19635))
1414

1515
### Changed
1616
- Faster `terms` query creation for `keyword` field with index and docValues enabled ([#19350](https://github.com/opensearch-project/OpenSearch/pull/19350))

plugins/ingestion-fs/src/main/java/org/opensearch/plugin/ingestion/fs/FilePartitionConsumer.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ public FilePartitionConsumer(FileSourceConfig config, int shardId) {
6262
public List<ReadResult<FileOffset, FileMessage>> readNext(FileOffset offset, boolean includeStart, long maxMessages, int timeoutMillis)
6363
throws TimeoutException {
6464
long startLine = includeStart ? offset.getLine() : offset.getLine() + 1;
65-
lastReadLine = startLine;
6665
return readFromFile(startLine, maxMessages);
6766
}
6867

@@ -155,6 +154,22 @@ public int getShardId() {
155154
return shardId;
156155
}
157156

157+
@Override
158+
public long getPointerBasedLag(IngestionShardPointer expectedStartPointer) {
159+
if (!shardFile.exists()) {
160+
return 0;
161+
}
162+
163+
FileOffset latestOffset = (FileOffset) latestPointer();
164+
if (lastReadLine < 0) {
165+
// Haven't read anything yet, use the expected start pointer
166+
long startLine = ((FileOffset) expectedStartPointer).getLine();
167+
return latestOffset.getLine() - startLine;
168+
}
169+
// return lag as number of remaining lines from lastReadLineNumber
170+
return latestOffset.getLine() - lastReadLine - 1;
171+
}
172+
158173
@Override
159174
public void close() throws IOException {
160175
if (reader != null) {

plugins/ingestion-fs/src/test/java/org/opensearch/plugin/ingestion/fs/FileBasedIngestionSingleNodeTests.java

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
package org.opensearch.plugin.ingestion.fs;
1010

1111
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
12+
import org.opensearch.action.admin.indices.stats.IndexStats;
13+
import org.opensearch.action.admin.indices.stats.ShardStats;
1214
import org.opensearch.action.admin.indices.streamingingestion.pause.PauseIngestionResponse;
1315
import org.opensearch.action.admin.indices.streamingingestion.resume.ResumeIngestionRequest;
1416
import org.opensearch.action.admin.indices.streamingingestion.resume.ResumeIngestionResponse;
@@ -17,6 +19,7 @@
1719
import org.opensearch.cluster.metadata.IndexMetadata;
1820
import org.opensearch.common.settings.Settings;
1921
import org.opensearch.index.query.RangeQueryBuilder;
22+
import org.opensearch.indices.pollingingest.PollingIngestStats;
2023
import org.opensearch.plugins.Plugin;
2124
import org.opensearch.test.OpenSearchSingleNodeTestCase;
2225
import org.opensearch.transport.client.Requests;
@@ -31,6 +34,8 @@
3134
import java.util.Arrays;
3235
import java.util.Collection;
3336
import java.util.Collections;
37+
import java.util.concurrent.Callable;
38+
import java.util.concurrent.TimeUnit;
3439

3540
public class FileBasedIngestionSingleNodeTests extends OpenSearchSingleNodeTestCase {
3641
private Path ingestionDir;
@@ -237,4 +242,165 @@ public void testFileIngestionFromProvidedPointer() throws Exception {
237242
// cleanup the test index
238243
client().admin().indices().delete(new DeleteIndexRequest(index)).actionGet();
239244
}
245+
246+
public void testPointerBasedLag() throws Exception {
247+
String mappings = """
248+
{
249+
"properties": {
250+
"name": { "type": "text" },
251+
"age": { "type": "integer" }
252+
}
253+
}
254+
""";
255+
256+
// Create index with empty file (no messages)
257+
Path streamDir = ingestionDir.resolve(stream);
258+
Path shardFile = streamDir.resolve("0.ndjson");
259+
Files.write(shardFile, new byte[0]); // Empty file
260+
261+
createIndexWithMappingSource(
262+
index,
263+
Settings.builder()
264+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
265+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
266+
.put("ingestion_source.type", "FILE")
267+
.put("ingestion_source.pointer.init.reset", "earliest")
268+
.put("ingestion_source.param.stream", stream)
269+
.put("ingestion_source.param.base_directory", ingestionDir.toString())
270+
.put("index.replication.type", "SEGMENT")
271+
.build(),
272+
mappings
273+
);
274+
ensureGreen(index);
275+
276+
// Lag should be 0 since there are no messages
277+
waitForState(() -> {
278+
PollingIngestStats stats = getPollingIngestStats(index);
279+
return stats != null && stats.getConsumerStats().pointerBasedLag() == 0L;
280+
});
281+
282+
// Add messages to the file
283+
try (
284+
BufferedWriter writer = Files.newBufferedWriter(
285+
shardFile,
286+
StandardCharsets.UTF_8,
287+
StandardOpenOption.WRITE,
288+
StandardOpenOption.TRUNCATE_EXISTING
289+
)
290+
) {
291+
writer.write("{\"_id\":\"1\",\"_version\":\"1\",\"_op_type\":\"index\",\"_source\":{\"name\":\"alice\", \"age\": 30}}\n");
292+
writer.write("{\"_id\":\"2\",\"_version\":\"1\",\"_op_type\":\"index\",\"_source\":{\"name\":\"bob\", \"age\": 35}}\n");
293+
writer.flush();
294+
}
295+
296+
try (FileChannel channel = FileChannel.open(shardFile, StandardOpenOption.READ)) {
297+
channel.force(true);
298+
}
299+
300+
// Wait for messages to be processed
301+
waitForState(() -> {
302+
SearchResponse response = client().prepareSearch(index).setQuery(new RangeQueryBuilder("age").gte(0)).get();
303+
return response.getHits().getTotalHits().value() == 2;
304+
});
305+
306+
// Lag should be 0 after all messages are consumed
307+
waitForState(() -> {
308+
PollingIngestStats stats = getPollingIngestStats(index);
309+
return stats != null && stats.getConsumerStats().pointerBasedLag() == 0L;
310+
});
311+
312+
// cleanup
313+
client().admin().indices().delete(new DeleteIndexRequest(index)).actionGet();
314+
}
315+
316+
public void testPointerBasedLagAfterPause() throws Exception {
317+
String mappings = """
318+
{
319+
"properties": {
320+
"name": { "type": "text" },
321+
"age": { "type": "integer" }
322+
}
323+
}
324+
""";
325+
326+
createIndexWithMappingSource(
327+
index,
328+
Settings.builder()
329+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
330+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
331+
.put("ingestion_source.type", "FILE")
332+
.put("ingestion_source.pointer.init.reset", "earliest")
333+
.put("ingestion_source.param.stream", stream)
334+
.put("ingestion_source.param.base_directory", ingestionDir.toString())
335+
.put("index.replication.type", "SEGMENT")
336+
.build(),
337+
mappings
338+
);
339+
ensureGreen(index);
340+
341+
// Wait for initial messages to be processed
342+
waitForState(() -> {
343+
SearchResponse response = client().prepareSearch(index).setQuery(new RangeQueryBuilder("age").gte(0)).get();
344+
return response.getHits().getTotalHits().value() == 2;
345+
});
346+
347+
// Pause ingestion
348+
PauseIngestionResponse pauseResponse = client().admin().indices().pauseIngestion(Requests.pauseIngestionRequest(index)).get();
349+
assertTrue(pauseResponse.isAcknowledged());
350+
assertTrue(pauseResponse.isShardsAcknowledged());
351+
352+
// Wait for pause to take effect
353+
waitForState(() -> {
354+
GetIngestionStateResponse ingestionState = client().admin()
355+
.indices()
356+
.getIngestionState(Requests.getIngestionStateRequest(index))
357+
.get();
358+
return ingestionState.getFailedShards() == 0
359+
&& Arrays.stream(ingestionState.getShardStates())
360+
.allMatch(state -> state.isPollerPaused() && state.getPollerState().equalsIgnoreCase("paused"));
361+
});
362+
363+
// Add more messages to the file while paused
364+
Path streamDir = ingestionDir.resolve(stream);
365+
Path shardFile = streamDir.resolve("0.ndjson");
366+
try (BufferedWriter writer = Files.newBufferedWriter(shardFile, StandardCharsets.UTF_8, StandardOpenOption.APPEND)) {
367+
writer.write("{\"_id\":\"3\",\"_version\":\"1\",\"_op_type\":\"index\",\"_source\":{\"name\":\"charlie\", \"age\": 40}}\n");
368+
writer.write("{\"_id\":\"4\",\"_version\":\"1\",\"_op_type\":\"index\",\"_source\":{\"name\":\"diana\", \"age\": 45}}\n");
369+
writer.write("{\"_id\":\"5\",\"_version\":\"1\",\"_op_type\":\"index\",\"_source\":{\"name\":\"eve\", \"age\": 50}}\n");
370+
writer.flush();
371+
}
372+
373+
try (FileChannel channel = FileChannel.open(shardFile, StandardOpenOption.READ)) {
374+
channel.force(true);
375+
}
376+
377+
// Wait for lag to be calculated (lag is updated every 10 seconds)
378+
waitForState(() -> {
379+
PollingIngestStats stats = getPollingIngestStats(index);
380+
return stats != null && stats.getConsumerStats().pointerBasedLag() == 3L;
381+
});
382+
383+
// cleanup
384+
client().admin().indices().delete(new DeleteIndexRequest(index)).actionGet();
385+
}
386+
387+
/**
388+
* Helper method to get polling ingest stats for the index
389+
*/
390+
private PollingIngestStats getPollingIngestStats(String indexName) {
391+
IndexStats indexStats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName);
392+
ShardStats[] shards = indexStats.getShards();
393+
if (shards.length > 0) {
394+
return shards[0].getPollingIngestStats();
395+
}
396+
return null;
397+
}
398+
399+
private void waitForState(Callable<Boolean> checkState) throws Exception {
400+
assertBusy(() -> {
401+
if (checkState.call() == false) {
402+
fail("Provided state requirements not met");
403+
}
404+
}, 1, TimeUnit.MINUTES);
405+
}
240406
}

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -567,6 +567,65 @@ public void testResetPollerInAllActiveIngestion() throws Exception {
567567
});
568568
}
569569

570+
public void testAllActiveOffsetBasedLag() throws Exception {
571+
// Create all-active pull-based index
572+
internalCluster().startClusterManagerOnlyNode();
573+
final String nodeA = internalCluster().startDataOnlyNode();
574+
final String nodeB = internalCluster().startDataOnlyNode();
575+
576+
createIndex(
577+
indexName,
578+
Settings.builder()
579+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
580+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
581+
.put("ingestion_source.type", "kafka")
582+
.put("ingestion_source.param.topic", topicName)
583+
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
584+
.put("ingestion_source.pointer.init.reset", "earliest")
585+
.put("ingestion_source.all_active", true)
586+
.build(),
587+
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
588+
);
589+
590+
ensureGreen(indexName);
591+
// no messages published, expect 0 lag
592+
assertTrue(validateOffsetBasedLagForPrimaryAndReplica(0));
593+
594+
// pause ingestion
595+
PauseIngestionResponse pauseResponse = pauseIngestion(indexName);
596+
assertTrue(pauseResponse.isAcknowledged());
597+
assertTrue(pauseResponse.isShardsAcknowledged());
598+
waitForState(() -> {
599+
GetIngestionStateResponse ingestionState = getIngestionState(indexName);
600+
return ingestionState.getShardStates().length == 2
601+
&& ingestionState.getFailedShards() == 0
602+
&& Arrays.stream(ingestionState.getShardStates())
603+
.allMatch(state -> state.isPollerPaused() && state.getPollerState().equalsIgnoreCase("paused"));
604+
});
605+
606+
// produce 10 messages in paused state and validate lag
607+
for (int i = 0; i < 10; i++) {
608+
produceData(Integer.toString(i), "name" + i, "30");
609+
}
610+
waitForState(() -> validateOffsetBasedLagForPrimaryAndReplica(10));
611+
612+
// resume ingestion
613+
ResumeIngestionResponse resumeResponse = resumeIngestion(indexName);
614+
assertTrue(resumeResponse.isAcknowledged());
615+
assertTrue(resumeResponse.isShardsAcknowledged());
616+
waitForState(() -> {
617+
GetIngestionStateResponse ingestionState = getIngestionState(indexName);
618+
return ingestionState.getShardStates().length == 2
619+
&& Arrays.stream(ingestionState.getShardStates())
620+
.allMatch(
621+
state -> state.isPollerPaused() == false
622+
&& (state.getPollerState().equalsIgnoreCase("polling") || state.getPollerState().equalsIgnoreCase("processing"))
623+
);
624+
});
625+
waitForSearchableDocs(10, List.of(nodeA, nodeB));
626+
waitForState(() -> validateOffsetBasedLagForPrimaryAndReplica(0));
627+
}
628+
570629
// returns PollingIngestStats for single primary and single replica
571630
private Map<String, PollingIngestStats> getPollingIngestStatsForPrimaryAndReplica(String indexName) {
572631
IndexStats indexStats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName);
@@ -583,4 +642,14 @@ private Map<String, PollingIngestStats> getPollingIngestStatsForPrimaryAndReplic
583642

584643
return shardTypeToStats;
585644
}
645+
646+
private boolean validateOffsetBasedLagForPrimaryAndReplica(long expectedLag) {
647+
boolean valid = true;
648+
Map<String, PollingIngestStats> shardTypeToStats = getPollingIngestStatsForPrimaryAndReplica(indexName);
649+
valid &= shardTypeToStats.get("primary") != null
650+
&& shardTypeToStats.get("primary").getConsumerStats().pointerBasedLag() == expectedLag;
651+
valid &= shardTypeToStats.get("replica") != null
652+
&& shardTypeToStats.get("replica").getConsumerStats().pointerBasedLag() == expectedLag;
653+
return valid;
654+
}
586655
}

plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,34 @@ public int getShardId() {
255255
return topicPartition.partition();
256256
}
257257

258+
/**
259+
* Compute Kafka offset based lag as the difference between latest available offset and last consumed offset.
260+
* Note: This method is not thread-safe and should only be called from the poller thread to avoid multi-threaded
261+
* access to KafkaConsumer.
262+
*
263+
* @param expectedStartPointer the pointer where ingestion would start if no messages have been consumed yet
264+
* @return offset based lag
265+
*/
266+
@Override
267+
public long getPointerBasedLag(IngestionShardPointer expectedStartPointer) {
268+
try {
269+
// Get the end offset for the partition
270+
long endOffset = consumer.endOffsets(Collections.singletonList(topicPartition)).getOrDefault(topicPartition, 0L);
271+
272+
if (lastFetchedOffset < 0) {
273+
// Haven't fetched anything yet, use the expected start pointer
274+
long startOffset = ((KafkaOffset) expectedStartPointer).getOffset();
275+
return endOffset - startOffset;
276+
}
277+
278+
// Calculate lag as difference between latest and last consumed offset
279+
return endOffset - lastFetchedOffset - 1;
280+
} catch (Exception e) {
281+
logger.warn("Failed to calculate pointer based lag for partition {}: {}", topicPartition.partition(), e.getMessage());
282+
return 0;
283+
}
284+
}
285+
258286
@Override
259287
public void close() throws IOException {
260288
consumer.close();

0 commit comments

Comments
 (0)