Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package org.opensearch.indexmanagement.indexstatemanagement
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils
import org.opensearch.indexmanagement.indexstatemanagement.action.AliasActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.AllocationActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.CloseActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.DeleteActionParser
Expand All @@ -34,6 +35,7 @@ class ISMActionsParser private constructor() {
}

val parsers = mutableListOf(
AliasActionParser(),
AllocationActionParser(),
CloseActionParser(),
DeleteActionParser(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.action

import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.indexmanagement.indexstatemanagement.step.alias.AttemptAliasActionsStep
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext

class AliasAction(
val actions: List<IndicesAliasesRequest.AliasActions>,
index: Int
) : Action(name, index) {

/**
* Allowing the alias action to be only applicable on the managed index for ADD and REMOVE actions only.
* https://github.com/opensearch-project/OpenSearch/blob/4d045a164e12a382881140e32f9285a3224fecc7/server/src/main/java/org/opensearch/action/admin/indices/alias/IndicesAliasesRequest.java#L105
*/
init {
require(actions.isNotEmpty()) { "At least one alias action needs to be specified." }
val allowedActionTypes = listOf(IndicesAliasesRequest.AliasActions.Type.ADD, IndicesAliasesRequest.AliasActions.Type.REMOVE)
require(actions.all { it.actionType() in allowedActionTypes }) { "Only ADD and REMOVE actions are allowed." }
require(
actions.all { it.indices().isNullOrEmpty() }
) { "Alias action can only work on its applied index so don't accept index/indices parameter." }
require(
actions.all { it.aliases().isNotEmpty() }
) { "At least one alias needs to be specified." }
}

private val attemptAliasActionsStep = AttemptAliasActionsStep(this)

private val steps = listOf(attemptAliasActionsStep)

override fun getStepToExecute(context: StepContext): Step {
return attemptAliasActionsStep
}

override fun getSteps(): List<Step> = steps

override fun populateAction(builder: XContentBuilder, params: ToXContent.Params) {
builder.startObject(type)
builder.field(ACTIONS, actions)
builder.endObject()
}

override fun populateAction(out: StreamOutput) {
out.writeList(actions)
out.writeInt(actionIndex)
}

companion object {
const val name = "alias"
const val ACTIONS = "actions"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.action

import org.apache.logging.log4j.LogManager
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParser.Token
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.indexmanagement.indexstatemanagement.action.AliasAction.Companion.ACTIONS
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser

class AliasActionParser : ActionParser() {

private val logger = LogManager.getLogger(javaClass)
override fun fromStreamInput(sin: StreamInput): Action {
val actions = sin.readList(IndicesAliasesRequest::AliasActions)
val index = sin.readInt()
return AliasAction(actions, index)
}

override fun fromXContent(xcp: XContentParser, index: Int): Action {
val actions: MutableList<IndicesAliasesRequest.AliasActions> = mutableListOf()

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()
when (fieldName) {
ACTIONS -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Block remove_index action
Block index indices fields if user provided

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added checks in initiate function to raise Illegal argument exception.

ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_ARRAY) {
actions.add(IndicesAliasesRequest.AliasActions.fromXContent(xcp))
}
}
else -> {
logger.error("Invalid field: [$fieldName] found in AliasAction.")
throw IllegalArgumentException("Invalid field: [$fieldName] found in AliasAction.")
}
}
}
return AliasAction(actions, index)
}

override fun getActionType(): String = AliasAction.name
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.step.alias

import org.apache.logging.log4j.LogManager
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.indexmanagement.indexstatemanagement.action.AliasAction
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData

class AttemptAliasActionsStep(private val action: AliasAction) : Step(name) {

private val logger = LogManager.getLogger(javaClass)
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null

override suspend fun execute(): Step {
val context = this.context ?: return this
val indexName = context.metadata.index
try {
val request = IndicesAliasesRequest()
action.actions.forEach {
// Applying the actions on the managed index.
it.indices(indexName)
request.addAliasAction(it)
}
val response: AcknowledgedResponse = context.client.admin().indices().suspendUntil { aliases(request, it) }
handleResponse(response, indexName, action.actions)
} catch (e: Exception) {
handleException(e, indexName, action.actions)
}
return this
}

private fun handleException(e: Exception, indexName: String, actions: List<IndicesAliasesRequest.AliasActions>) {
val message = getFailedMessage(indexName, actions)
logger.error(message, e)
stepStatus = StepStatus.FAILED
val mutableInfo = mutableMapOf("message" to message)
val errorMessage = e.message
if (errorMessage != null) mutableInfo["cause"] = errorMessage
info = mutableInfo.toMap()
}

private fun handleResponse(
response: AcknowledgedResponse,
indexName: String,
actions: List<IndicesAliasesRequest.AliasActions>
) {
if (response.isAcknowledged) {
stepStatus = StepStatus.COMPLETED
info = mapOf("message" to getSuccessMessage(indexName))
} else {
stepStatus = StepStatus.FAILED
info = mapOf("message" to getFailedMessage(indexName, actions))
}
}

override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData {
return currentMetadata.copy(
stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus),
transitionTo = null,
info = info
)
}

override fun isIdempotent() = true

companion object {
val validTopContextFields = setOf("index")
const val name = "attempt_alias"
fun getFailedMessage(
index: String,
actions: List<IndicesAliasesRequest.AliasActions>
) = "Failed to update alias [index=$index] for actions: [actions=$actions]"

fun getSuccessMessage(index: String) = "Successfully updated alias [index=$index]"
}
}
10 changes: 9 additions & 1 deletion src/main/resources/mappings/opendistro-ism-config.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"_meta" : {
"schema_version": 16
"schema_version": 17
},
"dynamic": "strict",
"properties": {
Expand Down Expand Up @@ -159,6 +159,14 @@
}
}
},
"alias": {
"properties": {
"actions": {
"type": "object",
"enabled": false
}
}
},
"delete": {
"type": "object"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import javax.management.remote.JMXServiceURL

abstract class IndexManagementRestTestCase : ODFERestTestCase() {

val configSchemaVersion = 16
val configSchemaVersion = 17
val historySchemaVersion = 5

// Having issues with tests leaking into other tests and mappings being incorrect and they are not caught by any pending task wait check as
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
package org.opensearch.indexmanagement.indexstatemanagement

import org.opensearch.action.admin.indices.alias.Alias
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest
import org.opensearch.common.unit.ByteSizeValue
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.index.RandomCreateIndexGenerator.randomAlias
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.indexmanagement.indexstatemanagement.action.AliasAction
import org.opensearch.indexmanagement.indexstatemanagement.action.AllocationAction
import org.opensearch.indexmanagement.indexstatemanagement.action.CloseAction
import org.opensearch.indexmanagement.indexstatemanagement.action.DeleteAction
Expand Down Expand Up @@ -206,6 +208,24 @@ fun randomOpenActionConfig(): OpenAction {
return OpenAction(index = 0)
}

fun randomAliasAction(includeIndices: Boolean = false): AliasAction {
val actions = List(OpenSearchRestTestCase.randomIntBetween(1, 10)) { if (includeIndices) randomAliasActionWithIndices() else randomAliasActions() }
return AliasAction(actions = actions, index = 0)
}

fun randomAliasActions(): IndicesAliasesRequest.AliasActions {
val types = listOf(IndicesAliasesRequest.AliasActions.Type.ADD, IndicesAliasesRequest.AliasActions.Type.REMOVE)
return IndicesAliasesRequest.AliasActions(OpenSearchRestTestCase.randomSubsetOf(1, types).first())
.alias(OpenSearchRestTestCase.randomAlphaOfLength(10))
}

fun randomAliasActionWithIndices(): IndicesAliasesRequest.AliasActions {
val types = listOf(IndicesAliasesRequest.AliasActions.Type.ADD, IndicesAliasesRequest.AliasActions.Type.REMOVE)
return IndicesAliasesRequest.AliasActions(OpenSearchRestTestCase.randomSubsetOf(1, types).first())
.alias(OpenSearchRestTestCase.randomAlphaOfLength(10))
.indices(OpenSearchRestTestCase.randomAlphaOfLength(10))
}

fun randomDestination(type: DestinationType = randomDestinationType()): Destination {
return Destination(
type = type,
Expand Down Expand Up @@ -478,6 +498,11 @@ fun OpenAction.toJsonString(): String {
return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string()
}

fun AliasAction.toJsonString(): String {
val builder = XContentFactory.jsonBuilder()
return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string()
}

fun ISMTemplate.toJsonString(): String {
val builder = XContentFactory.jsonBuilder()
return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string()
Expand Down
Loading