Skip to content

Commit ce9bb6f

Browse files
committed
Adds jitter cluster setting, sets jitter to 0 for ISM tests
Signed-off-by: Clay Downs <downsrob@amazon.com>
1 parent dafd3bb commit ce9bb6f

File tree

18 files changed

+147
-18
lines changed

18 files changed

+147
-18
lines changed

src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -380,6 +380,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
380380
ManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED,
381381
ManagedIndexSettings.METADATA_SERVICE_ENABLED,
382382
ManagedIndexSettings.AUTO_MANAGE,
383+
ManagedIndexSettings.JITTER,
383384
ManagedIndexSettings.JOB_INTERVAL,
384385
ManagedIndexSettings.SWEEP_PERIOD,
385386
ManagedIndexSettings.COORDINATOR_BACKOFF_COUNT,

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndex
7575
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.COORDINATOR_BACKOFF_COUNT
7676
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.COORDINATOR_BACKOFF_MILLIS
7777
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.INDEX_STATE_MANAGEMENT_ENABLED
78+
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.JITTER
7879
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.JOB_INTERVAL
7980
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.METADATA_SERVICE_ENABLED
8081
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.SWEEP_PERIOD
@@ -145,6 +146,7 @@ class ManagedIndexCoordinator(
145146
@Volatile private var retryPolicy =
146147
BackoffPolicy.constantBackoff(COORDINATOR_BACKOFF_MILLIS.get(settings), COORDINATOR_BACKOFF_COUNT.get(settings))
147148
@Volatile private var jobInterval = JOB_INTERVAL.get(settings)
149+
@Volatile private var jobJitter = JITTER.get(settings)
148150

149151
@Volatile private var isMaster = false
150152

@@ -158,6 +160,9 @@ class ManagedIndexCoordinator(
158160
clusterService.clusterSettings.addSettingsUpdateConsumer(JOB_INTERVAL) {
159161
jobInterval = it
160162
}
163+
clusterService.clusterSettings.addSettingsUpdateConsumer(JITTER) {
164+
jobJitter = it
165+
}
161166
clusterService.clusterSettings.addSettingsUpdateConsumer(INDEX_STATE_MANAGEMENT_ENABLED) {
162167
indexStateManagementEnabled = it
163168
if (!indexStateManagementEnabled) disable() else enable()
@@ -328,7 +333,8 @@ class ManagedIndexCoordinator(
328333
indexUuid,
329334
policy.id,
330335
jobInterval,
331-
policy
336+
policy,
337+
jobJitter
332338
)
333339
)
334340
}

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ data class ManagedIndexConfig(
5656
val policySeqNo: Long?,
5757
val policyPrimaryTerm: Long?,
5858
val policy: Policy?,
59-
val changePolicy: ChangePolicy?
59+
val changePolicy: ChangePolicy?,
60+
val jobJitter: Double?
6061
) : ScheduledJobParameter {
6162

6263
init {
@@ -79,6 +80,10 @@ data class ManagedIndexConfig(
7980

8081
override fun getLockDurationSeconds(): Long = 3600L // 1 hour
8182

83+
override fun getJitter(): Double? {
84+
return jobJitter
85+
}
86+
8287
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
8388
builder
8489
.startObject()
@@ -95,6 +100,7 @@ data class ManagedIndexConfig(
95100
.field(POLICY_PRIMARY_TERM_FIELD, policyPrimaryTerm)
96101
.field(POLICY_FIELD, policy, XCONTENT_WITHOUT_TYPE)
97102
.field(CHANGE_POLICY_FIELD, changePolicy)
103+
.field(JITTER, jobJitter)
98104
builder.endObject()
99105
return builder.endObject()
100106
}
@@ -114,6 +120,7 @@ data class ManagedIndexConfig(
114120
const val POLICY_SEQ_NO_FIELD = "policy_seq_no"
115121
const val POLICY_PRIMARY_TERM_FIELD = "policy_primary_term"
116122
const val CHANGE_POLICY_FIELD = "change_policy"
123+
const val JITTER = "jitter"
117124

118125
@Suppress("ComplexMethod", "LongMethod")
119126
@JvmStatic
@@ -137,6 +144,7 @@ data class ManagedIndexConfig(
137144
var enabled = true
138145
var policyPrimaryTerm: Long? = SequenceNumbers.UNASSIGNED_PRIMARY_TERM
139146
var policySeqNo: Long? = SequenceNumbers.UNASSIGNED_SEQ_NO
147+
var jitter: Double? = null
140148

141149
ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
142150
while (xcp.nextToken() != Token.END_OBJECT) {
@@ -164,6 +172,9 @@ data class ManagedIndexConfig(
164172
CHANGE_POLICY_FIELD -> {
165173
changePolicy = if (xcp.currentToken() == Token.VALUE_NULL) null else ChangePolicy.parse(xcp)
166174
}
175+
JITTER -> {
176+
jitter = if (xcp.currentToken() == Token.VALUE_NULL) null else xcp.doubleValue()
177+
}
167178
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in ManagedIndexConfig.")
168179
}
169180
}
@@ -192,7 +203,8 @@ data class ManagedIndexConfig(
192203
seqNo = policySeqNo ?: SequenceNumbers.UNASSIGNED_SEQ_NO,
193204
primaryTerm = policyPrimaryTerm ?: SequenceNumbers.UNASSIGNED_PRIMARY_TERM
194205
),
195-
changePolicy = changePolicy
206+
changePolicy = changePolicy,
207+
jobJitter = jitter
196208
)
197209
}
198210
}

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/ManagedIndexSettings.kt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ class ManagedIndexSettings {
3636
const val DEFAULT_ISM_ENABLED = true
3737
const val DEFAULT_METADATA_SERVICE_ENABLED = true
3838
const val DEFAULT_JOB_INTERVAL = 5
39+
const val DEFAULT_JITTER = 0.6
3940
private val ALLOW_LIST_ALL = ActionConfig.ActionType.values().toList().map { it.type }
4041
val ALLOW_LIST_NONE = emptyList<String>()
4142
val SNAPSHOT_DENY_LIST_NONE = emptyList<String>()
@@ -179,5 +180,14 @@ class ManagedIndexSettings {
179180
Setting.Property.NodeScope,
180181
Setting.Property.Dynamic
181182
)
183+
184+
val JITTER: Setting<Double> = Setting.doubleSetting(
185+
"plugins.index_state_management.jitter",
186+
DEFAULT_JITTER,
187+
0.0,
188+
Double.MAX_VALUE,
189+
Setting.Property.NodeScope,
190+
Setting.Property.Dynamic
191+
)
182192
}
183193
}

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/TransportAddPolicyAction.kt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,12 +93,16 @@ class TransportAddPolicyAction @Inject constructor(
9393
) {
9494

9595
@Volatile private var jobInterval = ManagedIndexSettings.JOB_INTERVAL.get(settings)
96+
@Volatile private var jobJitter = ManagedIndexSettings.JITTER.get(settings)
9697
@Volatile private var filterByEnabled = IndexManagementSettings.FILTER_BY_BACKEND_ROLES.get(settings)
9798

9899
init {
99100
clusterService.clusterSettings.addSettingsUpdateConsumer(ManagedIndexSettings.JOB_INTERVAL) {
100101
jobInterval = it
101102
}
103+
clusterService.clusterSettings.addSettingsUpdateConsumer(ManagedIndexSettings.JITTER) {
104+
jobJitter = it
105+
}
102106
clusterService.clusterSettings.addSettingsUpdateConsumer(IndexManagementSettings.FILTER_BY_BACKEND_ROLES) {
103107
filterByEnabled = it
104108
}
@@ -329,7 +333,7 @@ class TransportAddPolicyAction @Inject constructor(
329333
val bulkReq = BulkRequest().timeout(TimeValue.timeValueMillis(bulkReqTimeout))
330334
indicesToAdd.forEach { (uuid, name) ->
331335
bulkReq.add(
332-
managedIndexConfigIndexRequest(name, uuid, request.policyID, jobInterval, policy = policy.copy(user = this.user))
336+
managedIndexConfigIndexRequest(name, uuid, request.policyID, jobInterval, policy = policy.copy(user = this.user), jobJitter)
333337
)
334338
}
335339

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,12 +76,14 @@ import java.net.InetAddress
7676
import java.time.Instant
7777
import java.time.temporal.ChronoUnit
7878

79+
@Suppress("LongParameterList")
7980
fun managedIndexConfigIndexRequest(
8081
index: String,
8182
uuid: String,
8283
policyID: String,
8384
jobInterval: Int,
84-
policy: Policy? = null
85+
policy: Policy? = null,
86+
jobJitter: Double?
8587
): IndexRequest {
8688
val managedIndexConfig = ManagedIndexConfig(
8789
jobName = index,
@@ -95,7 +97,8 @@ fun managedIndexConfigIndexRequest(
9597
policy = policy,
9698
policySeqNo = policy?.seqNo,
9799
policyPrimaryTerm = policy?.primaryTerm,
98-
changePolicy = null
100+
changePolicy = null,
101+
jobJitter = jobJitter
99102
)
100103

101104
return IndexRequest(INDEX_MANAGEMENT_INDEX)

src/main/resources/mappings/opendistro-ism-config.json

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"_meta" : {
3-
"schema_version": 11
3+
"schema_version": 12
44
},
55
"dynamic": "strict",
66
"properties": {
@@ -614,6 +614,9 @@
614614
}
615615
}
616616
}
617+
},
618+
"jitter": {
619+
"type": "double"
617620
}
618621
}
619622
},

src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ import javax.management.remote.JMXServiceURL
4646

4747
abstract class IndexManagementRestTestCase : ODFERestTestCase() {
4848

49-
val configSchemaVersion = 11
49+
val configSchemaVersion = 12
5050
val historySchemaVersion = 3
5151

5252
// Having issues with tests leaking into other tests and mappings being incorrect and they are not caught by any pending task wait check as

src/test/kotlin/org/opensearch/indexmanagement/IndexManagementSettingsTests.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ class IndexManagementSettingsTests : OpenSearchTestCase() {
103103
ManagedIndexSettings.COORDINATOR_BACKOFF_MILLIS,
104104
ManagedIndexSettings.ALLOW_LIST,
105105
ManagedIndexSettings.SNAPSHOT_DENY_LIST,
106+
ManagedIndexSettings.JITTER,
106107
RollupSettings.ROLLUP_INGEST_BACKOFF_COUNT,
107108
RollupSettings.ROLLUP_INGEST_BACKOFF_MILLIS,
108109
RollupSettings.ROLLUP_SEARCH_BACKOFF_COUNT,

src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementIntegTestCase.kt

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ package org.opensearch.indexmanagement.indexstatemanagement
2828

2929
import org.apache.http.entity.ContentType
3030
import org.apache.http.entity.StringEntity
31+
import org.junit.Before
3132
import org.opensearch.OpenSearchParseException
3233
import org.opensearch.action.ActionRequest
3334
import org.opensearch.action.ActionResponse
@@ -54,6 +55,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
5455
import org.opensearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.PolicyRetryInfoMetaData
5556
import org.opensearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.StateMetaData
5657
import org.opensearch.indexmanagement.indexstatemanagement.resthandler.RestExplainAction
58+
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
5759
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.explain.ExplainAction
5860
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.explain.TransportExplainAction
5961
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.TransportUpdateManagedIndexMetaDataAction
@@ -73,6 +75,11 @@ import java.time.Duration
7375
import java.time.Instant
7476

7577
abstract class IndexStateManagementIntegTestCase : OpenSearchIntegTestCase() {
78+
@Before
79+
fun disableIndexStateManagementJitter() {
80+
// jitter would add a test-breaking delay to the integration tests
81+
updateIndexStateManagementJitterSetting(0.0)
82+
}
7683

7784
protected val isMixedNodeRegressionTest = System.getProperty("cluster.mixed", "false")!!.toBoolean()
7885

@@ -357,4 +364,8 @@ abstract class IndexStateManagementIntegTestCase : OpenSearchIntegTestCase() {
357364
)
358365
assertEquals("Request failed", RestStatus.OK, res.restStatus())
359366
}
367+
368+
protected fun updateIndexStateManagementJitterSetting(value: Double?) {
369+
updateClusterSetting(ManagedIndexSettings.JITTER.key, value.toString(), false)
370+
}
360371
}

0 commit comments

Comments
 (0)