Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,8 @@ Compatible with OpenSearch 1.0.0-rc1

### Features

* Adds support for Transform feature ([#17](https://github.com/opensearch-project/index-management/pull/17))
* Adds support for Transform feature ([#17](https://github.com/opensearch-project/index-management/pull/17))

### Enhancements

* Improve integration with data streams ([#13](https://github.com/opensearch-project/index-management/pull/13))
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.opensearch.indexmanagement.util.IndexManagementException
import org.apache.logging.log4j.LogManager
import org.apache.lucene.util.automaton.Operations
import org.opensearch.OpenSearchException
import org.opensearch.cluster.ClusterState
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.common.Strings
import org.opensearch.common.ValidationException
Expand All @@ -49,18 +50,26 @@ private val log = LogManager.getLogger("ISMTemplateService")
* @return policyID
*/
@Suppress("ReturnCount")
fun Map<String, ISMTemplate>.findMatchingPolicy(indexMetadata: IndexMetadata): String? {
fun Map<String, ISMTemplate>.findMatchingPolicy(clusterState: ClusterState, indexName: String): String? {
if (this.isEmpty()) return null

val indexName = indexMetadata.index.name
val indexMetadata = clusterState.metadata.index(indexName)
val indexAbstraction = clusterState.metadata.indicesLookup[indexName]
val isDataStreamIndex = indexAbstraction?.parentDataStream != null

// don't include hidden index
// Don't include hidden index unless it belongs to a data stream.
val isHidden = IndexMetadata.INDEX_HIDDEN_SETTING.get(indexMetadata.settings)
if (isHidden) return null
if (!isDataStreamIndex && isHidden) return null

// If the index belongs to a data stream, then find the matching policy using the data stream name.
val lookupName = when {
isDataStreamIndex -> indexAbstraction?.parentDataStream?.name
else -> indexName
}

// only process indices created after template
// traverse all ism templates for matching ones
val patternMatchPredicate = { pattern: String -> Regex.simpleMatch(pattern, indexName) }
val patternMatchPredicate = { pattern: String -> Regex.simpleMatch(pattern, lookupName) }
var matchedPolicy: String? = null
var highestPriority: Int = -1
this.filter { (_, template) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ class ManagedIndexCoordinator(
val templates = getISMTemplates()

val indexToMatchedPolicy = indexNames.map { indexName ->
indexName to templates.findMatchingPolicy(indexMetadatas[indexName])
indexName to templates.findMatchingPolicy(clusterState, indexName)
}.toMap()

indexToMatchedPolicy.filterNotNullValues()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ class AttemptRolloverStep(
return this
}

val alias = getAliasOrUpdateInfo()
// If alias is null we already updated failed info from getAliasOrUpdateInfo and can return early
alias ?: return this
val (rolloverTarget, isDataStream) = getRolloverTargetOrUpdateInfo()
// If the rolloverTarget is null, we would've already updated the failed info from getRolloverTargetOrUpdateInfo and can return early
rolloverTarget ?: return this

val statsResponse = getIndexStatsOrUpdateInfo()
// If statsResponse is null we already updated failed info from getIndexStatsOrUpdateInfo and can return early
Expand Down Expand Up @@ -115,7 +115,7 @@ class AttemptRolloverStep(
if (config.evaluateConditions(indexAgeTimeValue, numDocs, indexSize)) {
logger.info("$indexName rollover conditions evaluated to true [indexCreationDate=$indexCreationDate," +
" numDocs=$numDocs, indexSize=${indexSize.bytes}]")
executeRollover(alias, conditions)
executeRollover(rolloverTarget, isDataStream, conditions)
} else {
stepStatus = StepStatus.CONDITION_NOT_MET
info = mapOf("message" to getPendingMessage(indexName), "conditions" to conditions)
Expand All @@ -125,25 +125,35 @@ class AttemptRolloverStep(
}

@Suppress("ComplexMethod")
private suspend fun executeRollover(alias: String, conditions: Map<String, Map<String, Any?>>) {
private suspend fun executeRollover(rolloverTarget: String, isDataStream: Boolean, conditions: Map<String, Map<String, Any?>>) {
try {
val request = RolloverRequest(alias, null)
val request = RolloverRequest(rolloverTarget, null)
val response: RolloverResponse = client.admin().indices().suspendUntil { rolloverIndex(request, it) }

// Do not need to check for isRolledOver as we are not passing any conditions or dryrun=true
// which are the only two ways it comes back as false

// If response isAcknowledged it means the index was created and alias was added to new index
// If the response is acknowledged, then the new index is created and added to one of the following index abstractions:
// 1. IndexAbstraction.Type.DATA_STREAM - the new index is added to the data stream indicated by the 'rolloverTarget'
// 2. IndexAbstraction.Type.ALIAS - the new index is added to the alias indicated by the 'rolloverTarget'
if (response.isAcknowledged) {
val message = when {
isDataStream -> getSuccessDataStreamRolloverMessage(rolloverTarget, indexName)
else -> getSuccessMessage(indexName)
}

stepStatus = StepStatus.COMPLETED
info = listOfNotNull(
"message" to getSuccessMessage(indexName),
"message" to message,
if (conditions.isEmpty()) null else "conditions" to conditions // don't show empty conditions object if no conditions specified
).toMap()
} else {
// If the alias update response is NOT acknowledged we will get back isAcknowledged=false
// This means the new index was created but we failed to swap the alias
val message = getFailedAliasUpdateMessage(indexName, response.newIndex)
val message = when {
isDataStream -> getFailedDataStreamRolloverMessage(rolloverTarget)

// If the alias update response was NOT acknowledged, then the new index was created but we failed to swap the alias
else -> getFailedAliasUpdateMessage(indexName, response.newIndex)
}
logger.warn(message)
stepStatus = StepStatus.FAILED
info = listOfNotNull(
Expand All @@ -158,17 +168,24 @@ class AttemptRolloverStep(
}
}

private fun getAliasOrUpdateInfo(): String? {
val alias = clusterService.state().metadata().index(indexName).getRolloverAlias()
private fun getRolloverTargetOrUpdateInfo(): Pair<String?, Boolean> {
val metadata = clusterService.state().metadata()
val indexAbstraction = metadata.indicesLookup[indexName]
val isDataStreamIndex = indexAbstraction?.parentDataStream != null

val rolloverTarget = when {
isDataStreamIndex -> indexAbstraction?.parentDataStream?.name
else -> metadata.index(indexName).getRolloverAlias()
}

if (alias == null) {
if (rolloverTarget == null) {
val message = getFailedNoValidAliasMessage(indexName)
logger.warn(message)
stepStatus = StepStatus.FAILED
info = mapOf("message" to message)
}

return alias
return rolloverTarget to isDataStreamIndex
}

private suspend fun getIndexStatsOrUpdateInfo(): IndicesStatsResponse? {
Expand Down Expand Up @@ -219,10 +236,13 @@ class AttemptRolloverStep(
fun getFailedMessage(index: String) = "Failed to rollover index [index=$index]"
fun getFailedAliasUpdateMessage(index: String, newIndex: String) =
"New index created, but failed to update alias [index=$index, newIndex=$newIndex]"
fun getFailedDataStreamRolloverMessage(dataStream: String) = "Failed to rollover data stream [data_stream=$dataStream]"
fun getFailedNoValidAliasMessage(index: String) = "Missing rollover_alias index setting [index=$index]"
fun getFailedDuplicateRolloverMessage(index: String) = "Index has already been rolled over [index=$index]"
fun getFailedEvaluateMessage(index: String) = "Failed to evaluate conditions for rollover [index=$index]"
fun getPendingMessage(index: String) = "Pending rollover of index [index=$index]"
fun getSuccessMessage(index: String) = "Successfully rolled over index [index=$index]"
fun getSuccessDataStreamRolloverMessage(dataStream: String, index: String) =
"Successfully rolled over data stream [data_stream=$dataStream index=$index]"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ class RollupMapperService(
// Allow no indices, open and closed
// Rolling up on closed indices will not be caught here
val concreteIndices =
indexNameExpressionResolver.concreteIndexNames(clusterService.state(), IndicesOptions.lenientExpand(), rollup.sourceIndex)
indexNameExpressionResolver.concreteIndexNames(clusterService.state(), IndicesOptions.lenientExpand(), true, rollup.sourceIndex)
if (concreteIndices.isEmpty()) return RollupJobValidationResult.Invalid("No indices found for [${rollup.sourceIndex}]")

// Validate mappings for each concrete index resolved from the rollup source index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,12 @@ class FieldCapsFilter(

private fun populateSourceFieldMappingsForRollupJob(rollup: Rollup): Set<RollupFieldMapping> {
val rollupFieldMappings = rollup.populateFieldMappings()
val sourceIndices = indexNameExpressionResolver.concreteIndexNames(clusterService.state(), IndicesOptions.lenientExpand(), rollup.sourceIndex)
val sourceIndices = indexNameExpressionResolver.concreteIndexNames(
clusterService.state(),
IndicesOptions.lenientExpand(),
true,
rollup.sourceIndex
)
sourceIndices.forEach {
val mappings = clusterService.state().metadata.index(it).mapping()?.sourceAsMap ?: return rollupFieldMappings
rollupFieldMappings.forEach { fieldMapping ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class RollupInterceptor(

val indices = request.indices().map { it.toString() }.toTypedArray()
val concreteIndices = indexNameExpressionResolver
.concreteIndexNames(clusterService.state(), request.indicesOptions(), *indices)
.concreteIndexNames(clusterService.state(), request.indicesOptions(), *indices)

if (concreteIndices.size > 1) {
logger.warn("There can be only one index in search request if its a rollup search - requested to search [${concreteIndices
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ package org.opensearch.indexmanagement.indexstatemanagement.action

import org.apache.http.entity.ContentType
import org.apache.http.entity.StringEntity
import org.opensearch.cluster.metadata.DataStream
import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase
import org.opensearch.indexmanagement.indexstatemanagement.model.ISMTemplate
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
import org.opensearch.indexmanagement.indexstatemanagement.model.State
import org.opensearch.indexmanagement.indexstatemanagement.model.action.RolloverActionConfig
Expand Down Expand Up @@ -264,4 +266,152 @@ class RolloverActionIT : IndexStateManagementRestTestCase() {
}
Assert.assertTrue("New rollover index does not exist.", indexExists("$indexNameBase-000002"))
}

fun `test data stream rollover no condition`() {
val dataStreamName = "${testIndexName}_data_stream"
val policyID = "${testIndexName}_rollover_policy"

// Create the rollover policy
val rolloverActionConfig = RolloverActionConfig(null, null, null, 0)
val states = listOf(State(name = "default", actions = listOf(rolloverActionConfig), transitions = listOf()))
val policy = Policy(
id = policyID,
description = "rollover policy description",
schemaVersion = 1L,
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
errorNotification = randomErrorNotification(),
defaultState = states[0].name,
states = states,
ismTemplate = ISMTemplate(listOf(dataStreamName), 100, Instant.now().truncatedTo(ChronoUnit.MILLIS))
)
createPolicy(policy, policyID)

// Create the data stream
val firstIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1L)
client().makeRequest(
"PUT",
"/_index_template/rollover-data-stream-template",
StringEntity("{ \"index_patterns\": [ \"$dataStreamName\" ], \"data_stream\": { } }", ContentType.APPLICATION_JSON)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious about index_patterns format here. Take logs-redis for an example, normally we will use logs-redis* as the index pattern with * in the end. But it seems here we use logs-redis, is this sth by design? And logs-redis* is wrong?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The index pattern matches the name of the data stream instead of the backing indices. This is by design and consistent with how index patterns work in index templates as well.

A data stream named logs-redis will have backing indices like .ds-logs-redis-000001, .ds-logs-redis-000002 and so on. When defining an index pattern like index_pattern: ["logs-redis"], I'm just performing an exact match on one data stream.

I could also define index_pattern: ["logs-redis-*"] which would allow me to match multiple data streams (and effectively their backing indices). This would match "logs-redis-prod", "logs-redis-staging" and so on.

)
client().makeRequest("PUT", "/_data_stream/$dataStreamName")

var managedIndexConfig = getExistingManagedIndexConfig(firstIndexName)

// Change the start time so that the job will trigger in 2 seconds. This will trigger the first initialization of the policy.
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(firstIndexName).policyID) }

// Speed up to the second execution of the policy where it will trigger the first execution of the action.
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor {
val info = getExplainManagedIndexMetaData(firstIndexName).info as Map<String, Any?>
assertEquals(
"Data stream did not rollover.",
AttemptRolloverStep.getSuccessDataStreamRolloverMessage(dataStreamName, firstIndexName),
info["message"]
)
assertNull("Should not have conditions if none specified", info["conditions"])
}

val secondIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 2L)
Assert.assertTrue("New rollover index does not exist.", indexExists(secondIndexName))

// Ensure that that policy is applied to the newly created index as well.
managedIndexConfig = getExistingManagedIndexConfig(secondIndexName)
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(secondIndexName).policyID) }
}

@Suppress("UNCHECKED_CAST")
fun `test data stream rollover multi condition doc size`() {
val dataStreamName = "${testIndexName}_data_stream_multi"
val policyID = "${testIndexName}_rollover_policy_multi"

// Create the rollover policy
val rolloverActionConfig = RolloverActionConfig(null, 3, TimeValue.timeValueDays(2), 0)
val states = listOf(State(name = "default", actions = listOf(rolloverActionConfig), transitions = listOf()))
val policy = Policy(
id = policyID,
description = "rollover policy description",
schemaVersion = 1L,
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
errorNotification = randomErrorNotification(),
defaultState = states[0].name,
states = states,
ismTemplate = ISMTemplate(listOf(dataStreamName), 100, Instant.now().truncatedTo(ChronoUnit.MILLIS))
)
createPolicy(policy, policyID)

// Create the data stream
val firstIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1L)
client().makeRequest(
"PUT",
"/_index_template/rollover-data-stream-template",
StringEntity("{ \"index_patterns\": [ \"$dataStreamName\" ], \"data_stream\": { } }", ContentType.APPLICATION_JSON)
)
client().makeRequest("PUT", "/_data_stream/$dataStreamName")

val managedIndexConfig = getExistingManagedIndexConfig(firstIndexName)

// Change the start time so that the job will trigger in 2 seconds. This will trigger the first initialization of the policy.
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(firstIndexName).policyID) }

// Speed up to the second execution of the policy where it will trigger the first execution of the action.
// Rollover shouldn't have happened yet as the conditions aren't met.
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor {
val info = getExplainManagedIndexMetaData(firstIndexName).info as Map<String, Any?>
assertEquals(
"Index rollover before it met the condition.",
AttemptRolloverStep.getPendingMessage(firstIndexName),
info["message"]
)

val conditions = info["conditions"] as Map<String, Any?>
assertEquals(
"Did not have exclusively min age and min doc count conditions",
setOf(RolloverActionConfig.MIN_INDEX_AGE_FIELD, RolloverActionConfig.MIN_DOC_COUNT_FIELD),
conditions.keys
)

val minAge = conditions[RolloverActionConfig.MIN_INDEX_AGE_FIELD] as Map<String, Any?>
val minDocCount = conditions[RolloverActionConfig.MIN_DOC_COUNT_FIELD] as Map<String, Any?>
assertEquals("Incorrect min age condition", "2d", minAge["condition"])
assertEquals("Incorrect min docs condition", 3, minDocCount["condition"])
assertThat("Missing min age current", minAge["current"], isA(String::class.java))
assertEquals("Incorrect min docs current", 0, minDocCount["current"])
}

insertSampleData(index = dataStreamName, docCount = 5, jsonString = "{ \"@timestamp\": \"2020-12-06T11:04:05.000Z\" }")

// Speed up to the third execution of the policy where it will trigger the second execution of the action.
// Rollover should have happened as the conditions were met.
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor {
val info = getExplainManagedIndexMetaData(firstIndexName).info as Map<String, Any?>
assertEquals(
"Data stream did not rollover",
AttemptRolloverStep.getSuccessDataStreamRolloverMessage(dataStreamName, firstIndexName),
info["message"]
)

val conditions = info["conditions"] as Map<String, Any?>
assertEquals(
"Did not have exclusively min age and min doc count conditions",
setOf(RolloverActionConfig.MIN_INDEX_AGE_FIELD, RolloverActionConfig.MIN_DOC_COUNT_FIELD),
conditions.keys
)

val minAge = conditions[RolloverActionConfig.MIN_INDEX_AGE_FIELD] as Map<String, Any?>
val minDocCount = conditions[RolloverActionConfig.MIN_DOC_COUNT_FIELD] as Map<String, Any?>
assertEquals("Incorrect min age condition", "2d", minAge["condition"])
assertEquals("Incorrect min docs condition", 3, minDocCount["condition"])
assertThat("Missing min age current", minAge["current"], isA(String::class.java))
assertEquals("Incorrect min docs current", 5, minDocCount["current"])
}

val secondIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 2L)
Assert.assertTrue("New rollover index does not exist.", indexExists(secondIndexName))
}
}
Loading