Skip to content

Commit c041929

Browse files
add setting for pointer based lag interval
Signed-off-by: Varun Bharadwaj <varunbharadwaj1995@gmail.com>
1 parent 85c04fc commit c041929

File tree

10 files changed

+84
-10
lines changed

10 files changed

+84
-10
lines changed

plugins/ingestion-fs/src/test/java/org/opensearch/plugin/ingestion/fs/FileBasedIngestionSingleNodeTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,7 @@ public void testPointerBasedLag() throws Exception {
265265
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
266266
.put("ingestion_source.type", "FILE")
267267
.put("ingestion_source.pointer.init.reset", "earliest")
268+
.put("ingestion_source.pointer_based_lag_update_interval", 3000)
268269
.put("ingestion_source.param.stream", stream)
269270
.put("ingestion_source.param.base_directory", ingestionDir.toString())
270271
.put("index.replication.type", "SEGMENT")
@@ -330,6 +331,7 @@ public void testPointerBasedLagAfterPause() throws Exception {
330331
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
331332
.put("ingestion_source.type", "FILE")
332333
.put("ingestion_source.pointer.init.reset", "earliest")
334+
.put("ingestion_source.pointer_based_lag_update_interval", 3000)
333335
.put("ingestion_source.param.stream", stream)
334336
.put("ingestion_source.param.base_directory", ingestionDir.toString())
335337
.put("index.replication.type", "SEGMENT")
@@ -374,7 +376,7 @@ public void testPointerBasedLagAfterPause() throws Exception {
374376
channel.force(true);
375377
}
376378

377-
// Wait for lag to be calculated (lag is updated every 10 seconds)
379+
// Wait for lag to be calculated (lag is updated every 3 seconds in this test)
378380
waitForState(() -> {
379381
PollingIngestStats stats = getPollingIngestStats(index);
380382
return stats != null && stats.getConsumerStats().pointerBasedLag() == 3L;

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -582,6 +582,7 @@ public void testAllActiveOffsetBasedLag() throws Exception {
582582
.put("ingestion_source.param.topic", topicName)
583583
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
584584
.put("ingestion_source.pointer.init.reset", "earliest")
585+
.put("ingestion_source.pointer_based_lag_update_interval", 3000)
585586
.put("ingestion_source.all_active", true)
586587
.build(),
587588
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"

plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaSingleNodeTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ public void testPauseAndResumeAPIs() throws Exception {
8080
.put("ingestion_source.param.topic", topicName)
8181
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
8282
.put("index.replication.type", "SEGMENT")
83+
.put("ingestion_source.pointer_based_lag_update_interval", 0)
8384
.build(),
8485
mappings
8586
);

server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -907,6 +907,21 @@ public Iterator<Setting<?>> settings() {
907907
Setting.Property.Final
908908
);
909909

910+
/**
911+
* Defines the pointer-based lag update interval in milliseconds for pull-based ingestion.
912+
* This controls how frequently the lag between the latest available message and the last consumed message is calculated.
913+
* Setting this to 0 disables pointer-based lag calculation entirely.
914+
*/
915+
public static final String SETTING_INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL =
916+
"index.ingestion_source.pointer_based_lag_update_interval";
917+
public static final Setting<Integer> INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL_SETTING = Setting.intSetting(
918+
SETTING_INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL,
919+
10000,
920+
0,
921+
Property.IndexScope,
922+
Property.Final
923+
);
924+
910925
/**
911926
* Defines if all-active pull-based ingestion is enabled. In this mode, replicas will directly consume from the
912927
* streaming source and process the updates. In the default document replication mode, this setting must be enabled.
@@ -1210,6 +1225,7 @@ public IngestionSource getIngestionSource() {
12101225
final int numProcessorThreads = INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING.get(settings);
12111226
final int blockingQueueSize = INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING.get(settings);
12121227
final boolean allActiveIngestionEnabled = INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING.get(settings);
1228+
final int pointerBasedLagUpdateInterval = INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL_SETTING.get(settings);
12131229

12141230
return new IngestionSource.Builder(ingestionSourceType).setParams(ingestionSourceParams)
12151231
.setPointerInitReset(pointerInitReset)
@@ -1219,6 +1235,7 @@ public IngestionSource getIngestionSource() {
12191235
.setNumProcessorThreads(numProcessorThreads)
12201236
.setBlockingQueueSize(blockingQueueSize)
12211237
.setAllActiveIngestion(allActiveIngestionEnabled)
1238+
.setPointerBasedLagUpdateInterval(pointerBasedLagUpdateInterval)
12221239
.build();
12231240
}
12241241
return null;

server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING;
2222
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_MAX_POLL_SIZE;
2323
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING;
24+
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL_SETTING;
2425
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_POLL_TIMEOUT;
2526

2627
/**
@@ -37,6 +38,7 @@ public class IngestionSource {
3738
private int numProcessorThreads;
3839
private int blockingQueueSize;
3940
private final boolean allActiveIngestion;
41+
private final int pointerBasedLagUpdateInterval;
4042

4143
private IngestionSource(
4244
String type,
@@ -47,7 +49,8 @@ private IngestionSource(
4749
int pollTimeout,
4850
int numProcessorThreads,
4951
int blockingQueueSize,
50-
boolean allActiveIngestion
52+
boolean allActiveIngestion,
53+
int pointerBasedLagUpdateInterval
5154
) {
5255
this.type = type;
5356
this.pointerInitReset = pointerInitReset;
@@ -58,6 +61,7 @@ private IngestionSource(
5861
this.numProcessorThreads = numProcessorThreads;
5962
this.blockingQueueSize = blockingQueueSize;
6063
this.allActiveIngestion = allActiveIngestion;
64+
this.pointerBasedLagUpdateInterval = pointerBasedLagUpdateInterval;
6165
}
6266

6367
public String getType() {
@@ -96,6 +100,10 @@ public boolean isAllActiveIngestionEnabled() {
96100
return allActiveIngestion;
97101
}
98102

103+
public int getPointerBasedLagUpdateInterval() {
104+
return pointerBasedLagUpdateInterval;
105+
}
106+
99107
@Override
100108
public boolean equals(Object o) {
101109
if (this == o) return true;
@@ -109,7 +117,8 @@ public boolean equals(Object o) {
109117
&& Objects.equals(pollTimeout, ingestionSource.pollTimeout)
110118
&& Objects.equals(numProcessorThreads, ingestionSource.numProcessorThreads)
111119
&& Objects.equals(blockingQueueSize, ingestionSource.blockingQueueSize)
112-
&& Objects.equals(allActiveIngestion, ingestionSource.allActiveIngestion);
120+
&& Objects.equals(allActiveIngestion, ingestionSource.allActiveIngestion)
121+
&& Objects.equals(pointerBasedLagUpdateInterval, ingestionSource.pointerBasedLagUpdateInterval);
113122
}
114123

115124
@Override
@@ -123,7 +132,8 @@ public int hashCode() {
123132
pollTimeout,
124133
numProcessorThreads,
125134
blockingQueueSize,
126-
allActiveIngestion
135+
allActiveIngestion,
136+
pointerBasedLagUpdateInterval
127137
);
128138
}
129139

@@ -151,6 +161,8 @@ public String toString() {
151161
+ blockingQueueSize
152162
+ ", allActiveIngestion="
153163
+ allActiveIngestion
164+
+ ", pointerBasedLagUpdateInterval="
165+
+ pointerBasedLagUpdateInterval
154166
+ '}';
155167
}
156168

@@ -209,6 +221,7 @@ public static class Builder {
209221
private int numProcessorThreads = INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING.getDefault(Settings.EMPTY);
210222
private int blockingQueueSize = INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING.getDefault(Settings.EMPTY);
211223
private boolean allActiveIngestion = INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING.getDefault(Settings.EMPTY);
224+
private int pointerBasedLagUpdateInterval = INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL_SETTING.getDefault(Settings.EMPTY);
212225

213226
public Builder(String type) {
214227
this.type = type;
@@ -222,6 +235,7 @@ public Builder(IngestionSource ingestionSource) {
222235
this.params = ingestionSource.params;
223236
this.blockingQueueSize = ingestionSource.blockingQueueSize;
224237
this.allActiveIngestion = ingestionSource.allActiveIngestion;
238+
this.pointerBasedLagUpdateInterval = ingestionSource.pointerBasedLagUpdateInterval;
225239
}
226240

227241
public Builder setPointerInitReset(PointerInitReset pointerInitReset) {
@@ -269,6 +283,11 @@ public Builder setAllActiveIngestion(boolean allActiveIngestion) {
269283
return this;
270284
}
271285

286+
public Builder setPointerBasedLagUpdateInterval(int pointerBasedLagUpdateInterval) {
287+
this.pointerBasedLagUpdateInterval = pointerBasedLagUpdateInterval;
288+
return this;
289+
}
290+
272291
public IngestionSource build() {
273292
return new IngestionSource(
274293
type,
@@ -279,7 +298,8 @@ public IngestionSource build() {
279298
pollTimeout,
280299
numProcessorThreads,
281300
blockingQueueSize,
282-
allActiveIngestion
301+
allActiveIngestion,
302+
pointerBasedLagUpdateInterval
283303
);
284304
}
285305

server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
281281
IndexMetadata.INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING,
282282
IndexMetadata.INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING,
283283
IndexMetadata.INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING,
284+
IndexMetadata.INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL_SETTING,
284285

285286
// Settings for search replica
286287
IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING,

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ private void initializeStreamPoller(
145145
.pollTimeout(ingestionSource.getPollTimeout())
146146
.numProcessorThreads(ingestionSource.getNumProcessorThreads())
147147
.blockingQueueSize(ingestionSource.getBlockingQueueSize())
148+
.pointerBasedLagUpdateInterval(ingestionSource.getPointerBasedLagUpdateInterval())
148149
.build();
149150
registerStreamPollerListener();
150151

server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ public class DefaultStreamPoller implements StreamPoller {
3636
private static final Logger logger = LogManager.getLogger(DefaultStreamPoller.class);
3737
private static final int DEFAULT_POLLER_SLEEP_PERIOD_MS = 100;
3838
private static final int CONSUMER_INIT_RETRY_INTERVAL_MS = 10000;
39-
private static final int POINTER_BASED_LAG_UPDATE_INTERVAL_MS = 10000; // Update pointer based lag every 10 seconds
4039

4140
private volatile State state = State.NONE;
4241

@@ -70,6 +69,7 @@ public class DefaultStreamPoller implements StreamPoller {
7069

7170
private long maxPollSize;
7271
private int pollTimeout;
72+
private int pointerBasedLagUpdateIntervalMs;
7373

7474
private final String indexName;
7575

@@ -94,7 +94,8 @@ private DefaultStreamPoller(
9494
long maxPollSize,
9595
int pollTimeout,
9696
int numProcessorThreads,
97-
int blockingQueueSize
97+
int blockingQueueSize,
98+
int pointerBasedLagUpdateInterval
9899
) {
99100
this(
100101
startPointer,
@@ -108,6 +109,7 @@ private DefaultStreamPoller(
108109
initialState,
109110
maxPollSize,
110111
pollTimeout,
112+
pointerBasedLagUpdateInterval,
111113
ingestionEngine.config().getIndexSettings()
112114
);
113115
}
@@ -127,6 +129,7 @@ private DefaultStreamPoller(
127129
State initialState,
128130
long maxPollSize,
129131
int pollTimeout,
132+
int pointerBasedLagUpdateInterval,
130133
IndexSettings indexSettings
131134
) {
132135
this.consumerFactory = Objects.requireNonNull(consumerFactory);
@@ -138,6 +141,7 @@ private DefaultStreamPoller(
138141
this.state = initialState;
139142
this.maxPollSize = maxPollSize;
140143
this.pollTimeout = pollTimeout;
144+
this.pointerBasedLagUpdateIntervalMs = pointerBasedLagUpdateInterval;
141145
this.blockingQueueContainer = blockingQueueContainer;
142146
this.consumerThread = Executors.newSingleThreadExecutor(
143147
r -> new Thread(r, String.format(Locale.ROOT, "stream-poller-consumer-%d-%d", shardId, System.currentTimeMillis()))
@@ -394,10 +398,16 @@ private void setLastPolledMessageTimestamp(long timestamp) {
394398
/**
395399
* Update the cached pointer-based lag if enough time has elapsed since the last update.
396400
* {@code consumer.getPointerBasedLag()} is called from the poller thread, so it's safe to access the consumer.
401+
* If pointerBasedLagUpdateIntervalMs is 0, pointer-based lag calculation is disabled.
397402
*/
398403
private void updatePointerBasedLagIfNeeded() {
404+
// If interval is 0, pointer-based lag is disabled
405+
if (pointerBasedLagUpdateIntervalMs == 0) {
406+
return;
407+
}
408+
399409
long currentTime = System.currentTimeMillis();
400-
if (consumer != null && (currentTime - lastPointerBasedLagUpdateTime >= POINTER_BASED_LAG_UPDATE_INTERVAL_MS)) {
410+
if (consumer != null && (currentTime - lastPointerBasedLagUpdateTime >= pointerBasedLagUpdateIntervalMs)) {
401411
try {
402412
// update the lastPointerBasedLagUpdateTime first, to avoid load on streaming source in case of errors
403413
lastPointerBasedLagUpdateTime = currentTime;
@@ -522,6 +532,7 @@ public static class Builder {
522532
private int pollTimeout = 1000;
523533
private int numProcessorThreads = 1;
524534
private int blockingQueueSize = 100;
535+
private int pointerBasedLagUpdateInterval = 10000;
525536

526537
/**
527538
* Initialize the builder with mandatory parameters
@@ -605,6 +616,14 @@ public Builder blockingQueueSize(int blockingQueueSize) {
605616
return this;
606617
}
607618

619+
/**
620+
* Set pointer-based lag update interval
621+
*/
622+
public Builder pointerBasedLagUpdateInterval(int pointerBasedLagUpdateInterval) {
623+
this.pointerBasedLagUpdateInterval = pointerBasedLagUpdateInterval;
624+
return this;
625+
}
626+
608627
/**
609628
* Build the DefaultStreamPoller instance
610629
*/
@@ -622,7 +641,8 @@ public DefaultStreamPoller build() {
622641
maxPollSize,
623642
pollTimeout,
624643
numProcessorThreads,
625-
blockingQueueSize
644+
blockingQueueSize,
645+
pointerBasedLagUpdateInterval
626646
);
627647
}
628648
}

server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public void testConstructorAndGetters() {
3030
.setPointerInitReset(pointerInitReset)
3131
.setErrorStrategy(DROP)
3232
.setBlockingQueueSize(1000)
33+
.setPointerBasedLagUpdateInterval(1000)
3334
.build();
3435

3536
assertEquals("type", source.getType());
@@ -40,6 +41,7 @@ public void testConstructorAndGetters() {
4041
assertEquals(1000, source.getMaxPollSize());
4142
assertEquals(1000, source.getPollTimeout());
4243
assertEquals(1000, source.getBlockingQueueSize());
44+
assertEquals(1000, source.getPointerBasedLagUpdateInterval());
4345
}
4446

4547
public void testEquals() {
@@ -105,7 +107,7 @@ public void testToString() {
105107
.setErrorStrategy(DROP)
106108
.build();
107109
String expected =
108-
"IngestionSource{type='type',pointer_init_reset='PointerInitReset{type='RESET_BY_OFFSET', value=1000}',error_strategy='DROP', params={key=value}, maxPollSize=1000, pollTimeout=1000, numProcessorThreads=1, blockingQueueSize=100, allActiveIngestion=false}";
110+
"IngestionSource{type='type',pointer_init_reset='PointerInitReset{type='RESET_BY_OFFSET', value=1000}',error_strategy='DROP', params={key=value}, maxPollSize=1000, pollTimeout=1000, numProcessorThreads=1, blockingQueueSize=100, allActiveIngestion=false, pointerBasedLagUpdateInterval=10000}";
109111
assertEquals(expected, source.toString());
110112
}
111113

0 commit comments

Comments
 (0)