Skip to content

Commit 3180493

Browse files
update the lag interval setting to timeValue type
Signed-off-by: Varun Bharadwaj <varunbharadwaj1995@gmail.com>
1 parent c041929 commit 3180493

File tree

8 files changed

+33
-28
lines changed

8 files changed

+33
-28
lines changed

plugins/ingestion-fs/src/test/java/org/opensearch/plugin/ingestion/fs/FileBasedIngestionSingleNodeTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ public void testPointerBasedLag() throws Exception {
265265
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
266266
.put("ingestion_source.type", "FILE")
267267
.put("ingestion_source.pointer.init.reset", "earliest")
268-
.put("ingestion_source.pointer_based_lag_update_interval", 3000)
268+
.put("ingestion_source.pointer_based_lag_update_interval", "3s")
269269
.put("ingestion_source.param.stream", stream)
270270
.put("ingestion_source.param.base_directory", ingestionDir.toString())
271271
.put("index.replication.type", "SEGMENT")
@@ -331,7 +331,7 @@ public void testPointerBasedLagAfterPause() throws Exception {
331331
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
332332
.put("ingestion_source.type", "FILE")
333333
.put("ingestion_source.pointer.init.reset", "earliest")
334-
.put("ingestion_source.pointer_based_lag_update_interval", 3000)
334+
.put("ingestion_source.pointer_based_lag_update_interval", "3s")
335335
.put("ingestion_source.param.stream", stream)
336336
.put("ingestion_source.param.base_directory", ingestionDir.toString())
337337
.put("index.replication.type", "SEGMENT")

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -582,7 +582,7 @@ public void testAllActiveOffsetBasedLag() throws Exception {
582582
.put("ingestion_source.param.topic", topicName)
583583
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
584584
.put("ingestion_source.pointer.init.reset", "earliest")
585-
.put("ingestion_source.pointer_based_lag_update_interval", 3000)
585+
.put("ingestion_source.pointer_based_lag_update_interval", "3s")
586586
.put("ingestion_source.all_active", true)
587587
.build(),
588588
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"

plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaSingleNodeTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public void testPauseAndResumeAPIs() throws Exception {
8080
.put("ingestion_source.param.topic", topicName)
8181
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
8282
.put("index.replication.type", "SEGMENT")
83-
.put("ingestion_source.pointer_based_lag_update_interval", 0)
83+
.put("ingestion_source.pointer_based_lag_update_interval", "0")
8484
.build(),
8585
mappings
8686
);

server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.opensearch.common.settings.Setting;
5252
import org.opensearch.common.settings.Setting.Property;
5353
import org.opensearch.common.settings.Settings;
54+
import org.opensearch.common.unit.TimeValue;
5455
import org.opensearch.common.xcontent.XContentHelper;
5556
import org.opensearch.core.Assertions;
5657
import org.opensearch.core.common.Strings;
@@ -93,6 +94,7 @@
9394
import java.util.Objects;
9495
import java.util.Set;
9596
import java.util.TreeSet;
97+
import java.util.concurrent.TimeUnit;
9698
import java.util.function.Function;
9799

98100
import static org.opensearch.cluster.metadata.Metadata.CONTEXT_MODE_PARAM;
@@ -908,16 +910,15 @@ public Iterator<Setting<?>> settings() {
908910
);
909911

910912
/**
911-
* Defines the pointer-based lag update interval in milliseconds for pull-based ingestion.
913+
* Defines the pointer-based lag update interval for pull-based ingestion.
912914
* This controls how frequently the lag between the latest available message and the last consumed message is calculated.
913915
* Setting this to 0 disables pointer-based lag calculation entirely.
914916
*/
915917
public static final String SETTING_INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL =
916918
"index.ingestion_source.pointer_based_lag_update_interval";
917-
public static final Setting<Integer> INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL_SETTING = Setting.intSetting(
919+
public static final Setting<TimeValue> INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL_SETTING = Setting.positiveTimeSetting(
918920
SETTING_INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL,
919-
10000,
920-
0,
921+
new TimeValue(10, TimeUnit.SECONDS),
921922
Property.IndexScope,
922923
Property.Final
923924
);
@@ -1225,7 +1226,7 @@ public IngestionSource getIngestionSource() {
12251226
final int numProcessorThreads = INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING.get(settings);
12261227
final int blockingQueueSize = INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING.get(settings);
12271228
final boolean allActiveIngestionEnabled = INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING.get(settings);
1228-
final int pointerBasedLagUpdateInterval = INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL_SETTING.get(settings);
1229+
final TimeValue pointerBasedLagUpdateInterval = INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL_SETTING.get(settings);
12291230

12301231
return new IngestionSource.Builder(ingestionSourceType).setParams(ingestionSourceParams)
12311232
.setPointerInitReset(pointerInitReset)

server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import org.opensearch.common.annotation.ExperimentalApi;
1212
import org.opensearch.common.settings.Settings;
13+
import org.opensearch.common.unit.TimeValue;
1314
import org.opensearch.indices.pollingingest.IngestionErrorStrategy;
1415
import org.opensearch.indices.pollingingest.StreamPoller;
1516

@@ -38,7 +39,7 @@ public class IngestionSource {
3839
private int numProcessorThreads;
3940
private int blockingQueueSize;
4041
private final boolean allActiveIngestion;
41-
private final int pointerBasedLagUpdateInterval;
42+
private final TimeValue pointerBasedLagUpdateInterval;
4243

4344
private IngestionSource(
4445
String type,
@@ -50,7 +51,7 @@ private IngestionSource(
5051
int numProcessorThreads,
5152
int blockingQueueSize,
5253
boolean allActiveIngestion,
53-
int pointerBasedLagUpdateInterval
54+
TimeValue pointerBasedLagUpdateInterval
5455
) {
5556
this.type = type;
5657
this.pointerInitReset = pointerInitReset;
@@ -100,7 +101,7 @@ public boolean isAllActiveIngestionEnabled() {
100101
return allActiveIngestion;
101102
}
102103

103-
public int getPointerBasedLagUpdateInterval() {
104+
public TimeValue getPointerBasedLagUpdateInterval() {
104105
return pointerBasedLagUpdateInterval;
105106
}
106107

@@ -221,7 +222,9 @@ public static class Builder {
221222
private int numProcessorThreads = INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING.getDefault(Settings.EMPTY);
222223
private int blockingQueueSize = INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING.getDefault(Settings.EMPTY);
223224
private boolean allActiveIngestion = INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING.getDefault(Settings.EMPTY);
224-
private int pointerBasedLagUpdateInterval = INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL_SETTING.getDefault(Settings.EMPTY);
225+
private TimeValue pointerBasedLagUpdateInterval = INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL_SETTING.getDefault(
226+
Settings.EMPTY
227+
);
225228

226229
public Builder(String type) {
227230
this.type = type;
@@ -283,7 +286,7 @@ public Builder setAllActiveIngestion(boolean allActiveIngestion) {
283286
return this;
284287
}
285288

286-
public Builder setPointerBasedLagUpdateInterval(int pointerBasedLagUpdateInterval) {
289+
public Builder setPointerBasedLagUpdateInterval(TimeValue pointerBasedLagUpdateInterval) {
287290
this.pointerBasedLagUpdateInterval = pointerBasedLagUpdateInterval;
288291
return this;
289292
}

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ private void initializeStreamPoller(
145145
.pollTimeout(ingestionSource.getPollTimeout())
146146
.numProcessorThreads(ingestionSource.getNumProcessorThreads())
147147
.blockingQueueSize(ingestionSource.getBlockingQueueSize())
148-
.pointerBasedLagUpdateInterval(ingestionSource.getPointerBasedLagUpdateInterval())
148+
.pointerBasedLagUpdateInterval(ingestionSource.getPointerBasedLagUpdateInterval().millis())
149149
.build();
150150
registerStreamPollerListener();
151151

server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public class DefaultStreamPoller implements StreamPoller {
6969

7070
private long maxPollSize;
7171
private int pollTimeout;
72-
private int pointerBasedLagUpdateIntervalMs;
72+
private long pointerBasedLagUpdateIntervalMs;
7373

7474
private final String indexName;
7575

@@ -95,7 +95,7 @@ private DefaultStreamPoller(
9595
int pollTimeout,
9696
int numProcessorThreads,
9797
int blockingQueueSize,
98-
int pointerBasedLagUpdateInterval
98+
long pointerBasedLagUpdateIntervalMs
9999
) {
100100
this(
101101
startPointer,
@@ -109,7 +109,7 @@ private DefaultStreamPoller(
109109
initialState,
110110
maxPollSize,
111111
pollTimeout,
112-
pointerBasedLagUpdateInterval,
112+
pointerBasedLagUpdateIntervalMs,
113113
ingestionEngine.config().getIndexSettings()
114114
);
115115
}
@@ -129,7 +129,7 @@ private DefaultStreamPoller(
129129
State initialState,
130130
long maxPollSize,
131131
int pollTimeout,
132-
int pointerBasedLagUpdateInterval,
132+
long pointerBasedLagUpdateIntervalMs,
133133
IndexSettings indexSettings
134134
) {
135135
this.consumerFactory = Objects.requireNonNull(consumerFactory);
@@ -141,7 +141,7 @@ private DefaultStreamPoller(
141141
this.state = initialState;
142142
this.maxPollSize = maxPollSize;
143143
this.pollTimeout = pollTimeout;
144-
this.pointerBasedLagUpdateIntervalMs = pointerBasedLagUpdateInterval;
144+
this.pointerBasedLagUpdateIntervalMs = pointerBasedLagUpdateIntervalMs;
145145
this.blockingQueueContainer = blockingQueueContainer;
146146
this.consumerThread = Executors.newSingleThreadExecutor(
147147
r -> new Thread(r, String.format(Locale.ROOT, "stream-poller-consumer-%d-%d", shardId, System.currentTimeMillis()))
@@ -532,7 +532,7 @@ public static class Builder {
532532
private int pollTimeout = 1000;
533533
private int numProcessorThreads = 1;
534534
private int blockingQueueSize = 100;
535-
private int pointerBasedLagUpdateInterval = 10000;
535+
private long pointerBasedLagUpdateIntervalMs = 10000;
536536

537537
/**
538538
* Initialize the builder with mandatory parameters
@@ -617,10 +617,10 @@ public Builder blockingQueueSize(int blockingQueueSize) {
617617
}
618618

619619
/**
620-
* Set pointer-based lag update interval
620+
* Set pointer-based lag update interval in milliseconds
621621
*/
622-
public Builder pointerBasedLagUpdateInterval(int pointerBasedLagUpdateInterval) {
623-
this.pointerBasedLagUpdateInterval = pointerBasedLagUpdateInterval;
622+
public Builder pointerBasedLagUpdateInterval(long pointerBasedLagUpdateInterval) {
623+
this.pointerBasedLagUpdateIntervalMs = pointerBasedLagUpdateInterval;
624624
return this;
625625
}
626626

@@ -642,7 +642,7 @@ public DefaultStreamPoller build() {
642642
pollTimeout,
643643
numProcessorThreads,
644644
blockingQueueSize,
645-
pointerBasedLagUpdateInterval
645+
pointerBasedLagUpdateIntervalMs
646646
);
647647
}
648648
}

server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.cluster.metadata;
1010

11+
import org.opensearch.common.unit.TimeValue;
1112
import org.opensearch.indices.pollingingest.StreamPoller;
1213
import org.opensearch.test.OpenSearchTestCase;
1314

@@ -30,7 +31,7 @@ public void testConstructorAndGetters() {
3031
.setPointerInitReset(pointerInitReset)
3132
.setErrorStrategy(DROP)
3233
.setBlockingQueueSize(1000)
33-
.setPointerBasedLagUpdateInterval(1000)
34+
.setPointerBasedLagUpdateInterval(TimeValue.timeValueSeconds(1))
3435
.build();
3536

3637
assertEquals("type", source.getType());
@@ -41,7 +42,7 @@ public void testConstructorAndGetters() {
4142
assertEquals(1000, source.getMaxPollSize());
4243
assertEquals(1000, source.getPollTimeout());
4344
assertEquals(1000, source.getBlockingQueueSize());
44-
assertEquals(1000, source.getPointerBasedLagUpdateInterval());
45+
assertEquals(1, source.getPointerBasedLagUpdateInterval().getSeconds());
4546
}
4647

4748
public void testEquals() {
@@ -107,7 +108,7 @@ public void testToString() {
107108
.setErrorStrategy(DROP)
108109
.build();
109110
String expected =
110-
"IngestionSource{type='type',pointer_init_reset='PointerInitReset{type='RESET_BY_OFFSET', value=1000}',error_strategy='DROP', params={key=value}, maxPollSize=1000, pollTimeout=1000, numProcessorThreads=1, blockingQueueSize=100, allActiveIngestion=false, pointerBasedLagUpdateInterval=10000}";
111+
"IngestionSource{type='type',pointer_init_reset='PointerInitReset{type='RESET_BY_OFFSET', value=1000}',error_strategy='DROP', params={key=value}, maxPollSize=1000, pollTimeout=1000, numProcessorThreads=1, blockingQueueSize=100, allActiveIngestion=false, pointerBasedLagUpdateInterval=10s}";
111112
assertEquals(expected, source.toString());
112113
}
113114

0 commit comments

Comments
 (0)