Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/multi-node-test-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ jobs:
with:
repository: 'opensearch-project/alerting'
path: alerting
ref: 'main'
ref: '1.0'
- name: Build alerting
working-directory: ./alerting
run: ./gradlew :alerting-notification:publishToMavenLocal -Dopensearch.version=1.0.0 -Dbuild.snapshot=false
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test-and-build-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ jobs:
with:
repository: 'opensearch-project/alerting'
path: alerting
ref: 'main'
ref: '1.0'
- name: Build alerting
working-directory: ./alerting
run: ./gradlew :alerting-notification:publishToMavenLocal -Dopensearch.version=1.0.0 -Dbuild.snapshot=false
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ dependencies {
compile "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}"
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.7'
compile "org.jetbrains:annotations:13.0"
compile "org.opensearch:notification:1.0.0.0-rc1"
compile "org.opensearch:notification:1.0.0.0"
compile "org.opensearch:common-utils:1.0.0.0"
compile "com.github.seancfoley:ipaddress:5.3.3"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
ManagedIndexSettings.HISTORY_NUMBER_OF_REPLICAS,
ManagedIndexSettings.POLICY_ID,
ManagedIndexSettings.ROLLOVER_ALIAS,
ManagedIndexSettings.ROLLOVER_SKIP,
ManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED,
ManagedIndexSettings.METADATA_SERVICE_ENABLED,
ManagedIndexSettings.JOB_INTERVAL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ fun IndexMetadata.getRolloverAlias(): String? {
return this.settings.get(ManagedIndexSettings.ROLLOVER_ALIAS.key)
}

fun IndexMetadata.getRolloverSkip(): Boolean {
return this.settings.getAsBoolean(ManagedIndexSettings.ROLLOVER_SKIP.key, false)
}

fun IndexMetadata.getManagedIndexMetadata(): ManagedIndexMetaData? {
val existingMetaDataMap = this.getCustomData(ManagedIndexMetaData.MANAGED_INDEX_METADATA_TYPE)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ class ManagedIndexSettings {
Setting.Property.Dynamic
)

val ROLLOVER_SKIP: Setting<Boolean> = Setting.boolSetting(
"index.plugins.index_state_management.rollover_skip",
false,
Setting.Property.IndexScope,
Setting.Property.Dynamic
)

val JOB_INTERVAL: Setting<Int> = Setting.intSetting(
"plugins.index_state_management.job_interval",
LegacyOpenDistroManagedIndexSettings.JOB_INTERVAL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexMet
import org.opensearch.indexmanagement.indexstatemanagement.model.action.RolloverActionConfig
import org.opensearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.StepMetaData
import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getRolloverAlias
import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getRolloverSkip
import org.opensearch.indexmanagement.indexstatemanagement.step.Step
import org.opensearch.indexmanagement.indexstatemanagement.util.evaluateConditions
import org.opensearch.indexmanagement.opensearchapi.getUsefulCauseString
Expand All @@ -64,6 +65,13 @@ class AttemptRolloverStep(

@Suppress("TooGenericExceptionCaught")
override suspend fun execute(): AttemptRolloverStep {
val skipRollover = clusterService.state().metadata.index(indexName).getRolloverSkip()
if (skipRollover) {
stepStatus = StepStatus.COMPLETED
info = mapOf("message" to getSkipRolloverMessage(indexName))
return this
}

// If we have already rolled over this index then fail as we only allow an index to be rolled over once
if (managedIndexMetaData.rolledOver == true) {
logger.warn("$indexName was already rolled over, cannot execute rollover step")
Expand All @@ -76,6 +84,12 @@ class AttemptRolloverStep(
// If the rolloverTarget is null, we would've already updated the failed info from getRolloverTargetOrUpdateInfo and can return early
rolloverTarget ?: return this

if (!isDataStream && !preCheckIndexAlias(rolloverTarget)) {
stepStatus = StepStatus.FAILED
info = mapOf("message" to getFailedPreCheckMessage(indexName))
return this
}

val statsResponse = getIndexStatsOrUpdateInfo()
// If statsResponse is null we already updated failed info from getIndexStatsOrUpdateInfo and can return early
statsResponse ?: return this
Expand Down Expand Up @@ -170,6 +184,36 @@ class AttemptRolloverStep(
}
}

/**
* pre-condition check on managed-index's alias before rollover
*
* This will block
* when managed index dont have alias
* when managed index has alias but not the write index,
* and this alias contains more than one index
* User can use skip rollover setting to bypass this
*
* @param alias user defined ISM rollover alias
*/
private fun preCheckIndexAlias(alias: String): Boolean {
val metadata = clusterService.state().metadata
val indexAlias = metadata.index(indexName)?.aliases?.get(alias)
logger.debug("Index $indexName has aliases $indexAlias")
if (indexAlias == null) {
return false
}
val isWriteIndex = indexAlias.writeIndex() // this could be null
if (isWriteIndex != true) {
val aliasIndices = metadata.indicesLookup[alias]?.indices?.map { it.index }
logger.debug("Alias $alias contains indices $aliasIndices")
if (aliasIndices != null && aliasIndices.size > 1) {
return false
}
}

return true
}

private fun getRolloverTargetOrUpdateInfo(): Pair<String?, Boolean> {
val metadata = clusterService.state().metadata()
val indexAbstraction = metadata.indicesLookup[indexName]
Expand Down Expand Up @@ -246,5 +290,7 @@ class AttemptRolloverStep(
fun getSuccessMessage(index: String) = "Successfully rolled over index [index=$index]"
fun getSuccessDataStreamRolloverMessage(dataStream: String, index: String) =
"Successfully rolled over data stream [data_stream=$dataStream index=$index]"
fun getFailedPreCheckMessage(index: String) = "Missing alias or not the write index when rollover [index=$index]"
fun getSkipRolloverMessage(index: String) = "Skipped rollover action for [index=$index]"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,29 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase()
return index to policyID
}

protected fun changeAlias(
index: String,
alias: String,
action: String = "remove",
isWriteIndex: Boolean = false
) {
val isWriteIndexField = if (isWriteIndex) "\",\"is_write_index\": \"$isWriteIndex" else ""
val body = """
{
"actions": [
{
"$action": {
"index": "$index",
"alias": "$alias$isWriteIndexField"
}
}
]
}
""".trimIndent()
val response = client().makeRequest("POST", "_aliases", StringEntity(body, APPLICATION_JSON))
assertEquals("Unexpected RestStatus", RestStatus.OK, response.restStatus())
}

/** Refresh all indices in the cluster */
protected fun refresh() {
val request = Request("POST", "/_refresh")
Expand Down Expand Up @@ -248,6 +271,30 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase()
assertEquals("Request failed", RestStatus.OK, res.restStatus())
}

protected fun updateIndexSetting(
index: String,
key: String,
value: String
) {
val body = """
{
"$key" : "$value"
}
""".trimIndent()
val res = client().makeRequest(
"PUT", "$index/_settings", emptyMap(),
StringEntity(body, APPLICATION_JSON)
)
assertEquals("Update index setting failed", RestStatus.OK, res.restStatus())
}

protected fun getIndexSetting(index: String) {
val res = client().makeRequest(
"GET", "$index/_settings", emptyMap()
)
assertEquals("Update index setting failed", RestStatus.OK, res.restStatus())
}

protected fun getManagedIndexConfig(index: String): ManagedIndexConfig? {
val request = """
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,13 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
import org.opensearch.indexmanagement.indexstatemanagement.model.State
import org.opensearch.indexmanagement.indexstatemanagement.model.action.RolloverActionConfig
import org.opensearch.indexmanagement.indexstatemanagement.randomErrorNotification
import org.opensearch.indexmanagement.indexstatemanagement.resthandler.RestRetryFailedManagedIndexAction
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
import org.opensearch.indexmanagement.indexstatemanagement.step.rollover.AttemptRolloverStep
import org.opensearch.indexmanagement.makeRequest
import org.opensearch.indexmanagement.waitFor
import org.opensearch.rest.RestRequest
import org.opensearch.rest.RestStatus
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.Locale
Expand Down Expand Up @@ -284,6 +288,67 @@ class RolloverActionIT : IndexStateManagementRestTestCase() {
Assert.assertTrue("New rollover index does not exist.", indexExists("$indexNameBase-000002"))
}

fun `test rollover pre check`() {
// index-1 alias x
// index-2 alias x is_write_index
// manage index-1, expect it fail to rollover
val index1 = "index-1"
val index2 = "index-2"
val alias1 = "x"
val policyID = "${testIndexName}_precheck"
val actionConfig = RolloverActionConfig(null, 3, TimeValue.timeValueDays(2), 0)
val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf()))
val policy = Policy(
id = policyID,
description = "$testIndexName description",
schemaVersion = 1L,
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
errorNotification = randomErrorNotification(),
defaultState = states[0].name,
states = states
)
createPolicy(policy, policyID)
createIndex(index1, policyID)
changeAlias(index1, alias1, "add")
updateIndexSetting(index1, ManagedIndexSettings.ROLLOVER_ALIAS.key, alias1)
createIndex(index2, policyID)
changeAlias(index2, alias1, "add", true)
updateIndexSetting(index2, ManagedIndexSettings.ROLLOVER_ALIAS.key, alias1)

val managedIndexConfig = getExistingManagedIndexConfig(index1)

// Change the start time so the job will trigger in 2 seconds, this will trigger the first initialization of the policy
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(index1).policyID) }

// Need to speed up to second execution where it will trigger the first execution of the action
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor {
val info = getExplainManagedIndexMetaData(index1).info as Map<String, Any?>
assertEquals(
"Index rollover not stopped by pre-check.",
AttemptRolloverStep.getFailedPreCheckMessage(index1), info["message"]
)
}

updateIndexSetting(index1, ManagedIndexSettings.ROLLOVER_SKIP.key, "true")

val response = client().makeRequest(
RestRequest.Method.POST.toString(),
"${RestRetryFailedManagedIndexAction.RETRY_BASE_URI}/$index1"
)
assertEquals("Unexpected RestStatus", RestStatus.OK, response.restStatus())

updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor {
val info = getExplainManagedIndexMetaData(index1).info as Map<String, Any?>
assertEquals(
"Index rollover not skip.",
AttemptRolloverStep.getSkipRolloverMessage(index1), info["message"]
)
}
}

fun `test data stream rollover no condition`() {
val dataStreamName = "${testIndexName}_data_stream"
val policyID = "${testIndexName}_rollover_policy"
Expand Down