Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Snapshot implementation #135

Merged
merged 11 commits into from
May 14, 2020
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.amazon.opendistroforelasticsearch.indexstatemanagement.action
dbbaughe marked this conversation as resolved.
Show resolved Hide resolved

import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.ActionConfig.ActionType
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.SnapshotActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step
import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.snapshot.AttemptSnapshotStep
import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.snapshot.WaitForSnapshotStep
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService

class SnapshotAction(
clusterService: ClusterService,
client: Client,
managedIndexMetaData: ManagedIndexMetaData,
config: SnapshotActionConfig
) : Action(ActionType.SNAPSHOT, config, managedIndexMetaData) {
private val attemptSnapshotStep = AttemptSnapshotStep(clusterService, client, config, managedIndexMetaData)
private val waitForSnapshotStep = WaitForSnapshotStep(clusterService, client, config, managedIndexMetaData)

private val stepNameToStep: LinkedHashMap<String, Step> = linkedMapOf(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is LinkedHashMap needed here? If going off the Force Merge action it seems like that was done for easier use w/ a when statement in the getStepToExecute. Since that's not done here we can probably just have a private steps list.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

AttemptSnapshotStep.name to attemptSnapshotStep,
WaitForSnapshotStep.name to waitForSnapshotStep
)

override fun getSteps(): List<Step> = stepNameToStep.values.toList()

@Suppress("ReturnCount")
override fun getStepToExecute(): Step {
dbbaughe marked this conversation as resolved.
Show resolved Hide resolved
// If stepMetaData is null, return the first step
val stepMetaData = managedIndexMetaData.stepMetaData ?: return attemptSnapshotStep
if (stepMetaData.name == AttemptSnapshotStep.name) return waitForSnapshotStep

return attemptSnapshotStep
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ abstract class ActionConfig(
READ_WRITE("read_write"),
REPLICA_COUNT("replica_count"),
FORCE_MERGE("force_merge"),
NOTIFICATION("notification");
NOTIFICATION("notification"),
SNAPSHOT("snapshot");

override fun toString(): String {
return type
Expand Down Expand Up @@ -94,6 +95,7 @@ abstract class ActionConfig(
ActionType.REPLICA_COUNT.type -> actionConfig = ReplicaCountActionConfig.parse(xcp, index)
ActionType.FORCE_MERGE.type -> actionConfig = ForceMergeActionConfig.parse(xcp, index)
ActionType.NOTIFICATION.type -> actionConfig = NotificationActionConfig.parse(xcp, index)
ActionType.SNAPSHOT.type -> actionConfig = SnapshotActionConfig.parse(xcp, index)
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in Action.")
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action
dbbaughe marked this conversation as resolved.
Show resolved Hide resolved

import com.amazon.opendistroforelasticsearch.indexstatemanagement.action.Action
import com.amazon.opendistroforelasticsearch.indexstatemanagement.action.SnapshotAction
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.xcontent.ToXContent
import org.elasticsearch.common.xcontent.ToXContentObject
import org.elasticsearch.common.xcontent.XContentBuilder
import org.elasticsearch.common.xcontent.XContentParser
import org.elasticsearch.common.xcontent.XContentParserUtils
import org.elasticsearch.script.ScriptService
import java.io.IOException

data class SnapshotActionConfig(
val repository: String?,
val snapshot: String?,
val includeGlobalState: Boolean?,
val index: Int
) : ToXContentObject, ActionConfig(ActionType.SNAPSHOT, index) {

init {
require(repository != null) { "SnapshotActionConfig repository must be specified" }
require(snapshot != null) { "SnapshotActionConfig snapshot must be specified" }
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
super.toXContent(builder, params)
.startObject(ActionType.SNAPSHOT.type)
if (repository != null) builder.field(REPOSITORY_FIELD, repository)
if (snapshot != null) builder.field(SNAPSHOT_FIELD, snapshot)
if (includeGlobalState != null) builder.field(INCLUDE_GLOBAL_STATE, includeGlobalState)
return builder.endObject().endObject()
}

override fun isFragment(): Boolean = super<ToXContentObject>.isFragment()

override fun toAction(
clusterService: ClusterService,
scriptService: ScriptService,
client: Client,
managedIndexMetaData: ManagedIndexMetaData
): Action = SnapshotAction(clusterService, client, managedIndexMetaData, this)

companion object {
const val REPOSITORY_FIELD = "repository"
const val SNAPSHOT_FIELD = "snapshot"
const val INCLUDE_GLOBAL_STATE = "include_global_state"

@JvmStatic
@Throws(IOException::class)
fun parse(xcp: XContentParser, index: Int): SnapshotActionConfig {
var repository: String? = null
var snapshot: String? = null
var includeGlobalState: Boolean? = null

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp::getTokenLocation)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
REPOSITORY_FIELD -> repository = xcp.text()
SNAPSHOT_FIELD -> snapshot = xcp.text()
INCLUDE_GLOBAL_STATE -> includeGlobalState = xcp.booleanValue()
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in SnapshotActionConfig.")
}
}

return SnapshotActionConfig(
repository = repository,
snapshot = snapshot,
includeGlobalState = includeGlobalState,
index = index
)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package com.amazon.opendistroforelasticsearch.indexstatemanagement.step.snapshot
dbbaughe marked this conversation as resolved.
Show resolved Hide resolved

import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.suspendUntil
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.SnapshotActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step
import org.apache.logging.log4j.LogManager
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.rest.RestStatus

class AttemptSnapshotStep(
val clusterService: ClusterService,
val client: Client,
val config: SnapshotActionConfig,
managedIndexMetaData: ManagedIndexMetaData
) : Step(name, managedIndexMetaData) {

private val logger = LogManager.getLogger(javaClass)
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null

@Suppress("TooGenericExceptionCaught")
override suspend fun execute() {
try {
logger.info("Executing snapshot on ${managedIndexMetaData.index}")
val createSnapshotRequest = CreateSnapshotRequest()
dbbaughe marked this conversation as resolved.
Show resolved Hide resolved
.indices(managedIndexMetaData.index)
.snapshot(config.snapshot)
dbbaughe marked this conversation as resolved.
Show resolved Hide resolved
.repository(config.repository)
dbbaughe marked this conversation as resolved.
Show resolved Hide resolved
if (config.includeGlobalState != null) {
createSnapshotRequest.includeGlobalState(config.includeGlobalState)
dbbaughe marked this conversation as resolved.
Show resolved Hide resolved
}

val response: CreateSnapshotResponse = client.admin().cluster().suspendUntil { createSnapshot(createSnapshotRequest, it) }
dbbaughe marked this conversation as resolved.
Show resolved Hide resolved
when (response.status()) {
RestStatus.ACCEPTED -> {
stepStatus = StepStatus.COMPLETED
info = mapOf("message" to "Creating snapshot in progress for index: ${managedIndexMetaData.index}")
}
RestStatus.OK -> {
stepStatus = StepStatus.COMPLETED
info = mapOf("message" to "Snapshot created for index: ${managedIndexMetaData.index}")
}
else -> {
stepStatus = StepStatus.FAILED
info = mapOf("message" to "There was an error during snapshot creation for index: ${managedIndexMetaData.index}")
}
}
} catch (e: Exception) {
dbbaughe marked this conversation as resolved.
Show resolved Hide resolved
val message = "Failed to create snapshot for index: ${managedIndexMetaData.index}"
logger.error(message, e)
stepStatus = StepStatus.FAILED
val mutableInfo = mutableMapOf("message" to message)
val errorMessage = e.message
if (errorMessage != null) mutableInfo["cause"] = errorMessage
info = mutableInfo.toMap()
}
}

override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData {
return currentMetaData.copy(
stepMetaData = StepMetaData(name, getStepStartTime().toEpochMilli(), stepStatus),
transitionTo = null,
info = info
)
}

companion object {
const val name = "attempt_snapshot"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.amazon.opendistroforelasticsearch.indexstatemanagement.step.snapshot
dbbaughe marked this conversation as resolved.
Show resolved Hide resolved

import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.suspendUntil
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.SnapshotActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step
import org.apache.logging.log4j.LogManager
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService

class WaitForSnapshotStep(
val clusterService: ClusterService,
val client: Client,
val config: SnapshotActionConfig,
managedIndexMetaData: ManagedIndexMetaData
) : Step(name, managedIndexMetaData) {
private val logger = LogManager.getLogger(javaClass)
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null

override suspend fun execute() {
logger.info("Waiting for snapshot to complete...")
val request = SnapshotsStatusRequest()
.snapshots(arrayOf(config.snapshot))
.repository(config.repository)
val response: SnapshotsStatusResponse = client.admin().cluster().suspendUntil { snapshotsStatus(request, it) }
val status: SnapshotStatus? = response
.snapshots
.find { snapshotStatus ->
snapshotStatus.snapshot.snapshotId.name == config.snapshot && snapshotStatus.snapshot.repository == config.repository
}
if (status != null) {
if (status.state.completed()) {
stepStatus = StepStatus.COMPLETED
info = mapOf("message" to "Snapshot created for index: ${managedIndexMetaData.index}")
} else {
stepStatus = StepStatus.CONDITION_NOT_MET
info = mapOf("message" to "Creating snapshot in progress for index: ${managedIndexMetaData.index}")
}
} else {
stepStatus = StepStatus.FAILED
info = mapOf("message" to "Snapshot doesn't exist for index: ${managedIndexMetaData.index}")
}
}

override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData {
return currentMetaData.copy(
stepMetaData = StepMetaData(name, getStepStartTime().toEpochMilli(), stepStatus),
transitionTo = null,
info = info
)
}

companion object {
const val name = "wait_for_snapshot"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,14 @@ import org.apache.http.HttpHeaders
import org.apache.http.entity.ContentType.APPLICATION_JSON
import org.apache.http.entity.StringEntity
import org.apache.http.message.BasicHeader
import org.elasticsearch.ElasticsearchParseException
import org.elasticsearch.action.search.SearchResponse
import org.elasticsearch.client.Request
import org.elasticsearch.client.Response
import org.elasticsearch.cluster.metadata.IndexMetaData
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.unit.TimeValue
import org.elasticsearch.common.xcontent.DeprecationHandler
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler
import org.elasticsearch.common.xcontent.NamedXContentRegistry
import org.elasticsearch.common.xcontent.XContentParser.Token
Expand All @@ -60,6 +62,7 @@ import org.elasticsearch.rest.RestStatus
import org.elasticsearch.test.ESTestCase
import org.elasticsearch.test.rest.ESRestTestCase
import org.junit.rules.DisableOnDebug
import java.io.IOException
import java.time.Duration
import java.time.Instant
import java.util.Locale
Expand Down Expand Up @@ -409,6 +412,59 @@ abstract class IndexStateManagementRestTestCase : ESRestTestCase() {
return metadata
}

protected fun createRepository(
repository: String
) {
val path = getRepoPath()
val response = client()
.makeRequest(
"PUT",
"_snapshot/$repository",
emptyMap(),
StringEntity("{\"type\":\"fs\", \"settings\": {\"location\": \"$path\"}}", APPLICATION_JSON)
)
assertEquals("Unable to create a new repository", RestStatus.OK, response.restStatus())
}

@Suppress("UNCHECKED_CAST")
private fun getRepoPath(): String {
val response = client()
.makeRequest(
"GET",
"_nodes",
emptyMap()
)
assertEquals("Unable to get a nodes settings", RestStatus.OK, response.restStatus())
return ((response.asMap()["nodes"] as HashMap<String, HashMap<String, HashMap<String, HashMap<String, Any>>>>).values.first()["settings"]!!["path"]!!["repo"] as List<String>)[0]
}

private fun getSnapshotsList(repository: String): List<Any> {
val response = client()
.makeRequest(
"GET",
"_cat/snapshots/$repository?format=json",
emptyMap()
)
assertEquals("Unable to get a snapshot", RestStatus.OK, response.restStatus())
try {
return jsonXContent
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, response.entity.content)
.use { parser -> parser.list() }
} catch (e: IOException) {
throw ElasticsearchParseException("Failed to parse content to list", e)
}
}

protected fun assertSnapshotExists(
repository: String,
snapshot: String
) = require(getSnapshotsList(repository).any { element -> snapshot == (element as Map<String, String>)["id"] }) { "No snapshot found with id: $snapshot" }

protected fun assertSnapshotFinishedWithSuccess(
repository: String,
snapshot: String
) = require(getSnapshotsList(repository).any { element -> snapshot == (element as Map<String, String>)["id"] && "SUCCESS" == element["status"] }) { "Snapshot didn't finish with success." }

/**
* Compares responses returned by APIs such as those defined in [RetryFailedManagedIndexAction] and [RestAddPolicyAction]
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.R
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.ReadWriteActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.ReplicaCountActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.RolloverActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.SnapshotActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.coordinator.ClusterStateManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.coordinator.SweptManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.destination.Chime
Expand Down Expand Up @@ -196,6 +197,10 @@ fun randomTemplateScript(
params: Map<String, String> = emptyMap()
): Script = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, source, params)

fun randomSnapshotActionConfig(repository: String? = null, snapshot: String? = null, includeGlobalState: Boolean? = null): SnapshotActionConfig {
return SnapshotActionConfig(repository, snapshot, includeGlobalState, index = 0)
}

/**
* Helper functions for creating a random Conditions object
*/
Expand Down Expand Up @@ -360,6 +365,11 @@ fun ManagedIndexConfig.toJsonString(): String {
return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string()
}

fun SnapshotActionConfig.toJsonString(): String {
val builder = XContentFactory.jsonBuilder()
return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string()
}

/**
* Wrapper for [RestClient.performRequest] which was deprecated in ES 6.5 and is used in tests. This provides
* a single place to suppress deprecation warnings. This will probably need further work when the API is removed entirely
Expand Down
Loading