Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Making snapshot name to scripted input in template #77

Merged
merged 5 commits into from
Oct 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,16 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.action.Snapshot
import org.opensearch.indexmanagement.indexstatemanagement.step.Step
import org.opensearch.indexmanagement.indexstatemanagement.step.snapshot.AttemptSnapshotStep
import org.opensearch.indexmanagement.indexstatemanagement.step.snapshot.WaitForSnapshotStep
import org.opensearch.script.ScriptService

class SnapshotAction(
clusterService: ClusterService,
scriptService: ScriptService,
client: Client,
managedIndexMetaData: ManagedIndexMetaData,
config: SnapshotActionConfig
) : Action(ActionType.SNAPSHOT, config, managedIndexMetaData) {
private val attemptSnapshotStep = AttemptSnapshotStep(clusterService, client, config, managedIndexMetaData)
private val attemptSnapshotStep = AttemptSnapshotStep(clusterService, scriptService, client, config, managedIndexMetaData)
private val waitForSnapshotStep = WaitForSnapshotStep(clusterService, client, config, managedIndexMetaData)

override fun getSteps(): List<Step> = listOf(attemptSnapshotStep, waitForSnapshotStep)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ data class SnapshotActionConfig(
client: Client,
settings: Settings,
managedIndexMetaData: ManagedIndexMetaData
): Action = SnapshotAction(clusterService, client, managedIndexMetaData, this)
): Action = SnapshotAction(clusterService, scriptService, client, managedIndexMetaData, this)

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,13 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.managedindexmet
import org.opensearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.StepMetaData
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.SNAPSHOT_DENY_LIST
import org.opensearch.indexmanagement.indexstatemanagement.step.Step
import org.opensearch.indexmanagement.opensearchapi.convertToMap
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.rest.RestStatus
import org.opensearch.script.Script
import org.opensearch.script.ScriptService
import org.opensearch.script.ScriptType
import org.opensearch.script.TemplateScript
import org.opensearch.snapshots.ConcurrentSnapshotExecutionException
import org.opensearch.transport.RemoteTransportException
import java.time.LocalDateTime
Expand All @@ -50,6 +55,7 @@ import java.util.Locale

class AttemptSnapshotStep(
val clusterService: ClusterService,
val scriptService: ScriptService,
val client: Client,
val config: SnapshotActionConfig,
managedIndexMetaData: ManagedIndexMetaData
Expand All @@ -74,15 +80,15 @@ class AttemptSnapshotStep(
info = mutableInfo.toMap()
return this
}
val snapshotNameSuffix = "-".plus(
LocalDateTime.now(ZoneId.of("UTC"))
.format(DateTimeFormatter.ofPattern("uuuu.MM.dd-HH:mm:ss.SSS", Locale.ROOT))
)

snapshotName = config
.snapshot
.plus("-")
.plus(
LocalDateTime
.now(ZoneId.of("UTC"))
.format(DateTimeFormatter.ofPattern("uuuu.MM.dd-HH:mm:ss.SSS", Locale.ROOT))
)
val snapshotScript = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, config.snapshot, mapOf())
// If user intentionally set the snapshot name empty then we are going to honor it
val defaultSnapshotName = if (config.snapshot.isBlank()) config.snapshot else indexName
snapshotName = compileTemplate(snapshotScript, managedIndexMetaData, defaultSnapshotName).plus(snapshotNameSuffix)

val createSnapshotRequest = CreateSnapshotRequest()
.userMetadata(mapOf("snapshot_created" to "Open Distro for Elasticsearch Index Management"))
Expand Down Expand Up @@ -148,6 +154,16 @@ class AttemptSnapshotStep(
info = mutableInfo.toMap()
}

private fun compileTemplate(template: Script, managedIndexMetaData: ManagedIndexMetaData, defaultValue: String): String {
val contextMap = managedIndexMetaData.convertToMap().filterKeys { key ->
key in validTopContextFields
}
val compiledValue = scriptService.compile(template, TemplateScript.CONTEXT)
.newInstance(template.params + mapOf("ctx" to contextMap))
.execute()
return if (compiledValue.isBlank()) defaultValue else compiledValue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When does compiledValue.isBlank() occur?

Copy link
Contributor Author

@thalurur thalurur Oct 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if someone puts in a template with just a replaceable field and that is not valid e.g. {{ctx.name}} or {{ctx.anything_not_allowed}} this will be empty string

If someone specifies the template as {{ctx.anything_not_allowed}}_snapshot then the compiled value is just _snapshot

If someone specifies the template as {ctx.anything_not_allowed} since this is not a valid template placeholder it will be compiled as {ctx.anything_not_allowed}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm can someone specify an empty string as the snapshot name currently or do we block that?
i.e. if currently sameone has "" and it's allowed then instead of "" + the postfix stuff it'll change to $indexName + postfix stuff (since it looks like we add the index name as the default to use if this returns blank)

Copy link
Contributor Author

@thalurur thalurur Oct 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point, there is no check for it being non empty at the moment the field needs to be present in the policy but it can be empty string.

I can update the default value to empty string in that case - i.e if the original name is empty string

}

