Skip to content

Commit

Permalink
Making snapshot name to script template so snapshot name can be deriv…
Browse files Browse the repository at this point in the history
…ed from the index or any runtime attributes instead of being constant

Signed-off-by: Ravi Thaluru <ravi1092@gmail.com>
  • Loading branch information
thalurur committed Jun 10, 2021
1 parent 1e70b25 commit de21d80
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,16 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.action.Snapshot
import org.opensearch.indexmanagement.indexstatemanagement.step.Step
import org.opensearch.indexmanagement.indexstatemanagement.step.snapshot.AttemptSnapshotStep
import org.opensearch.indexmanagement.indexstatemanagement.step.snapshot.WaitForSnapshotStep
import org.opensearch.script.ScriptService

class SnapshotAction(
clusterService: ClusterService,
scriptService: ScriptService,
client: Client,
managedIndexMetaData: ManagedIndexMetaData,
config: SnapshotActionConfig
) : Action(ActionType.SNAPSHOT, config, managedIndexMetaData) {
private val attemptSnapshotStep = AttemptSnapshotStep(clusterService, client, config, managedIndexMetaData)
private val attemptSnapshotStep = AttemptSnapshotStep(clusterService, scriptService, client, config, managedIndexMetaData)
private val waitForSnapshotStep = WaitForSnapshotStep(clusterService, client, config, managedIndexMetaData)

override fun getSteps(): List<Step> = listOf(attemptSnapshotStep, waitForSnapshotStep)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,13 @@ import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.indexmanagement.indexstatemanagement.action.Action
import org.opensearch.indexmanagement.indexstatemanagement.action.SnapshotAction
import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.script.Script
import org.opensearch.script.ScriptService
import java.io.IOException

data class SnapshotActionConfig(
val repository: String,
val snapshot: String,
val snapshot: Script,
val index: Int
) : ToXContentObject, ActionConfig(ActionType.SNAPSHOT, index) {

Expand All @@ -66,20 +67,20 @@ data class SnapshotActionConfig(
client: Client,
settings: Settings,
managedIndexMetaData: ManagedIndexMetaData
): Action = SnapshotAction(clusterService, client, managedIndexMetaData, this)
): Action = SnapshotAction(clusterService, scriptService, client, managedIndexMetaData, this)

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
repository = sin.readString(),
snapshot = sin.readString(),
snapshot = Script(sin),
index = sin.readInt()
)

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
super.writeTo(out)
out.writeString(repository)
out.writeString(snapshot)
snapshot.writeTo(out)
out.writeInt(index)
}

