Skip to content

Commit 5ad90ff

Browse files
authored
Allows out of band rollovers on an index without causing ISM to fail (#180)
* Allows out of band rollovers on an index without causing ISM to fail Signed-off-by: Drew Baugher <46505179+dbbaughe@users.noreply.github.com> * Fixes detekt issue Signed-off-by: Drew Baugher <46505179+dbbaughe@users.noreply.github.com>
1 parent ab12279 commit 5ad90ff

File tree

4 files changed

+61
-9
lines changed

4 files changed

+61
-9
lines changed

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollover/AttemptRolloverStep.kt

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -72,18 +72,16 @@ class AttemptRolloverStep(
7272
return this
7373
}
7474

75-
// If we have already rolled over this index then fail as we only allow an index to be rolled over once
76-
if (managedIndexMetaData.rolledOver == true) {
77-
logger.warn("$indexName was already rolled over, cannot execute rollover step")
78-
stepStatus = StepStatus.FAILED
79-
info = mapOf("message" to getFailedDuplicateRolloverMessage(indexName))
80-
return this
81-
}
82-
8375
val (rolloverTarget, isDataStream) = getRolloverTargetOrUpdateInfo()
8476
// If the rolloverTarget is null, we would've already updated the failed info from getRolloverTargetOrUpdateInfo and can return early
8577
rolloverTarget ?: return this
8678

79+
if (clusterService.state().metadata.index(indexName).rolloverInfos.containsKey(rolloverTarget)) {
80+
stepStatus = StepStatus.COMPLETED
81+
info = mapOf("message" to getAlreadyRolledOverMessage(indexName, rolloverTarget))
82+
return this
83+
}
84+
8785
if (!isDataStream && !preCheckIndexAlias(rolloverTarget)) {
8886
stepStatus = StepStatus.FAILED
8987
info = mapOf("message" to getFailedPreCheckMessage(indexName))
@@ -285,13 +283,14 @@ class AttemptRolloverStep(
285283
"New index created, but failed to update alias [index=$index, newIndex=$newIndex]"
286284
fun getFailedDataStreamRolloverMessage(dataStream: String) = "Failed to rollover data stream [data_stream=$dataStream]"
287285
fun getFailedNoValidAliasMessage(index: String) = "Missing rollover_alias index setting [index=$index]"
288-
fun getFailedDuplicateRolloverMessage(index: String) = "Index has already been rolled over [index=$index]"
289286
fun getFailedEvaluateMessage(index: String) = "Failed to evaluate conditions for rollover [index=$index]"
290287
fun getPendingMessage(index: String) = "Pending rollover of index [index=$index]"
291288
fun getSuccessMessage(index: String) = "Successfully rolled over index [index=$index]"
292289
fun getSuccessDataStreamRolloverMessage(dataStream: String, index: String) =
293290
"Successfully rolled over data stream [data_stream=$dataStream index=$index]"
294291
fun getFailedPreCheckMessage(index: String) = "Missing alias or not the write index when rollover [index=$index]"
295292
fun getSkipRolloverMessage(index: String) = "Skipped rollover action for [index=$index]"
293+
fun getAlreadyRolledOverMessage(index: String, alias: String) =
294+
"This index has already been rolled over using this alias, treating as a success [index=$index, alias=$alias]"
296295
}
297296
}

src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -595,6 +595,16 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase()
595595
return metadata
596596
}
597597

598+
protected fun rolloverIndex(index: String) {
599+
val response = client().performRequest(
600+
Request(
601+
"POST",
602+
"/$index/_rollover"
603+
)
604+
)
605+
assertEquals(response.statusLine.statusCode, RestStatus.OK.status)
606+
}
607+
598608
protected fun createRepository(
599609
repository: String
600610
) {

src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.kt

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.action.Rollover
4242
import org.opensearch.indexmanagement.indexstatemanagement.randomErrorNotification
4343
import org.opensearch.indexmanagement.indexstatemanagement.resthandler.RestRetryFailedManagedIndexAction
4444
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
45+
import org.opensearch.indexmanagement.indexstatemanagement.step.Step
4546
import org.opensearch.indexmanagement.indexstatemanagement.step.rollover.AttemptRolloverStep
4647
import org.opensearch.indexmanagement.makeRequest
4748
import org.opensearch.indexmanagement.waitFor
@@ -496,4 +497,46 @@ class RolloverActionIT : IndexStateManagementRestTestCase() {
496497
val secondIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 2L)
497498
Assert.assertTrue("New rollover index does not exist.", indexExists(secondIndexName))
498499
}
500+
501+
@Suppress("UNCHECKED_CAST")
502+
fun `test rollover from outside ISM doesn't fail ISM job`() {
503+
val aliasName = "${testIndexName}_alias"
504+
val indexNameBase = "${testIndexName}_index"
505+
val firstIndex = "$indexNameBase-1"
506+
val policyID = "${testIndexName}_testPolicyName_1"
507+
val actionConfig = RolloverActionConfig(null, null, null, 0)
508+
val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf()))
509+
val policy = Policy(
510+
id = policyID,
511+
description = "$testIndexName description",
512+
schemaVersion = 1L,
513+
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
514+
errorNotification = randomErrorNotification(),
515+
defaultState = states[0].name,
516+
states = states
517+
)
518+
519+
createPolicy(policy, policyID)
520+
// create index defaults
521+
createIndex(firstIndex, policyID, aliasName)
522+
523+
val managedIndexConfig = getExistingManagedIndexConfig(firstIndex)
524+
525+
// Change the start time so the job will trigger in 2 seconds, this will trigger the first initialization of the policy
526+
updateManagedIndexConfigStartTime(managedIndexConfig)
527+
waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(firstIndex).policyID) }
528+
529+
// Rollover the alias manually before ISM tries to roll it over
530+
rolloverIndex(aliasName)
531+
532+
// Need to speed up to second execution where it will trigger the first execution of the action
533+
updateManagedIndexConfigStartTime(managedIndexConfig)
534+
waitFor {
535+
val info = getExplainManagedIndexMetaData(firstIndex).info as Map<String, Any?>
536+
val stepMetadata = getExplainManagedIndexMetaData(firstIndex).stepMetaData
537+
assertEquals("Index should succeed if already rolled over.", AttemptRolloverStep.getAlreadyRolledOverMessage(firstIndex, aliasName), info["message"])
538+
assertEquals("Index should succeed if already rolled over.", Step.StepStatus.COMPLETED, stepMetadata?.stepStatus)
539+
}
540+
assertTrue("New rollover index does not exist.", indexExists("$indexNameBase-000002"))
541+
}
499542
}
Binary file not shown.

0 commit comments

Comments
 (0)