override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData {
val currentActionMetaData = currentMetaData.actionMetaData
return currentMetaData.copy(
Expand All @@ -159,6 +175,7 @@ class AttemptSnapshotStep(
}

companion object {
val validTopContextFields = setOf("index", "indexUuid")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to confirm is this all users have asked for? (Access to index/indexUuid in the snapshot name)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the users only asked for indexName in the issue, I have added indexUuid too since its readily available

const val name = "attempt_snapshot"
fun getBlockedMessage(denyList: List<String>, repoName: String, index: String) =
"Snapshot repository [$repoName] is blocked in $denyList [index=$index]"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -697,13 +697,13 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase()
protected fun assertSnapshotExists(
repository: String,
snapshot: String
) = require(getSnapshotsList(repository).any { element -> (element as Map<String, String>)["id"]!!.contains(snapshot) }) { "No snapshot found with id: $snapshot" }
) = require(getSnapshotsList(repository).any { element -> (element as Map<String, String>)["id"]!!.startsWith(snapshot) }) { "No snapshot found with id: $snapshot" }

@Suppress("UNCHECKED_CAST")
protected fun assertSnapshotFinishedWithSuccess(
repository: String,
snapshot: String
) = require(getSnapshotsList(repository).any { element -> (element as Map<String, String>)["id"]!!.contains(snapshot) && "SUCCESS" == element["status"] }) { "Snapshot didn't finish with success." }
) = require(getSnapshotsList(repository).any { element -> (element as Map<String, String>)["id"]!!.startsWith(snapshot) && "SUCCESS" == element["status"] }) { "Snapshot didn't finish with success." }

/**
* Compares responses returned by APIs such as those defined in [RetryFailedManagedIndexAction] and [RestAddPolicyAction]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,82 @@ class SnapshotActionIT : IndexStateManagementRestTestCase() {
// Need to wait two cycles for wait for snapshot step
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor { assertSnapshotExists(repository, snapshot) }
waitFor { assertSnapshotFinishedWithSuccess(repository, snapshot) }
waitFor { assertSnapshotExists(repository, "snapshot") }
waitFor { assertSnapshotFinishedWithSuccess(repository, "snapshot") }
}

fun `test basic with templated snapshot name`() {
val indexName = "${testIndexName}_index_basic"
val policyID = "${testIndexName}_policy_basic"
val repository = "repository"
val actionConfig = SnapshotActionConfig(repository, "{{ctx.index}}", 0)
val states = listOf(
State("Snapshot", listOf(actionConfig), listOf())
)

createRepository(repository)

val policy = Policy(
id = policyID,
description = "$testIndexName description",
schemaVersion = 1L,
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
errorNotification = randomErrorNotification(),
defaultState = states[0].name,
states = states
)
createPolicy(policy, policyID)
createIndex(indexName, policyID)

val managedIndexConfig = getExistingManagedIndexConfig(indexName)

// Change the start time so the job will trigger in 2 seconds.
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) }

// Need to wait two cycles for wait for snapshot step
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor { assertSnapshotExists(repository, indexName) }
waitFor { assertSnapshotFinishedWithSuccess(repository, indexName) }
}

fun `test basic with invalid templated snapshot name default to indexName`() {
val indexName = "${testIndexName}_index_basic"
val policyID = "${testIndexName}_policy_basic"
val repository = "repository"
val actionConfig = SnapshotActionConfig(repository, "{{ctx.someField}}", 0)
val states = listOf(
State("Snapshot", listOf(actionConfig), listOf())
)

createRepository(repository)

val policy = Policy(
id = policyID,
description = "$testIndexName description",
schemaVersion = 1L,
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
errorNotification = randomErrorNotification(),
defaultState = states[0].name,
states = states
)
createPolicy(policy, policyID)
createIndex(indexName, policyID)

val managedIndexConfig = getExistingManagedIndexConfig(indexName)

// Change the start time so the job will trigger in 2 seconds.
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) }

// Need to wait two cycles for wait for snapshot step
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor { assertSnapshotExists(repository, indexName) }
waitFor { assertSnapshotFinishedWithSuccess(repository, indexName) }
}

fun `test successful wait for snapshot step`() {
Expand Down Expand Up @@ -130,6 +204,55 @@ class SnapshotActionIT : IndexStateManagementRestTestCase() {
waitFor { assertSnapshotFinishedWithSuccess(repository, snapshot) }
}

fun `test successful wait for snapshot step - empty snapshot name`() {
val indexName = "${testIndexName}_index_success"
val policyID = "${testIndexName}_policy_success"
val repository = "repository"
val snapshot = "-"
val actionConfig = SnapshotActionConfig(repository, "", 0)
val states = listOf(
State("Snapshot", listOf(actionConfig), listOf())
)

createRepository(repository)

val policy = Policy(
id = policyID,
description = "$testIndexName description",
schemaVersion = 1L,
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
errorNotification = randomErrorNotification(),
defaultState = states[0].name,
states = states
)
createPolicy(policy, policyID)
createIndex(indexName, policyID)

val managedIndexConfig = getExistingManagedIndexConfig(indexName)

// Change the start time so the job will initialize the policy
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) }

// Change the start time so attempt snapshot step with execute
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor { assertEquals(AttemptSnapshotStep.getSuccessMessage(indexName), getExplainManagedIndexMetaData(indexName).info?.get("message")) }

// Change the start time so wait for snapshot step will execute
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor { assertEquals(WaitForSnapshotStep.getSuccessMessage(indexName), getExplainManagedIndexMetaData(indexName).info?.get("message")) }

// verify we set snapshotName in action properties
waitFor {
assert(
getExplainManagedIndexMetaData(indexName).actionMetaData?.actionProperties?.snapshotName?.contains(snapshot) == true
)
}

waitFor { assertSnapshotExists(repository, snapshot) }
waitFor { assertSnapshotFinishedWithSuccess(repository, snapshot) }
}

fun `test failed wait for snapshot step`() {
val indexName = "${testIndexName}_index_failed"
val policyID = "${testIndexName}_policy_failed"
Expand Down Expand Up @@ -188,8 +311,7 @@ class SnapshotActionIT : IndexStateManagementRestTestCase() {
val indexName = "${testIndexName}_index_blocked"
val policyID = "${testIndexName}_policy_basic"
val repository = "hello-world"
val snapshot = "snapshot"
val actionConfig = SnapshotActionConfig(repository, snapshot, 0)
val actionConfig = SnapshotActionConfig(repository, "snapshot", 0)
val states = listOf(
State("Snapshot", listOf(actionConfig), listOf())
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ package org.opensearch.indexmanagement.indexstatemanagement.step
import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.doAnswer
import com.nhaarman.mockitokotlin2.doReturn
import com.nhaarman.mockitokotlin2.eq
import com.nhaarman.mockitokotlin2.mock
import com.nhaarman.mockitokotlin2.whenever
import kotlinx.coroutines.runBlocking
Expand All @@ -27,25 +28,30 @@ import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.ClusterSettings
import org.opensearch.common.settings.Settings
import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.indexstatemanagement.model.action.SnapshotActionConfig
import org.opensearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.ActionMetaData
import org.opensearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.ActionProperties
import org.opensearch.indexmanagement.indexstatemanagement.randomSnapshotActionConfig
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.SNAPSHOT_DENY_LIST
import org.opensearch.indexmanagement.indexstatemanagement.step.snapshot.AttemptSnapshotStep
import org.opensearch.ingest.TestTemplateService.MockTemplateScript
import org.opensearch.rest.RestStatus
import org.opensearch.script.ScriptService
import org.opensearch.script.TemplateScript
import org.opensearch.snapshots.ConcurrentSnapshotExecutionException
import org.opensearch.test.OpenSearchTestCase
import org.opensearch.transport.RemoteTransportException

class AttemptSnapshotStepTests : OpenSearchTestCase() {

private val clusterService: ClusterService = mock()
private val config = SnapshotActionConfig("repo", "snapshot-name", 0)
private val scriptService: ScriptService = mock()
private val config = randomSnapshotActionConfig("repo", "snapshot-name")
private val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, ActionMetaData(AttemptSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null)

@Before
fun settings() {
whenever(clusterService.clusterSettings).doReturn(ClusterSettings(Settings.EMPTY, setOf(SNAPSHOT_DENY_LIST)))
whenever(scriptService.compile(any(), eq(TemplateScript.CONTEXT))).doReturn(MockTemplateScript.Factory("snapshot-name"))
}

fun `test snapshot response when block`() {
Expand All @@ -54,23 +60,23 @@ class AttemptSnapshotStepTests : OpenSearchTestCase() {

whenever(response.status()).doReturn(RestStatus.ACCEPTED)
runBlocking {
val step = AttemptSnapshotStep(clusterService, client, config, metadata)
val step = AttemptSnapshotStep(clusterService, scriptService, client, config, metadata)
step.execute()
val updatedManagedIndexMetaData = step.getUpdatedManagedIndexMetaData(metadata)
assertEquals("Step status is not COMPLETED", Step.StepStatus.COMPLETED, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
}

whenever(response.status()).doReturn(RestStatus.OK)
runBlocking {
val step = AttemptSnapshotStep(clusterService, client, config, metadata)
val step = AttemptSnapshotStep(clusterService, scriptService, client, config, metadata)
step.execute()
val updatedManagedIndexMetaData = step.getUpdatedManagedIndexMetaData(metadata)
assertEquals("Step status is not COMPLETED", Step.StepStatus.COMPLETED, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
}

whenever(response.status()).doReturn(RestStatus.INTERNAL_SERVER_ERROR)
runBlocking {
val step = AttemptSnapshotStep(clusterService, client, config, metadata)
val step = AttemptSnapshotStep(clusterService, scriptService, client, config, metadata)
step.execute()
val updatedManagedIndexMetaData = step.getUpdatedManagedIndexMetaData(metadata)
assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
Expand All @@ -81,7 +87,7 @@ class AttemptSnapshotStepTests : OpenSearchTestCase() {
val exception = IllegalArgumentException("example")
val client = getClient(getAdminClient(getClusterAdminClient(null, exception)))
runBlocking {
val step = AttemptSnapshotStep(clusterService, client, config, metadata)
val step = AttemptSnapshotStep(clusterService, scriptService, client, config, metadata)
step.execute()
val updatedManagedIndexMetaData = step.getUpdatedManagedIndexMetaData(metadata)
assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
Expand All @@ -93,7 +99,7 @@ class AttemptSnapshotStepTests : OpenSearchTestCase() {
val exception = ConcurrentSnapshotExecutionException("repo", "other-snapshot", "concurrent snapshot in progress")
val client = getClient(getAdminClient(getClusterAdminClient(null, exception)))
runBlocking {
val step = AttemptSnapshotStep(clusterService, client, config, metadata)
val step = AttemptSnapshotStep(clusterService, scriptService, client, config, metadata)
step.execute()
val updatedManagedIndexMetaData = step.getUpdatedManagedIndexMetaData(metadata)
assertEquals("Step status is not CONDITION_NOT_MET", Step.StepStatus.CONDITION_NOT_MET, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
Expand All @@ -105,7 +111,7 @@ class AttemptSnapshotStepTests : OpenSearchTestCase() {
val exception = RemoteTransportException("rte", ConcurrentSnapshotExecutionException("repo", "other-snapshot", "concurrent snapshot in progress"))
val client = getClient(getAdminClient(getClusterAdminClient(null, exception)))
runBlocking {
val step = AttemptSnapshotStep(clusterService, client, config, metadata)
val step = AttemptSnapshotStep(clusterService, scriptService, client, config, metadata)
step.execute()
val updatedManagedIndexMetaData = step.getUpdatedManagedIndexMetaData(metadata)
assertEquals("Step status is not CONDITION_NOT_MET", Step.StepStatus.CONDITION_NOT_MET, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
Expand All @@ -117,7 +123,7 @@ class AttemptSnapshotStepTests : OpenSearchTestCase() {
val exception = RemoteTransportException("rte", IllegalArgumentException("some error"))
val client = getClient(getAdminClient(getClusterAdminClient(null, exception)))
runBlocking {
val step = AttemptSnapshotStep(clusterService, client, config, metadata)
val step = AttemptSnapshotStep(clusterService, scriptService, client, config, metadata)
step.execute()
val updatedManagedIndexMetaData = step.getUpdatedManagedIndexMetaData(metadata)
assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
Expand Down
Loading