Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ The current supported transition conditions are:
* Index size
* Index age
* Cron expression
* Alias presence
* ISM state age

## Contributing

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.indexmanagement.indexstatemanagement.model

import org.opensearch.Version
import org.opensearch.common.unit.TimeValue
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
Expand Down Expand Up @@ -80,10 +81,12 @@ data class Conditions(
val size: ByteSizeValue? = null,
val cron: CronSchedule? = null,
val rolloverAge: TimeValue? = null,
val noAlias: Boolean? = null,
val minStateAge: TimeValue? = null,
) : ToXContentObject,
Writeable {
init {
val conditionsList = listOf(indexAge, docCount, size, cron, rolloverAge)
val conditionsList = listOf(indexAge, docCount, size, cron, rolloverAge, noAlias, minStateAge)
require(conditionsList.filterNotNull().size == 1) { "Cannot provide more than one Transition condition" }

// Validate doc count condition
Expand All @@ -100,6 +103,8 @@ data class Conditions(
if (size != null) builder.field(MIN_SIZE_FIELD, size.stringRep)
if (cron != null) builder.field(CRON_FIELD, cron)
if (rolloverAge != null) builder.field(MIN_ROLLOVER_AGE_FIELD, rolloverAge.stringRep)
if (noAlias != null) builder.field(NO_ALIAS_FIELD, noAlias)
if (minStateAge != null) builder.field(MIN_STATE_AGE_FIELD, minStateAge.stringRep)
return builder.endObject()
}

Expand All @@ -110,6 +115,8 @@ data class Conditions(
size = sin.readOptionalWriteable(::ByteSizeValue),
cron = sin.readOptionalWriteable(::CronSchedule),
rolloverAge = sin.readOptionalTimeValue(),
noAlias = if (sin.version.onOrAfter(Version.V_3_2_0)) sin.readOptionalBoolean() else null,
minStateAge = if (sin.version.onOrAfter(Version.V_3_2_0)) sin.readOptionalTimeValue() else null,
)

@Throws(IOException::class)
Expand All @@ -119,6 +126,10 @@ data class Conditions(
out.writeOptionalWriteable(size)
out.writeOptionalWriteable(cron)
out.writeOptionalTimeValue(rolloverAge)
if (out.version.onOrAfter(Version.V_3_2_0)) {
out.writeOptionalBoolean(noAlias)
out.writeOptionalTimeValue(minStateAge)
}
}

companion object {
Expand All @@ -127,6 +138,8 @@ data class Conditions(
const val MIN_SIZE_FIELD = "min_size"
const val CRON_FIELD = "cron"
const val MIN_ROLLOVER_AGE_FIELD = "min_rollover_age"
const val NO_ALIAS_FIELD = "no_alias"
const val MIN_STATE_AGE_FIELD = "min_state_age"

@JvmStatic
@Throws(IOException::class)
Expand All @@ -136,6 +149,8 @@ data class Conditions(
var size: ByteSizeValue? = null
var cron: CronSchedule? = null
var rolloverAge: TimeValue? = null
var noAlias: Boolean? = null
var minStateAge: TimeValue? = null

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
Expand All @@ -148,11 +163,13 @@ data class Conditions(
MIN_SIZE_FIELD -> size = ByteSizeValue.parseBytesSizeValue(xcp.text(), MIN_SIZE_FIELD)
CRON_FIELD -> cron = ScheduleParser.parse(xcp) as? CronSchedule
MIN_ROLLOVER_AGE_FIELD -> rolloverAge = TimeValue.parseTimeValue(xcp.text(), MIN_ROLLOVER_AGE_FIELD)
NO_ALIAS_FIELD -> noAlias = xcp.booleanValue()
MIN_STATE_AGE_FIELD -> minStateAge = TimeValue.parseTimeValue(xcp.text(), MIN_STATE_AGE_FIELD)
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in Conditions.")
}
}

return Conditions(indexAge, docCount, size, cron, rolloverAge)
return Conditions(indexAge, docCount, size, cron, rolloverAge, noAlias, minStateAge)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.IndexMetadataProvider
import org.opensearch.indexmanagement.indexstatemanagement.action.TransitionsAction
import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getOldestRolloverTime
import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_INDEX_TYPE
import org.opensearch.indexmanagement.indexstatemanagement.util.TransitionConditionContext
import org.opensearch.indexmanagement.indexstatemanagement.util.evaluateConditions
import org.opensearch.indexmanagement.indexstatemanagement.util.hasStatsConditions
import org.opensearch.indexmanagement.opensearchapi.getUsefulCauseString
Expand Down Expand Up @@ -100,9 +101,22 @@ class AttemptTransitionStep(private val action: TransitionsAction) : Step(name)
}

// Find the first transition that evaluates to true and get the state to transition to, otherwise return null if none are true
val indexAliasesCount = indexMetadata?.aliases?.size ?: 0
val stateStartTime = context.metadata.stateMetaData?.startTime
val stateStartInstant = stateStartTime?.let { Instant.ofEpochMilli(it) }
stateName =
transitions.find {
it.evaluateConditions(indexCreationDateInstant, numDocs, indexSize, stepStartTime, rolloverDate)
it.evaluateConditions(
TransitionConditionContext(
indexCreationDate = indexCreationDateInstant,
numDocs = numDocs,
indexSize = indexSize,
transitionStartTime = stepStartTime,
rolloverDate = rolloverDate,
indexAliasesCount = indexAliasesCount,
stateStartTime = stateStartInstant,
),
)
}?.stateName
val message: String
val stateName = stateName // shadowed on purpose to prevent var from changing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.action.DeleteAction
import org.opensearch.indexmanagement.indexstatemanagement.action.RolloverAction
import org.opensearch.indexmanagement.indexstatemanagement.action.TransitionsAction
import org.opensearch.indexmanagement.indexstatemanagement.model.ChangePolicy
import org.opensearch.indexmanagement.indexstatemanagement.model.Conditions
import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
import org.opensearch.indexmanagement.indexstatemanagement.model.State
Expand Down Expand Up @@ -182,47 +183,83 @@ fun getSweptManagedIndexSearchRequest(scroll: Boolean = false, size: Int = Manag
return req
}

@Suppress("ReturnCount", "ComplexCondition")
@Suppress("ReturnCount", "ComplexCondition", "LongParameterList")
data class TransitionConditionContext(
val indexCreationDate: Instant,
val numDocs: Long?,
val indexSize: ByteSizeValue?,
val transitionStartTime: Instant,
val rolloverDate: Instant?,
val indexAliasesCount: Int? = null,
val stateStartTime: Instant? = null,
)

@Suppress("ReturnCount")
fun Transition.evaluateConditions(
indexCreationDate: Instant,
numDocs: Long?,
indexSize: ByteSizeValue?,
transitionStartTime: Instant,
rolloverDate: Instant?,
context: TransitionConditionContext,
): Boolean {
// If there are no conditions, treat as always true
if (this.conditions == null) return true
val conditions = this.conditions ?: return true
if (checkDocCount(conditions, context)) return true
if (checkIndexAge(conditions, context)) return true
if (checkSize(conditions, context)) return true
if (checkCron(conditions, context)) return true
if (checkRolloverAge(conditions, context)) return true
if (checkNoAlias(conditions, context)) return true
if (checkMinStateAge(conditions, context)) return true
return false
}

if (this.conditions.docCount != null && numDocs != null) {
return this.conditions.docCount <= numDocs
}
private fun checkDocCount(conditions: Conditions, context: TransitionConditionContext): Boolean =
conditions.docCount != null &&
context.numDocs != null &&
conditions.docCount <= context.numDocs

if (this.conditions.indexAge != null) {
val indexCreationDateMilli = indexCreationDate.toEpochMilli()
if (indexCreationDateMilli == -1L) return false // transitions cannot currently be ORd like rollover, so we must return here
@Suppress("ReturnCount")
private fun checkIndexAge(conditions: Conditions, context: TransitionConditionContext): Boolean {
if (conditions.indexAge != null) {
val indexCreationDateMilli = context.indexCreationDate.toEpochMilli()
if (indexCreationDateMilli == -1L) return false
val elapsedTime = Instant.now().toEpochMilli() - indexCreationDateMilli
return this.conditions.indexAge.millis <= elapsedTime
return conditions.indexAge.millis <= elapsedTime
}
return false
}

if (this.conditions.size != null && indexSize != null) {
return this.conditions.size <= indexSize
}
private fun checkSize(conditions: Conditions, context: TransitionConditionContext): Boolean =
conditions.size != null &&
context.indexSize != null &&
conditions.size <= context.indexSize

if (this.conditions.cron != null) {
// If a cron pattern matches the time between the start of "attempt_transition" to now then we consider it meeting the condition
return this.conditions.cron.getNextExecutionTime(transitionStartTime) <= Instant.now()
private fun checkCron(conditions: Conditions, context: TransitionConditionContext): Boolean {
if (conditions.cron != null) {
return conditions.cron.getNextExecutionTime(context.transitionStartTime) <= Instant.now()
}
return false
}

if (this.conditions.rolloverAge != null) {
val rolloverDateMilli = rolloverDate?.toEpochMilli() ?: return false
@Suppress("ReturnCount")
private fun checkRolloverAge(conditions: Conditions, context: TransitionConditionContext): Boolean {
if (conditions.rolloverAge != null) {
val rolloverDateMilli = context.rolloverDate?.toEpochMilli() ?: return false
val elapsedTime = Instant.now().toEpochMilli() - rolloverDateMilli
return this.conditions.rolloverAge.millis <= elapsedTime
return conditions.rolloverAge.millis <= elapsedTime
}

// We should never reach this
return false
}

private fun checkNoAlias(conditions: Conditions, context: TransitionConditionContext): Boolean =
conditions.noAlias != null &&
context.indexAliasesCount != null &&
(
(conditions.noAlias && context.indexAliasesCount == 0) ||
(!conditions.noAlias && context.indexAliasesCount > 0)
)

private fun checkMinStateAge(conditions: Conditions, context: TransitionConditionContext): Boolean =
conditions.minStateAge != null &&
context.stateStartTime != null &&
(System.currentTimeMillis() - context.stateStartTime.toEpochMilli() >= conditions.minStateAge.millis)

fun Transition.hasStatsConditions(): Boolean = this.conditions?.docCount != null || this.conditions?.size != null

@Suppress("ReturnCount", "ComplexCondition")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,16 @@ import org.opensearch.indexmanagement.IndexManagementIndices.Companion.HISTORY_W
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase
import org.opensearch.indexmanagement.indexstatemanagement.action.RolloverAction
import org.opensearch.indexmanagement.indexstatemanagement.model.Conditions
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
import org.opensearch.indexmanagement.indexstatemanagement.model.State
import org.opensearch.indexmanagement.indexstatemanagement.model.Transition
import org.opensearch.indexmanagement.indexstatemanagement.randomErrorNotification
import org.opensearch.indexmanagement.indexstatemanagement.step.rollover.AttemptRolloverStep
import org.opensearch.indexmanagement.indexstatemanagement.step.transition.AttemptTransitionStep
import org.opensearch.indexmanagement.waitFor
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.Locale

class ISMBackwardsCompatibilityIT : IndexStateManagementRestTestCase() {
Expand Down Expand Up @@ -130,6 +138,62 @@ class ISMBackwardsCompatibilityIT : IndexStateManagementRestTestCase() {
}
}

@Throws(Exception::class)
@Suppress("UNCHECKED_CAST")
fun `test existing transition conditions backwards compatibility`() {
val indexNameBase = "${testIndexName}_existing_conditions"
val index1 = "$indexNameBase-1"
val index2 = "$indexNameBase-2"
val policyID = "${testIndexName}_doc_count_policy"

val uri = getPluginUri()
val responseMap = getAsMap(uri)["nodes"] as Map<String, Map<String, Any>>
for (response in responseMap.values) {
val plugins = response["plugins"] as List<Map<String, Any>>
val pluginNames = plugins.map { plugin -> plugin["name"] }.toSet()
when (CLUSTER_TYPE) {
ClusterType.OLD -> {
assertTrue(pluginNames.contains("opendistro-index-management") || pluginNames.contains("opensearch-index-management"))

createDocCountTransitionPolicy(policyID)

createIndex(index1, policyID)
createIndex(index2, policyID)

// Change the start time so the job will trigger in 2 seconds, this will trigger the first initialization of the policy
updateManagedIndexConfigStartTime(getExistingManagedIndexConfig(index1))
waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(index1).policyID) }

// Change the start time so the job will trigger in 2 seconds, this will trigger the first initialization of the policy
updateManagedIndexConfigStartTime(getExistingManagedIndexConfig(index2))
waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(index2).policyID) }

verifyPendingTransition(index1)
verifyPendingTransition(index2)
}
ClusterType.MIXED -> {
assertTrue(pluginNames.contains("opensearch-index-management"))

verifyPendingTransition(index1)
verifyPendingTransition(index2)
}
ClusterType.UPGRADED -> {
assertTrue(pluginNames.contains("opensearch-index-management"))

verifyPendingTransition(index1)
insertSampleData(index = index1, docCount = 6, delay = 0)
verifySuccessfulTransition(index1)

insertSampleData(index = index2, docCount = 6, delay = 0)
verifySuccessfulTransition(index2)

deleteIndex("$indexNameBase*")
}
}
break
}
}

private fun createRolloverPolicy(policyID: String) {
val policy =
"""
Expand Down Expand Up @@ -209,4 +273,50 @@ class ISMBackwardsCompatibilityIT : IndexStateManagementRestTestCase() {
}
Assert.assertTrue("New rollover index does not exist.", indexExists(newIndex))
}

private fun createDocCountTransitionPolicy(policyID: String) {
val secondStateName = "second"
val states = listOf(
State("first", listOf(), listOf(Transition(secondStateName, Conditions(docCount = 5L)))),
State(secondStateName, listOf(), listOf()),
)

val policy = Policy(
id = policyID,
description = "BWC test policy with doc count transition",
schemaVersion = 5L,
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
errorNotification = randomErrorNotification(),
defaultState = states[0].name,
states = states,
)

createPolicy(policy, policyID)
}

private fun verifyPendingTransition(index: String) {
val managedIndexConfig = getExistingManagedIndexConfig(index)
// Need to speed up to second execution where it will trigger the first execution of transition evaluation
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor {
assertEquals(
"Index transitioned before it met the condition.",
AttemptTransitionStep.getEvaluatingMessage(index),
getExplainManagedIndexMetaData(index).info?.get("message"),
)
}
}

private fun verifySuccessfulTransition(index: String) {
val managedIndexConfig = getExistingManagedIndexConfig(index)
// Need to speed up to second execution where it will trigger the transition evaluation
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor {
assertEquals(
"Index did not transition successfully",
AttemptTransitionStep.getSuccessMessage(index, "second"),
getExplainManagedIndexMetaData(index).info?.get("message"),
)
}
}
}
Loading
Loading