Skip to content

Commit

Permalink
Explain response still use old opendistro policy id (#109)
Browse files Browse the repository at this point in the history
* Explain response still use old opendistro policy id
* Use hardcoded policyid setting in tests for explain response
* Trying to fix flaky tests
  • Loading branch information
bowenlan-amzn authored Aug 3, 2021
1 parent 1058af0 commit 1860f38
Show file tree
Hide file tree
Showing 10 changed files with 65 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import kotlinx.coroutines.withContext
import org.apache.logging.log4j.LogManager
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
import org.opensearch.action.admin.cluster.state.ClusterStateResponse
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest
import org.opensearch.action.bulk.BackoffPolicy
import org.opensearch.action.get.GetRequest
import org.opensearch.action.get.GetResponse
Expand Down Expand Up @@ -722,17 +721,7 @@ object ManagedIndexRunner :
if (!updated.metadataSaved || policy == null) return

// this will save the new policy on the job and reset the change policy back to null
val saved = savePolicyToManagedIndexConfig(managedIndexConfig, policy)

if (saved) {
/*
* If we successfully saved the the new policy then the last thing we need to do is update the
* opendistro.indexstatemanagement.policy_id setting to the new policy id we don't care that much if this fails, because we'll
* have a check in the beginning of the runner to read in the setting and compare it with the policy_id on the job and update
* the setting if they ever differ, as we do not allow someone to change an existing policy using _settings API
* */
updateIndexPolicyIDSetting(managedIndexConfig.index, changePolicy.policyID)
}
savePolicyToManagedIndexConfig(managedIndexConfig, policy)
}

@Suppress("TooGenericExceptionCaught")
Expand All @@ -750,29 +739,6 @@ object ManagedIndexRunner :
}
}

/**
* Once we successfully swap over a ChangePolicy then we need to update the [ManagedIndexSettings.POLICY_ID] setting.
*
* We will constantly check the [ManagedIndexSettings.POLICY_ID] against the [ManagedIndexConfig] policyID and if
* there is ever a mismatch we will overwrite the [ManagedIndexSettings.POLICY_ID] with the [ManagedIndexConfig] policyID.
*
* We do this because if this fails we want to ensure we try again on the next execution of the job. At the same time, this
* will disallow the user from directly using the _settings API to change the policy_id. We do not want to allow this,
* they must use the ChangePolicy API as the [ManagedIndexSettings.POLICY_ID] is referring to the currently running policy.
*/
private suspend fun updateIndexPolicyIDSetting(index: String, policyID: String) {
try {
val settings = Settings.builder().put(ManagedIndexSettings.POLICY_ID.key, policyID).build()
val updateSettingsRequest = UpdateSettingsRequest(index).settings(settings)
val response: AcknowledgedResponse = client.admin().indices().suspendUntil { updateSettings(updateSettingsRequest, it) }
if (!response.isAcknowledged) {
logger.warn("Updating policy_id ($policyID) for $index was not acknowledged")
}
} catch (e: Exception) {
logger.error("There was an error while trying to update the policy_id ($policyID) setting for $index", e)
}
}