Expand All @@ -92,7 +93,7 @@ data class SnapshotActionConfig(
@Throws(IOException::class)
fun parse(xcp: XContentParser, index: Int): SnapshotActionConfig {
var repository: String? = null
var snapshot: String? = null
var snapshot: Script? = null

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
Expand All @@ -101,7 +102,7 @@ data class SnapshotActionConfig(

when (fieldName) {
REPOSITORY_FIELD -> repository = xcp.text()
SNAPSHOT_FIELD -> snapshot = xcp.text()
SNAPSHOT_FIELD -> snapshot = Script.parse(xcp, Script.DEFAULT_TEMPLATE_LANG)
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in SnapshotActionConfig.")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,12 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.managedindexmet
import org.opensearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.StepMetaData
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.SNAPSHOT_DENY_LIST
import org.opensearch.indexmanagement.indexstatemanagement.step.Step
import org.opensearch.indexmanagement.opensearchapi.convertToMap
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.rest.RestStatus
import org.opensearch.script.Script
import org.opensearch.script.ScriptService
import org.opensearch.script.TemplateScript
import org.opensearch.snapshots.ConcurrentSnapshotExecutionException
import org.opensearch.transport.RemoteTransportException
import java.time.LocalDateTime
Expand All @@ -50,6 +54,7 @@ import java.util.Locale

class AttemptSnapshotStep(
val clusterService: ClusterService,
val scriptService: ScriptService,
val client: Client,
val config: SnapshotActionConfig,
managedIndexMetaData: ManagedIndexMetaData
Expand All @@ -74,15 +79,12 @@ class AttemptSnapshotStep(
info = mutableInfo.toMap()
return this
}
val snapshotNamePrefix = "-".plus(
LocalDateTime.now(ZoneId.of("UTC"))
.format(DateTimeFormatter.ofPattern("uuuu.MM.dd-HH:mm:ss.SSS", Locale.ROOT))
)

snapshotName = config
.snapshot
.plus("-")
.plus(
LocalDateTime
.now(ZoneId.of("UTC"))
.format(DateTimeFormatter.ofPattern("uuuu.MM.dd-HH:mm:ss.SSS", Locale.ROOT))
)
snapshotName = compileTemplate(config.snapshot, managedIndexMetaData, indexName).plus(snapshotNamePrefix)

val createSnapshotRequest = CreateSnapshotRequest()
.userMetadata(mapOf("snapshot_created" to "Open Distro for Elasticsearch Index Management"))
Expand Down Expand Up @@ -148,6 +150,16 @@ class AttemptSnapshotStep(
info = mutableInfo.toMap()
}

private fun compileTemplate(template: Script, managedIndexMetaData: ManagedIndexMetaData, defaultValue: String): String {
val contextMap = managedIndexMetaData.convertToMap().filterKeys { key ->
key in validTopContextFields
}
val compiledValue = scriptService.compile(template, TemplateScript.CONTEXT)
.newInstance(template.params + mapOf("ctx" to contextMap))
.execute()
return if (compiledValue.isBlank()) defaultValue else compiledValue
}

override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData {
val currentActionMetaData = currentMetaData.actionMetaData
return currentMetaData.copy(
Expand All @@ -159,6 +171,7 @@ class AttemptSnapshotStep(
}

companion object {
val validTopContextFields = setOf("index", "indexUuid")
const val name = "attempt_snapshot"
fun getBlockedMessage(denyList: List<String>, repoName: String, index: String) =
"Snapshot repository [$repoName] is blocked in $denyList [index=$index]"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ fun randomTemplateScript(
): Script = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, source, params)

fun randomSnapshotActionConfig(repository: String = "repo", snapshot: String = "sp"): SnapshotActionConfig {
return SnapshotActionConfig(repository, snapshot, index = 0)
return SnapshotActionConfig(repository, Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, snapshot, emptyMap()), index = 0)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndex
import org.opensearch.indexmanagement.indexstatemanagement.step.snapshot.AttemptSnapshotStep
import org.opensearch.indexmanagement.indexstatemanagement.step.snapshot.WaitForSnapshotStep
import org.opensearch.indexmanagement.waitFor
import org.opensearch.script.Script
import org.opensearch.script.ScriptType
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.Locale
Expand All @@ -47,7 +49,8 @@ class SnapshotActionIT : IndexStateManagementRestTestCase() {
val indexName = "${testIndexName}_index_basic"
val policyID = "${testIndexName}_policy_basic"
val repository = "repository"
val snapshot = "snapshot"
val snapshotText = "snapshot"
val snapshot = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, snapshotText, emptyMap())
val actionConfig = SnapshotActionConfig(repository, snapshot, 0)
val states = listOf(
State("Snapshot", listOf(actionConfig), listOf())
Expand Down Expand Up @@ -77,15 +80,92 @@ class SnapshotActionIT : IndexStateManagementRestTestCase() {
// Need to wait two cycles for wait for snapshot step
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor { assertSnapshotExists(repository, snapshot) }
waitFor { assertSnapshotFinishedWithSuccess(repository, snapshot) }
waitFor { assertSnapshotExists(repository, snapshotText) }
waitFor { assertSnapshotFinishedWithSuccess(repository, snapshotText) }
}

fun `test basic with templated snapshot name`() {
val indexName = "${testIndexName}_index_basic"
val policyID = "${testIndexName}_policy_basic"
val repository = "repository"
val snapshot = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, "{{ctx.index}}", emptyMap())
val actionConfig = SnapshotActionConfig(repository, snapshot, 0)
val states = listOf(
State("Snapshot", listOf(actionConfig), listOf())
)

createRepository(repository)

val policy = Policy(
id = policyID,
description = "$testIndexName description",
schemaVersion = 1L,
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
errorNotification = randomErrorNotification(),
defaultState = states[0].name,
states = states
)
createPolicy(policy, policyID)
createIndex(indexName, policyID)

val managedIndexConfig = getExistingManagedIndexConfig(indexName)

// Change the start time so the job will trigger in 2 seconds.
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) }

// Need to wait two cycles for wait for snapshot step
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor { assertSnapshotExists(repository, indexName) }
waitFor { assertSnapshotFinishedWithSuccess(repository, indexName) }
}

fun `test basic with invalid templated snapshot name default to indexName`() {
val indexName = "${testIndexName}_index_basic"
val policyID = "${testIndexName}_policy_basic"
val repository = "repository"
val snapshot = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, "{{ctx.someField}}", emptyMap())
val actionConfig = SnapshotActionConfig(repository, snapshot, 0)
val states = listOf(
State("Snapshot", listOf(actionConfig), listOf())
)

createRepository(repository)

val policy = Policy(
id = policyID,
description = "$testIndexName description",
schemaVersion = 1L,
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
errorNotification = randomErrorNotification(),
defaultState = states[0].name,
states = states
)
createPolicy(policy, policyID)
createIndex(indexName, policyID)

val managedIndexConfig = getExistingManagedIndexConfig(indexName)

// Change the start time so the job will trigger in 2 seconds.
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) }

// Need to wait two cycles for wait for snapshot step
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor { assertSnapshotExists(repository, indexName) }
waitFor { assertSnapshotFinishedWithSuccess(repository, indexName) }
}

fun `test successful wait for snapshot step`() {
val indexName = "${testIndexName}_index_success"
val policyID = "${testIndexName}_policy_success"
val repository = "repository"
val snapshot = "snapshot_success_test"
val snapshotText = "snapshot_success_test"
val snapshot = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, snapshotText, emptyMap())
val actionConfig = SnapshotActionConfig(repository, snapshot, 0)
val states = listOf(
State("Snapshot", listOf(actionConfig), listOf())
Expand Down Expand Up @@ -122,19 +202,20 @@ class SnapshotActionIT : IndexStateManagementRestTestCase() {
// verify we set snapshotName in action properties
waitFor {
assert(
getExplainManagedIndexMetaData(indexName).actionMetaData?.actionProperties?.snapshotName?.contains(snapshot) == true
getExplainManagedIndexMetaData(indexName).actionMetaData?.actionProperties?.snapshotName?.contains(snapshotText) == true
)
}

waitFor { assertSnapshotExists(repository, snapshot) }
waitFor { assertSnapshotFinishedWithSuccess(repository, snapshot) }
waitFor { assertSnapshotExists(repository, snapshotText) }
waitFor { assertSnapshotFinishedWithSuccess(repository, snapshotText) }
}

fun `test failed wait for snapshot step`() {
val indexName = "${testIndexName}_index_failed"
val policyID = "${testIndexName}_policy_failed"
val repository = "repository"
val snapshot = "snapshot_failed_test"
val snapshotText = "snapshot_failed_test"
val snapshot = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, snapshotText, emptyMap())
val actionConfig = SnapshotActionConfig(repository, snapshot, 0)
val states = listOf(
State("Snapshot", listOf(actionConfig), listOf())
Expand Down Expand Up @@ -165,8 +246,8 @@ class SnapshotActionIT : IndexStateManagementRestTestCase() {
waitFor { assertEquals(AttemptSnapshotStep.getSuccessMessage(indexName), getExplainManagedIndexMetaData(indexName).info?.get("message")) }

// Confirm successful snapshot creation
waitFor { assertSnapshotExists(repository, snapshot) }
waitFor { assertSnapshotFinishedWithSuccess(repository, snapshot) }
waitFor { assertSnapshotExists(repository, snapshotText) }
waitFor { assertSnapshotFinishedWithSuccess(repository, snapshotText) }

// Delete the snapshot so wait for step will fail with missing snapshot exception
val snapshotName = getExplainManagedIndexMetaData(indexName).actionMetaData?.actionProperties?.snapshotName
Expand All @@ -188,7 +269,7 @@ class SnapshotActionIT : IndexStateManagementRestTestCase() {
val indexName = "${testIndexName}_index_blocked"
val policyID = "${testIndexName}_policy_basic"
val repository = "hello-world"
val snapshot = "snapshot"
val snapshot = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, "snapshot", emptyMap())
val actionConfig = SnapshotActionConfig(repository, snapshot, 0)
val states = listOf(
State("Snapshot", listOf(actionConfig), listOf())
Expand Down
Loading

0 comments on commit de21d80

Please sign in to comment.