Commit c9a5fab

[Pull-based Ingestion] Add Kafka offset based consumer lag in pull-based ingestion (#19635)
* include pointer based lag metric for pull-based ingestion
* set -1 kafka lag if errors occur during offset lag computation
* add setting for pointer based lag interval
* update the lag interval setting to timeValue type

---------

Signed-off-by: Varun Bharadwaj <varunbharadwaj1995@gmail.com>
1 parent 7042e77 commit c9a5fab
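
In brief, the commit wires a periodic lag computation into the poller and exposes it through shard-level polling ingest stats. Below is a minimal sketch of how the new setting and metric surface, modeled on the tests in this commit; the index name, topic, and broker address are illustrative, and the stats lookup uses test-style client scaffolding:

    // Enable pull-based ingestion with the new lag update interval (a TimeValue setting).
    Settings settings = Settings.builder()
        .put("ingestion_source.type", "kafka")
        .put("ingestion_source.param.topic", "my-topic")                   // illustrative topic
        .put("ingestion_source.param.bootstrap_servers", "localhost:9092") // illustrative brokers
        .put("ingestion_source.pointer.init.reset", "earliest")
        .put("ingestion_source.pointer_based_lag_update_interval", "3s")   // new setting from this commit
        .build();

    // After the index is created, the lag is readable from shard-level stats:
    PollingIngestStats stats = client().admin().indices().prepareStats("my-index")
        .get().getIndex("my-index").getShards()[0].getPollingIngestStats();
    long lag = stats.getConsumerStats().pointerBasedLag(); // -1 signals a Kafka lag-computation error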

20 files changed: +519 -15 lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Return full error for GRPC error response ([#19568](https://github.com/opensearch-project/OpenSearch/pull/19568))
 - Add pluggable gRPC interceptors with explicit ordering([#19005](https://github.com/opensearch-project/OpenSearch/pull/19005))
 - Add metrics for the merged segment warmer feature ([#18929](https://github.com/opensearch-project/OpenSearch/pull/18929))
+- Add pointer based lag metric in pull-based ingestion ([#19635](https://github.com/opensearch-project/OpenSearch/pull/19635))
 
 ### Changed
 - Faster `terms` query creation for `keyword` field with index and docValues enabled ([#19350](https://github.com/opensearch-project/OpenSearch/pull/19350))

plugins/ingestion-fs/src/main/java/org/opensearch/plugin/ingestion/fs/FilePartitionConsumer.java

Lines changed: 16 additions & 1 deletion
@@ -62,7 +62,6 @@ public FilePartitionConsumer(FileSourceConfig config, int shardId) {
     public List<ReadResult<FileOffset, FileMessage>> readNext(FileOffset offset, boolean includeStart, long maxMessages, int timeoutMillis)
         throws TimeoutException {
         long startLine = includeStart ? offset.getLine() : offset.getLine() + 1;
-        lastReadLine = startLine;
         return readFromFile(startLine, maxMessages);
     }
 
@@ -155,6 +154,22 @@ public int getShardId() {
         return shardId;
     }
 
+    @Override
+    public long getPointerBasedLag(IngestionShardPointer expectedStartPointer) {
+        if (!shardFile.exists()) {
+            return 0;
+        }
+
+        FileOffset latestOffset = (FileOffset) latestPointer();
+        if (lastReadLine < 0) {
+            // Haven't read anything yet, use the expected start pointer
+            long startLine = ((FileOffset) expectedStartPointer).getLine();
+            return Math.max(0, latestOffset.getLine() - startLine);
+        }
+        // return lag as number of remaining lines from lastReadLine
+        return latestOffset.getLine() - lastReadLine - 1;
+    }
+
     @Override
     public void close() throws IOException {
         if (reader != null) {
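
To make the arithmetic above concrete, here is a standalone sketch of the two branches, assuming (as the tests below bear out) that latestPointer() returns the line number one past the last line in the shard file; the values are made up for the example:

    // Shard file holds lines 0..9, so latestPointer() reports line 10.
    long latestLine = 10;

    // Branch 1: nothing consumed yet (lastReadLine < 0), ingestion would start at line 0:
    long lagBeforeRead = Math.max(0, latestLine - 0);   // 10 lines pending

    // Branch 2: lines 0..9 already consumed, so lastReadLine == 9:
    long lagAfterRead = latestLine - 9 - 1;             // 0, fully caught up

The paused-ingestion test below follows the same formula: two lines consumed (lastReadLine == 1) plus three lines appended while paused gives 5 - 1 - 1 = 3.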

plugins/ingestion-fs/src/test/java/org/opensearch/plugin/ingestion/fs/FileBasedIngestionSingleNodeTests.java

Lines changed: 168 additions & 0 deletions
@@ -9,6 +9,8 @@
 package org.opensearch.plugin.ingestion.fs;
 
 import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.opensearch.action.admin.indices.stats.IndexStats;
+import org.opensearch.action.admin.indices.stats.ShardStats;
 import org.opensearch.action.admin.indices.streamingingestion.pause.PauseIngestionResponse;
 import org.opensearch.action.admin.indices.streamingingestion.resume.ResumeIngestionRequest;
 import org.opensearch.action.admin.indices.streamingingestion.resume.ResumeIngestionResponse;
@@ -17,6 +19,7 @@
 import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.index.query.RangeQueryBuilder;
+import org.opensearch.indices.pollingingest.PollingIngestStats;
 import org.opensearch.plugins.Plugin;
 import org.opensearch.test.OpenSearchSingleNodeTestCase;
 import org.opensearch.transport.client.Requests;
@@ -31,6 +34,8 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
 
 public class FileBasedIngestionSingleNodeTests extends OpenSearchSingleNodeTestCase {
     private Path ingestionDir;
@@ -237,4 +242,167 @@ public void testFileIngestionFromProvidedPointer() throws Exception {
         // cleanup the test index
         client().admin().indices().delete(new DeleteIndexRequest(index)).actionGet();
     }
+
+    public void testPointerBasedLag() throws Exception {
+        String mappings = """
+            {
+              "properties": {
+                "name": { "type": "text" },
+                "age": { "type": "integer" }
+              }
+            }
+            """;
+
+        // Create index with empty file (no messages)
+        Path streamDir = ingestionDir.resolve(stream);
+        Path shardFile = streamDir.resolve("0.ndjson");
+        Files.write(shardFile, new byte[0]); // Empty file
+
+        createIndexWithMappingSource(
+            index,
+            Settings.builder()
+                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+                .put("ingestion_source.type", "FILE")
+                .put("ingestion_source.pointer.init.reset", "earliest")
+                .put("ingestion_source.pointer_based_lag_update_interval", "3s")
+                .put("ingestion_source.param.stream", stream)
+                .put("ingestion_source.param.base_directory", ingestionDir.toString())
+                .put("index.replication.type", "SEGMENT")
+                .build(),
+            mappings
+        );
+        ensureGreen(index);
+
+        // Lag should be 0 since there are no messages
+        waitForState(() -> {
+            PollingIngestStats stats = getPollingIngestStats(index);
+            return stats != null && stats.getConsumerStats().pointerBasedLag() == 0L;
+        });
+
+        // Add messages to the file
+        try (
+            BufferedWriter writer = Files.newBufferedWriter(
+                shardFile,
+                StandardCharsets.UTF_8,
+                StandardOpenOption.WRITE,
+                StandardOpenOption.TRUNCATE_EXISTING
+            )
+        ) {
+            writer.write("{\"_id\":\"1\",\"_version\":\"1\",\"_op_type\":\"index\",\"_source\":{\"name\":\"alice\", \"age\": 30}}\n");
+            writer.write("{\"_id\":\"2\",\"_version\":\"1\",\"_op_type\":\"index\",\"_source\":{\"name\":\"bob\", \"age\": 35}}\n");
+            writer.flush();
+        }
+
+        try (FileChannel channel = FileChannel.open(shardFile, StandardOpenOption.READ)) {
+            channel.force(true);
+        }
+
+        // Wait for messages to be processed
+        waitForState(() -> {
+            SearchResponse response = client().prepareSearch(index).setQuery(new RangeQueryBuilder("age").gte(0)).get();
+            return response.getHits().getTotalHits().value() == 2;
+        });
+
+        // Lag should be 0 after all messages are consumed
+        waitForState(() -> {
+            PollingIngestStats stats = getPollingIngestStats(index);
+            return stats != null && stats.getConsumerStats().pointerBasedLag() == 0L;
+        });
+
+        // cleanup
+        client().admin().indices().delete(new DeleteIndexRequest(index)).actionGet();
+    }
+
+    public void testPointerBasedLagAfterPause() throws Exception {
+        String mappings = """
+            {
+              "properties": {
+                "name": { "type": "text" },
+                "age": { "type": "integer" }
+              }
+            }
+            """;
+
+        createIndexWithMappingSource(
+            index,
+            Settings.builder()
+                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+                .put("ingestion_source.type", "FILE")
+                .put("ingestion_source.pointer.init.reset", "earliest")
+                .put("ingestion_source.pointer_based_lag_update_interval", "3s")
+                .put("ingestion_source.param.stream", stream)
+                .put("ingestion_source.param.base_directory", ingestionDir.toString())
+                .put("index.replication.type", "SEGMENT")
+                .build(),
+            mappings
+        );
+        ensureGreen(index);
+
+        // Wait for initial messages to be processed
+        waitForState(() -> {
+            SearchResponse response = client().prepareSearch(index).setQuery(new RangeQueryBuilder("age").gte(0)).get();
+            return response.getHits().getTotalHits().value() == 2;
+        });
+
+        // Pause ingestion
+        PauseIngestionResponse pauseResponse = client().admin().indices().pauseIngestion(Requests.pauseIngestionRequest(index)).get();
+        assertTrue(pauseResponse.isAcknowledged());
+        assertTrue(pauseResponse.isShardsAcknowledged());
+
+        // Wait for pause to take effect
+        waitForState(() -> {
+            GetIngestionStateResponse ingestionState = client().admin()
+                .indices()
+                .getIngestionState(Requests.getIngestionStateRequest(index))
+                .get();
+            return ingestionState.getFailedShards() == 0
+                && Arrays.stream(ingestionState.getShardStates())
+                    .allMatch(state -> state.isPollerPaused() && state.getPollerState().equalsIgnoreCase("paused"));
+        });
+
+        // Add more messages to the file while paused
+        Path streamDir = ingestionDir.resolve(stream);
+        Path shardFile = streamDir.resolve("0.ndjson");
+        try (BufferedWriter writer = Files.newBufferedWriter(shardFile, StandardCharsets.UTF_8, StandardOpenOption.APPEND)) {
+            writer.write("{\"_id\":\"3\",\"_version\":\"1\",\"_op_type\":\"index\",\"_source\":{\"name\":\"charlie\", \"age\": 40}}\n");
+            writer.write("{\"_id\":\"4\",\"_version\":\"1\",\"_op_type\":\"index\",\"_source\":{\"name\":\"diana\", \"age\": 45}}\n");
+            writer.write("{\"_id\":\"5\",\"_version\":\"1\",\"_op_type\":\"index\",\"_source\":{\"name\":\"eve\", \"age\": 50}}\n");
+            writer.flush();
+        }
+
+        try (FileChannel channel = FileChannel.open(shardFile, StandardOpenOption.READ)) {
+            channel.force(true);
+        }
+
+        // Wait for lag to be calculated (lag is updated every 3 seconds in this test)
+        waitForState(() -> {
+            PollingIngestStats stats = getPollingIngestStats(index);
+            return stats != null && stats.getConsumerStats().pointerBasedLag() == 3L;
+        });
+
+        // cleanup
+        client().admin().indices().delete(new DeleteIndexRequest(index)).actionGet();
+    }
+
+    /**
+     * Helper method to get polling ingest stats for the index
+     */
+    private PollingIngestStats getPollingIngestStats(String indexName) {
+        IndexStats indexStats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName);
+        ShardStats[] shards = indexStats.getShards();
+        if (shards.length > 0) {
+            return shards[0].getPollingIngestStats();
+        }
+        return null;
+    }
+
+    private void waitForState(Callable<Boolean> checkState) throws Exception {
+        assertBusy(() -> {
+            if (checkState.call() == false) {
+                fail("Provided state requirements not met");
+            }
+        }, 1, TimeUnit.MINUTES);
+    }
 }

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java

Lines changed: 70 additions & 0 deletions
@@ -567,6 +567,66 @@ public void testResetPollerInAllActiveIngestion() throws Exception {
         });
     }
 
+    public void testAllActiveOffsetBasedLag() throws Exception {
+        // Create all-active pull-based index
+        internalCluster().startClusterManagerOnlyNode();
+        final String nodeA = internalCluster().startDataOnlyNode();
+        final String nodeB = internalCluster().startDataOnlyNode();
+
+        createIndex(
+            indexName,
+            Settings.builder()
+                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
+                .put("ingestion_source.type", "kafka")
+                .put("ingestion_source.param.topic", topicName)
+                .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
+                .put("ingestion_source.pointer.init.reset", "earliest")
+                .put("ingestion_source.pointer_based_lag_update_interval", "3s")
+                .put("ingestion_source.all_active", true)
+                .build(),
+            "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
+        );
+
+        ensureGreen(indexName);
+        // no messages published, expect 0 lag
+        assertTrue(validateOffsetBasedLagForPrimaryAndReplica(0));
+
+        // pause ingestion
+        PauseIngestionResponse pauseResponse = pauseIngestion(indexName);
+        assertTrue(pauseResponse.isAcknowledged());
+        assertTrue(pauseResponse.isShardsAcknowledged());
+        waitForState(() -> {
+            GetIngestionStateResponse ingestionState = getIngestionState(indexName);
+            return ingestionState.getShardStates().length == 2
+                && ingestionState.getFailedShards() == 0
+                && Arrays.stream(ingestionState.getShardStates())
+                    .allMatch(state -> state.isPollerPaused() && state.getPollerState().equalsIgnoreCase("paused"));
+        });
+
+        // produce 10 messages in paused state and validate lag
+        for (int i = 0; i < 10; i++) {
+            produceData(Integer.toString(i), "name" + i, "30");
+        }
+        waitForState(() -> validateOffsetBasedLagForPrimaryAndReplica(10));
+
+        // resume ingestion
+        ResumeIngestionResponse resumeResponse = resumeIngestion(indexName);
+        assertTrue(resumeResponse.isAcknowledged());
+        assertTrue(resumeResponse.isShardsAcknowledged());
+        waitForState(() -> {
+            GetIngestionStateResponse ingestionState = getIngestionState(indexName);
+            return ingestionState.getShardStates().length == 2
+                && Arrays.stream(ingestionState.getShardStates())
+                    .allMatch(
+                        state -> state.isPollerPaused() == false
+                            && (state.getPollerState().equalsIgnoreCase("polling") || state.getPollerState().equalsIgnoreCase("processing"))
+                    );
+        });
+        waitForSearchableDocs(10, List.of(nodeA, nodeB));
+        waitForState(() -> validateOffsetBasedLagForPrimaryAndReplica(0));
+    }
+
     // returns PollingIngestStats for single primary and single replica
     private Map<String, PollingIngestStats> getPollingIngestStatsForPrimaryAndReplica(String indexName) {
         IndexStats indexStats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName);
@@ -583,4 +643,14 @@ private Map<String, PollingIngestStats> getPollingIngestStatsForPrimaryAndReplic
 
         return shardTypeToStats;
     }
+
+    private boolean validateOffsetBasedLagForPrimaryAndReplica(long expectedLag) {
+        boolean valid = true;
+        Map<String, PollingIngestStats> shardTypeToStats = getPollingIngestStatsForPrimaryAndReplica(indexName);
+        valid &= shardTypeToStats.get("primary") != null
+            && shardTypeToStats.get("primary").getConsumerStats().pointerBasedLag() == expectedLag;
+        valid &= shardTypeToStats.get("replica") != null
+            && shardTypeToStats.get("replica").getConsumerStats().pointerBasedLag() == expectedLag;
+        return valid;
+    }
 }

plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java

Lines changed: 31 additions & 2 deletions
@@ -138,7 +138,7 @@ protected static Consumer<byte[], byte[]> createConsumer(String clientId, KafkaS
      * @param includeStart whether to include the start pointer in the read
      * @param maxMessages this setting is not honored for Kafka at this stage. maxMessages is instead set at consumer initialization.
      * @param timeoutMillis the maximum time to wait for messages
-     * @return
+     * @return the next read result
      * @throws TimeoutException
      */
     @Override
@@ -158,7 +158,7 @@ public List<ReadResult<KafkaOffset, KafkaMessage>> readNext(
      * Read the next batch of messages from Kafka.
      * @param maxMessages this setting is not honored for Kafka at this stage. maxMessages is instead set at consumer initialization.
      * @param timeoutMillis the maximum time to wait for messages
-     * @return
+     * @return the next read result
      * @throws TimeoutException
      */
     @Override
@@ -255,6 +255,35 @@ public int getShardId() {
         return topicPartition.partition();
     }
 
+    /**
+     * Compute Kafka offset based lag as the difference between latest available offset and last consumed offset.
+     * Note: This method is not thread-safe and should only be called from the poller thread to avoid multi-threaded
+     * access to KafkaConsumer.
+     *
+     * @param expectedStartPointer the pointer where ingestion would start if no messages have been consumed yet
+     * @return offset based lag. -1 is returned if errors are encountered.
+     */
+    @Override
+    public long getPointerBasedLag(IngestionShardPointer expectedStartPointer) {
+        try {
+            // Get the end offset for the partition
+            long endOffset = consumer.endOffsets(Collections.singletonList(topicPartition)).getOrDefault(topicPartition, 0L);
+
+            if (lastFetchedOffset < 0) {
+                // Haven't fetched anything yet, use the expected start pointer.
+                // Set lag as 0 in case expectedStartPointer is beyond endOffset.
+                long startOffset = ((KafkaOffset) expectedStartPointer).getOffset();
+                return Math.max(0, endOffset - startOffset);
+            }
+
+            // Calculate lag as difference between latest and last consumed offset
+            return endOffset - lastFetchedOffset - 1;
+        } catch (Exception e) {
+            logger.warn("Failed to calculate pointer based lag for partition {}: {}", topicPartition.partition(), e.getMessage());
+            return -1;
+        }
+    }
+
     @Override
     public void close() throws IOException {
         consumer.close();
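
For reference, the same end-offset arithmetic can be expressed against the plain kafka-clients API outside OpenSearch. This is a minimal sketch, not code from this commit; the broker address, topic, and sample offset are illustrative:

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class OffsetLagSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // illustrative broker
            props.put("key.deserializer", ByteArrayDeserializer.class.getName());
            props.put("value.deserializer", ByteArrayDeserializer.class.getName());

            TopicPartition tp = new TopicPartition("my-topic", 0); // illustrative topic/partition
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.assign(Collections.singletonList(tp));
                // endOffsets returns, per partition, the offset one past the last record
                Map<TopicPartition, Long> end = consumer.endOffsets(Collections.singletonList(tp));
                long endOffset = end.getOrDefault(tp, 0L);

                long lastFetchedOffset = 41; // offset of the last record already consumed (illustrative)
                // Same formula as the diff above: records still to consume after lastFetchedOffset.
                // E.g. records 0..41 present (endOffset == 42) and all consumed gives 42 - 41 - 1 = 0.
                long lag = endOffset - lastFetchedOffset - 1;
                System.out.println("offset based lag: " + lag);
            }
        }
    }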