private suspend fun publishErrorNotification(policy: Policy, managedIndexMetaData: ManagedIndexMetaData) {
policy.errorNotification?.run {
errorNotificationRetryPolicy.retry(logger) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.ToXContentObject
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.indexstatemanagement.settings.LegacyOpenDistroManagedIndexSettings
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
import java.io.IOException

Expand Down Expand Up @@ -71,6 +72,7 @@ class ExplainAllResponse : ExplainResponse, ToXContentObject {
builder.startObject()
indexNames.forEachIndexed { ind, name ->
builder.startObject(name)
builder.field(LegacyOpenDistroManagedIndexSettings.POLICY_ID.key, indexPolicyIDs[ind])
builder.field(ManagedIndexSettings.POLICY_ID.key, indexPolicyIDs[ind])
indexMetadatas[ind]?.toXContent(builder, ToXContent.EMPTY_PARAMS)
builder.field("enabled", enabledState[name])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.ToXContentObject
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.indexstatemanagement.settings.LegacyOpenDistroManagedIndexSettings
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
import java.io.IOException

Expand Down Expand Up @@ -71,6 +72,7 @@ open class ExplainResponse : ActionResponse, ToXContentObject {
builder.startObject()
indexNames.forEachIndexed { ind, name ->
builder.startObject(name)
builder.field(LegacyOpenDistroManagedIndexSettings.POLICY_ID.key, indexPolicyIDs[ind])
builder.field(ManagedIndexSettings.POLICY_ID.key, indexPolicyIDs[ind])
indexMetadatas[ind]?.toXContent(builder, ToXContent.EMPTY_PARAMS)
builder.endObject()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ import java.util.Locale

abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase() {

val explainResponseOpendistroPolicyIdSetting = "index.opendistro.index_state_management.policy_id"
val explainResponseOpenSearchPolicyIdSetting = "index.plugins.index_state_management.policy_id"

protected fun createPolicy(
policy: Policy,
policyId: String = OpenSearchTestCase.randomAlphaOfLength(10),
Expand Down Expand Up @@ -244,12 +247,6 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase()
client().makeRequest("POST", "/_opendistro/_ism/remove/$index")
}

@Suppress("UNCHECKED_CAST")
protected fun getPolicyFromIndex(index: String): String? {
val indexSettings = getIndexSettings(index) as Map<String, Map<String, Map<String, Any?>>>
return indexSettings[index]!!["settings"]!![ManagedIndexSettings.POLICY_ID.key] as? String
}

protected fun getPolicyIDOfManagedIndex(index: String): String? {
val managedIndex = getManagedIndexConfig(index)
return managedIndex?.policyID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.managedindexmet
import org.opensearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.PolicyRetryInfoMetaData
import org.opensearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.StateMetaData
import org.opensearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.StepMetaData
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
import org.opensearch.indexmanagement.indexstatemanagement.step.Step
import org.opensearch.indexmanagement.indexstatemanagement.step.rollover.AttemptRolloverStep
import org.opensearch.indexmanagement.waitFor
Expand Down Expand Up @@ -159,7 +158,8 @@ class ActionRetryIT : IndexStateManagementRestTestCase() {
assertPredicatesOnMetaData(
listOf(
indexName to listOf(
ManagedIndexSettings.POLICY_ID.key to policyID::equals,
explainResponseOpendistroPolicyIdSetting to policyID::equals,
explainResponseOpenSearchPolicyIdSetting to policyID::equals,
ManagedIndexMetaData.INDEX to managedIndexConfig.index::equals,
ManagedIndexMetaData.INDEX_UUID to managedIndexConfig.indexUuid::equals,
ManagedIndexMetaData.POLICY_ID to managedIndexConfig.policyID::equals,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ class ManagedIndexCoordinatorIT : IndexStateManagementRestTestCase() {
assertPredicatesOnMetaData(
listOf(
index to listOf(
ManagedIndexSettings.POLICY_ID.key to fun(policyID: Any?): Boolean =
explainResponseOpendistroPolicyIdSetting to fun(policyID: Any?): Boolean =
policyID == null,
explainResponseOpenSearchPolicyIdSetting to fun(policyID: Any?): Boolean =
policyID == null
)
),
Expand Down Expand Up @@ -269,7 +271,9 @@ class ManagedIndexCoordinatorIT : IndexStateManagementRestTestCase() {
}

// TODO seen version conflict flaky failure here
// could be same reason as the test failure in ChangePolicyActionIT
logger.info("Config we use on update: $enabledManagedIndexConfig")
logger.info("Latest config: ${getExistingManagedIndexConfig(indexName)}")
// seems the config from above waitFor, after that, config got updated again?
updateManagedIndexConfigStartTime(enabledManagedIndexConfig)

waitFor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ import org.mockito.Mockito
import org.opensearch.Version
import org.opensearch.client.Client
import org.opensearch.cluster.OpenSearchAllocationTestCase
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.metadata.IndexMetadata.SETTING_INDEX_UUID
import org.opensearch.cluster.node.DiscoveryNode
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.ClusterSettings
Expand Down Expand Up @@ -139,18 +137,6 @@ class ManagedIndexCoordinatorTests : OpenSearchAllocationTestCase() {
Mockito.verify(threadPool, Mockito.times(2)).scheduleWithFixedDelay(Mockito.any(), Mockito.any(), Mockito.anyString())
}

private fun createIndexMetaData(indexName: String, replicaNumber: Int, shardNumber: Int, policyID: String?): IndexMetadata.Builder {
val defaultSettings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(ManagedIndexSettings.POLICY_ID.key, policyID)
.put(SETTING_INDEX_UUID, randomAlphaOfLength(20))
.build()
return IndexMetadata.Builder(indexName)
.settings(defaultSettings)
.numberOfReplicas(replicaNumber)
.numberOfShards(shardNumber)
}

private fun <T> any(): T {
Mockito.any<T>()
return uninitialized()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.State
import org.opensearch.indexmanagement.indexstatemanagement.model.action.ReadOnlyActionConfig
import org.opensearch.indexmanagement.indexstatemanagement.randomErrorNotification
import org.opensearch.indexmanagement.indexstatemanagement.randomPolicy
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_HIDDEN
import org.opensearch.indexmanagement.randomInstant
import org.opensearch.indexmanagement.waitFor
Expand Down Expand Up @@ -135,15 +134,25 @@ class ISMTemplateRestAPIIT : IndexStateManagementRestTestCase() {

// only index create after template can be managed
assertPredicatesOnMetaData(
listOf(indexName1 to listOf(ManagedIndexSettings.POLICY_ID.key to fun(policyID: Any?): Boolean = policyID == null)),
listOf(
indexName1 to listOf(
explainResponseOpendistroPolicyIdSetting to fun(policyID: Any?): Boolean = policyID == null,
explainResponseOpenSearchPolicyIdSetting to fun(policyID: Any?): Boolean = policyID == null
)
),
getExplainMap(indexName1),
true
)
assertNull(getManagedIndexConfig(indexName1))

// hidden index will not be manage
assertPredicatesOnMetaData(
listOf(indexName1 to listOf(ManagedIndexSettings.POLICY_ID.key to fun(policyID: Any?): Boolean = policyID == null)),
listOf(
indexName1 to listOf(
explainResponseOpendistroPolicyIdSetting to fun(policyID: Any?): Boolean = policyID == null,
explainResponseOpenSearchPolicyIdSetting to fun(policyID: Any?): Boolean = policyID == null
)
),
getExplainMap(indexName1),
true
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ package org.opensearch.indexmanagement.indexstatemanagement.resthandler
import org.junit.Before
import org.opensearch.client.ResponseException
import org.opensearch.common.settings.Settings
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase
import org.opensearch.indexmanagement.indexstatemanagement.model.ChangePolicy
import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig
Expand All @@ -46,7 +47,6 @@ import org.opensearch.indexmanagement.indexstatemanagement.randomPolicy
import org.opensearch.indexmanagement.indexstatemanagement.randomReplicaCountActionConfig
import org.opensearch.indexmanagement.indexstatemanagement.randomState
import org.opensearch.indexmanagement.indexstatemanagement.resthandler.RestChangePolicyAction.Companion.INDEX_NOT_MANAGED
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
import org.opensearch.indexmanagement.indexstatemanagement.step.rollover.AttemptRolloverStep
import org.opensearch.indexmanagement.indexstatemanagement.util.FAILED_INDICES
import org.opensearch.indexmanagement.indexstatemanagement.util.FAILURES
Expand Down Expand Up @@ -108,6 +108,7 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() {
}

fun `test nonexistent ism config index`() {
if (indexExists(INDEX_MANAGEMENT_INDEX)) deleteIndex(INDEX_MANAGEMENT_INDEX)
try {
val changePolicy = ChangePolicy("some_id", null, emptyList(), false)
client().makeRequest(
Expand Down Expand Up @@ -340,7 +341,8 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() {
assertPredicatesOnMetaData(
listOf(
index to listOf(
ManagedIndexSettings.POLICY_ID.key to policy.id::equals,
explainResponseOpendistroPolicyIdSetting to policy.id::equals,
explainResponseOpenSearchPolicyIdSetting to policy.id::equals,
ManagedIndexMetaData.INDEX to executedManagedIndexConfig.index::equals,
ManagedIndexMetaData.INDEX_UUID to executedManagedIndexConfig.indexUuid::equals,
ManagedIndexMetaData.POLICY_ID to executedManagedIndexConfig.policyID::equals,
Expand Down Expand Up @@ -384,7 +386,8 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() {
assertPredicatesOnMetaData(
listOf(
index to listOf(
ManagedIndexSettings.POLICY_ID.key to policy.id::equals,
explainResponseOpendistroPolicyIdSetting to policy.id::equals,
explainResponseOpenSearchPolicyIdSetting to policy.id::equals,
ManagedIndexMetaData.INDEX to executedManagedIndexConfig.index::equals,
ManagedIndexMetaData.INDEX_UUID to executedManagedIndexConfig.indexUuid::equals,
ManagedIndexMetaData.POLICY_ID to executedManagedIndexConfig.policyID::equals,
Expand Down Expand Up @@ -422,7 +425,8 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() {
assertPredicatesOnMetaData(
listOf(
index to listOf(
ManagedIndexSettings.POLICY_ID.key to newPolicy.id::equals,
explainResponseOpendistroPolicyIdSetting to newPolicy.id::equals,
explainResponseOpenSearchPolicyIdSetting to newPolicy.id::equals,
ManagedIndexMetaData.INDEX to changedManagedIndexConfig.index::equals,
ManagedIndexMetaData.INDEX_UUID to changedManagedIndexConfig.indexUuid::equals,
ManagedIndexMetaData.POLICY_ID to changedManagedIndexConfig.policyID::equals,
Expand Down Expand Up @@ -466,11 +470,15 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() {
// speed up to third execution where we transition to second state
updateManagedIndexConfigStartTime(firstManagedIndexConfig)

logger.info("time before check")
waitFor {
getExplainManagedIndexMetaData(firstIndex).let {
assertEquals(it.copy(stateMetaData = it.stateMetaData?.copy(name = secondState.name)), it)
}
// getExplainManagedIndexMetaData(firstIndex).let {
// assertEquals(it.copy(stateMetaData = it.stateMetaData?.copy(name = secondState.name)), it)
// }
assertEquals(secondState.name, getExplainManagedIndexMetaData(firstIndex).stateMetaData?.name)
logger.info("Explain firstIndex before change policy: ${getExplainManagedIndexMetaData(firstIndex)}")
}
logger.info("time after check")

// create second index
val (secondIndex) = createIndex("second_index", policy.id)
Expand All @@ -493,7 +501,10 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() {
FAILED_INDICES to emptyList<Any>(),
UPDATED_INDICES to 1
)
assertAffectedIndicesResponseIsEqual(expectedResponse, response.asMap())
// TODO flaky part, log for more info
val responseMap = response.asMap()
logger.info("Change policy response: $responseMap")
assertAffectedIndicesResponseIsEqual(expectedResponse, responseMap)

waitFor {
// The first managed index should not have a change policy added to it as it should of been filtered out from the states filter
Expand Down
Loading

0 comments on commit 1860f38

Please sign in to comment.