Skip to content

Commit cfafe53

Browse files
pre check for rollover action (opensearch-project#88)
* pre check for rollover action Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
1 parent 23b3e9d commit cfafe53

File tree

9 files changed

+173
-3
lines changed

9 files changed

+173
-3
lines changed

.github/workflows/multi-node-test-workflow.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ jobs:
5656
with:
5757
repository: 'opensearch-project/alerting'
5858
path: alerting
59-
ref: 'main'
59+
ref: '1.0'
6060
- name: Build alerting
6161
working-directory: ./alerting
6262
run: ./gradlew :alerting-notification:publishToMavenLocal -Dopensearch.version=1.0.0 -Dbuild.snapshot=false

.github/workflows/test-and-build-workflow.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ jobs:
5656
with:
5757
repository: 'opensearch-project/alerting'
5858
path: alerting
59-
ref: 'main'
59+
ref: '1.0'
6060
- name: Build alerting
6161
working-directory: ./alerting
6262
run: ./gradlew :alerting-notification:publishToMavenLocal -Dopensearch.version=1.0.0 -Dbuild.snapshot=false

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ dependencies {
155155
compile "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}"
156156
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.7'
157157
compile "org.jetbrains:annotations:13.0"
158-
compile "org.opensearch:notification:1.0.0.0-rc1"
158+
compile "org.opensearch:notification:1.0.0.0"
159159
compile "org.opensearch:common-utils:1.0.0.0"
160160
compile "com.github.seancfoley:ipaddress:5.3.3"
161161

src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
364364
ManagedIndexSettings.HISTORY_NUMBER_OF_REPLICAS,
365365
ManagedIndexSettings.POLICY_ID,
366366
ManagedIndexSettings.ROLLOVER_ALIAS,
367+
ManagedIndexSettings.ROLLOVER_SKIP,
367368
ManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED,
368369
ManagedIndexSettings.METADATA_SERVICE_ENABLED,
369370
ManagedIndexSettings.JOB_INTERVAL,

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@ fun IndexMetadata.getRolloverAlias(): String? {
7979
return this.settings.get(ManagedIndexSettings.ROLLOVER_ALIAS.key)
8080
}
8181

82+
fun IndexMetadata.getRolloverSkip(): Boolean {
83+
return this.settings.getAsBoolean(ManagedIndexSettings.ROLLOVER_SKIP.key, false)
84+
}
85+
8286
fun IndexMetadata.getManagedIndexMetadata(): ManagedIndexMetaData? {
8387
val existingMetaDataMap = this.getCustomData(ManagedIndexMetaData.MANAGED_INDEX_METADATA_TYPE)
8488

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/ManagedIndexSettings.kt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,13 @@ class ManagedIndexSettings {
6969
Setting.Property.Dynamic
7070
)
7171

72+
val ROLLOVER_SKIP: Setting<Boolean> = Setting.boolSetting(
73+
"index.plugins.index_state_management.rollover_skip",
74+
false,
75+
Setting.Property.IndexScope,
76+
Setting.Property.Dynamic
77+
)
78+
7279
val JOB_INTERVAL: Setting<Int> = Setting.intSetting(
7380
"plugins.index_state_management.job_interval",
7481
LegacyOpenDistroManagedIndexSettings.JOB_INTERVAL,

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollover/AttemptRolloverStep.kt

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexMet
4040
import org.opensearch.indexmanagement.indexstatemanagement.model.action.RolloverActionConfig
4141
import org.opensearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.StepMetaData
4242
import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getRolloverAlias
43+
import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getRolloverSkip
4344
import org.opensearch.indexmanagement.indexstatemanagement.step.Step
4445
import org.opensearch.indexmanagement.indexstatemanagement.util.evaluateConditions
4546
import org.opensearch.indexmanagement.opensearchapi.getUsefulCauseString
@@ -64,6 +65,13 @@ class AttemptRolloverStep(
6465

6566
@Suppress("TooGenericExceptionCaught")
6667
override suspend fun execute(): AttemptRolloverStep {
68+
val skipRollover = clusterService.state().metadata.index(indexName).getRolloverSkip()
69+
if (skipRollover) {
70+
stepStatus = StepStatus.COMPLETED
71+
info = mapOf("message" to getSkipRolloverMessage(indexName))
72+
return this
73+
}
74+
6775
// If we have already rolled over this index then fail as we only allow an index to be rolled over once
6876
if (managedIndexMetaData.rolledOver == true) {
6977
logger.warn("$indexName was already rolled over, cannot execute rollover step")
@@ -76,6 +84,12 @@ class AttemptRolloverStep(
7684
// If the rolloverTarget is null, we would've already updated the failed info from getRolloverTargetOrUpdateInfo and can return early
7785
rolloverTarget ?: return this
7886

87+
if (!isDataStream && !preCheckIndexAlias(rolloverTarget)) {
88+
stepStatus = StepStatus.FAILED
89+
info = mapOf("message" to getFailedPreCheckMessage(indexName))
90+
return this
91+
}
92+
7993
val statsResponse = getIndexStatsOrUpdateInfo()
8094
// If statsResponse is null we already updated failed info from getIndexStatsOrUpdateInfo and can return early
8195
statsResponse ?: return this
@@ -170,6 +184,36 @@ class AttemptRolloverStep(
170184
}
171185
}
172186

187+
/**
188+
* pre-condition check on managed-index's alias before rollover
189+
*
190+
* This will block
191+
* when managed index dont have alias
192+
* when managed index has alias but not the write index,
193+
* and this alias contains more than one index
194+
* User can use skip rollover setting to bypass this
195+
*
196+
* @param alias user defined ISM rollover alias
197+
*/
198+
private fun preCheckIndexAlias(alias: String): Boolean {
199+
val metadata = clusterService.state().metadata
200+
val indexAlias = metadata.index(indexName)?.aliases?.get(alias)
201+
logger.debug("Index $indexName has aliases $indexAlias")
202+
if (indexAlias == null) {
203+
return false
204+
}
205+
val isWriteIndex = indexAlias.writeIndex() // this could be null
206+
if (isWriteIndex != true) {
207+
val aliasIndices = metadata.indicesLookup[alias]?.indices?.map { it.index }
208+
logger.debug("Alias $alias contains indices $aliasIndices")
209+
if (aliasIndices != null && aliasIndices.size > 1) {
210+
return false
211+
}
212+
}
213+
214+
return true
215+
}
216+
173217
private fun getRolloverTargetOrUpdateInfo(): Pair<String?, Boolean> {
174218
val metadata = clusterService.state().metadata()
175219
val indexAbstraction = metadata.indicesLookup[indexName]
@@ -246,5 +290,7 @@ class AttemptRolloverStep(
246290
fun getSuccessMessage(index: String) = "Successfully rolled over index [index=$index]"
247291
fun getSuccessDataStreamRolloverMessage(dataStream: String, index: String) =
248292
"Successfully rolled over data stream [data_stream=$dataStream index=$index]"
293+
fun getFailedPreCheckMessage(index: String) = "Missing alias or not the write index when rollover [index=$index]"
294+
fun getSkipRolloverMessage(index: String) = "Skipped rollover action for [index=$index]"
249295
}
250296
}

src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,29 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase()
198198
return index to policyID
199199
}
200200

201+
protected fun changeAlias(
202+
index: String,
203+
alias: String,
204+
action: String = "remove",
205+
isWriteIndex: Boolean = false
206+
) {
207+
val isWriteIndexField = if (isWriteIndex) "\",\"is_write_index\": \"$isWriteIndex" else ""
208+
val body = """
209+
{
210+
"actions": [
211+
{
212+
"$action": {
213+
"index": "$index",
214+
"alias": "$alias$isWriteIndexField"
215+
}
216+
}
217+
]
218+
}
219+
""".trimIndent()
220+
val response = client().makeRequest("POST", "_aliases", StringEntity(body, APPLICATION_JSON))
221+
assertEquals("Unexpected RestStatus", RestStatus.OK, response.restStatus())
222+
}
223+
201224
/** Refresh all indices in the cluster */
202225
protected fun refresh() {
203226
val request = Request("POST", "/_refresh")
@@ -248,6 +271,30 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase()
248271
assertEquals("Request failed", RestStatus.OK, res.restStatus())
249272
}
250273

274+
protected fun updateIndexSetting(
275+
index: String,
276+
key: String,
277+
value: String
278+
) {
279+
val body = """
280+
{
281+
"$key" : "$value"
282+
}
283+
""".trimIndent()
284+
val res = client().makeRequest(
285+
"PUT", "$index/_settings", emptyMap(),
286+
StringEntity(body, APPLICATION_JSON)
287+
)
288+
assertEquals("Update index setting failed", RestStatus.OK, res.restStatus())
289+
}
290+
291+
protected fun getIndexSetting(index: String) {
292+
val res = client().makeRequest(
293+
"GET", "$index/_settings", emptyMap()
294+
)
295+
assertEquals("Update index setting failed", RestStatus.OK, res.restStatus())
296+
}
297+
251298
protected fun getManagedIndexConfig(index: String): ManagedIndexConfig? {
252299
val request = """
253300
{

src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.kt

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,13 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
4040
import org.opensearch.indexmanagement.indexstatemanagement.model.State
4141
import org.opensearch.indexmanagement.indexstatemanagement.model.action.RolloverActionConfig
4242
import org.opensearch.indexmanagement.indexstatemanagement.randomErrorNotification
43+
import org.opensearch.indexmanagement.indexstatemanagement.resthandler.RestRetryFailedManagedIndexAction
44+
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
4345
import org.opensearch.indexmanagement.indexstatemanagement.step.rollover.AttemptRolloverStep
4446
import org.opensearch.indexmanagement.makeRequest
4547
import org.opensearch.indexmanagement.waitFor
48+
import org.opensearch.rest.RestRequest
49+
import org.opensearch.rest.RestStatus
4650
import java.time.Instant
4751
import java.time.temporal.ChronoUnit
4852
import java.util.Locale
@@ -284,6 +288,67 @@ class RolloverActionIT : IndexStateManagementRestTestCase() {
284288
Assert.assertTrue("New rollover index does not exist.", indexExists("$indexNameBase-000002"))
285289
}
286290

291+
fun `test rollover pre check`() {
292+
// index-1 alias x
293+
// index-2 alias x is_write_index
294+
// manage index-1, expect it fail to rollover
295+
val index1 = "index-1"
296+
val index2 = "index-2"
297+
val alias1 = "x"
298+
val policyID = "${testIndexName}_precheck"
299+
val actionConfig = RolloverActionConfig(null, 3, TimeValue.timeValueDays(2), 0)
300+
val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf()))
301+
val policy = Policy(
302+
id = policyID,
303+
description = "$testIndexName description",
304+
schemaVersion = 1L,
305+
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
306+
errorNotification = randomErrorNotification(),
307+
defaultState = states[0].name,
308+
states = states
309+
)
310+
createPolicy(policy, policyID)
311+
createIndex(index1, policyID)
312+
changeAlias(index1, alias1, "add")
313+
updateIndexSetting(index1, ManagedIndexSettings.ROLLOVER_ALIAS.key, alias1)
314+
createIndex(index2, policyID)
315+
changeAlias(index2, alias1, "add", true)
316+
updateIndexSetting(index2, ManagedIndexSettings.ROLLOVER_ALIAS.key, alias1)
317+
318+
val managedIndexConfig = getExistingManagedIndexConfig(index1)
319+
320+
// Change the start time so the job will trigger in 2 seconds, this will trigger the first initialization of the policy
321+
updateManagedIndexConfigStartTime(managedIndexConfig)
322+
waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(index1).policyID) }
323+
324+
// Need to speed up to second execution where it will trigger the first execution of the action
325+
updateManagedIndexConfigStartTime(managedIndexConfig)
326+
waitFor {
327+
val info = getExplainManagedIndexMetaData(index1).info as Map<String, Any?>
328+
assertEquals(
329+
"Index rollover not stopped by pre-check.",
330+
AttemptRolloverStep.getFailedPreCheckMessage(index1), info["message"]
331+
)
332+
}
333+
334+
updateIndexSetting(index1, ManagedIndexSettings.ROLLOVER_SKIP.key, "true")
335+
336+
val response = client().makeRequest(
337+
RestRequest.Method.POST.toString(),
338+
"${RestRetryFailedManagedIndexAction.RETRY_BASE_URI}/$index1"
339+
)
340+
assertEquals("Unexpected RestStatus", RestStatus.OK, response.restStatus())
341+
342+
updateManagedIndexConfigStartTime(managedIndexConfig)
343+
waitFor {
344+
val info = getExplainManagedIndexMetaData(index1).info as Map<String, Any?>
345+
assertEquals(
346+
"Index rollover not skip.",
347+
AttemptRolloverStep.getSkipRolloverMessage(index1), info["message"]
348+
)
349+
}
350+
}
351+
287352
fun `test data stream rollover no condition`() {
288353
val dataStreamName = "${testIndexName}_data_stream"
289354
val policyID = "${testIndexName}_rollover_policy"

0 commit comments

Comments
 (0)