Skip to content

Commit 2c8160a

Browse files
* Adds an alias action (opensearch-project#35) Signed-off-by: Megha Goyal <goyamegh@amazon.com> Co-authored-by: Megha Goyal <56077967+goyamegh@users.noreply.github.com>
1 parent 2ee5bec commit 2c8160a

File tree

11 files changed

+611
-3
lines changed

11 files changed

+611
-3
lines changed

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMActionsParser.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package org.opensearch.indexmanagement.indexstatemanagement
88
import org.opensearch.common.io.stream.StreamInput
99
import org.opensearch.common.xcontent.XContentParser
1010
import org.opensearch.common.xcontent.XContentParserUtils
11+
import org.opensearch.indexmanagement.indexstatemanagement.action.AliasActionParser
1112
import org.opensearch.indexmanagement.indexstatemanagement.action.AllocationActionParser
1213
import org.opensearch.indexmanagement.indexstatemanagement.action.CloseActionParser
1314
import org.opensearch.indexmanagement.indexstatemanagement.action.DeleteActionParser
@@ -34,6 +35,7 @@ class ISMActionsParser private constructor() {
3435
}
3536

3637
val parsers = mutableListOf(
38+
AliasActionParser(),
3739
AllocationActionParser(),
3840
CloseActionParser(),
3941
DeleteActionParser(),
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.indexmanagement.indexstatemanagement.action
7+
8+
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest
9+
import org.opensearch.common.io.stream.StreamOutput
10+
import org.opensearch.common.xcontent.ToXContent
11+
import org.opensearch.common.xcontent.XContentBuilder
12+
import org.opensearch.indexmanagement.indexstatemanagement.step.alias.AttemptAliasActionsStep
13+
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
14+
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
15+
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
16+
17+
class AliasAction(
18+
val actions: List<IndicesAliasesRequest.AliasActions>,
19+
index: Int
20+
) : Action(name, index) {
21+
22+
/**
23+
* Allowing the alias action to be only applicable on the managed index for ADD and REMOVE actions only.
24+
* https://github.com/opensearch-project/OpenSearch/blob/4d045a164e12a382881140e32f9285a3224fecc7/server/src/main/java/org/opensearch/action/admin/indices/alias/IndicesAliasesRequest.java#L105
25+
*/
26+
init {
27+
require(actions.isNotEmpty()) { "At least one alias action needs to be specified." }
28+
val allowedActionTypes = listOf(IndicesAliasesRequest.AliasActions.Type.ADD, IndicesAliasesRequest.AliasActions.Type.REMOVE)
29+
require(actions.all { it.actionType() in allowedActionTypes }) { "Only ADD and REMOVE actions are allowed." }
30+
require(
31+
actions.all { it.indices().isNullOrEmpty() }
32+
) { "Alias action can only work on its applied index so don't accept index/indices parameter." }
33+
require(
34+
actions.all { it.aliases().isNotEmpty() }
35+
) { "At least one alias needs to be specified." }
36+
}
37+
38+
private val attemptAliasActionsStep = AttemptAliasActionsStep(this)
39+
40+
private val steps = listOf(attemptAliasActionsStep)
41+
42+
override fun getStepToExecute(context: StepContext): Step {
43+
return attemptAliasActionsStep
44+
}
45+
46+
override fun getSteps(): List<Step> = steps
47+
48+
override fun populateAction(builder: XContentBuilder, params: ToXContent.Params) {
49+
builder.startObject(type)
50+
builder.field(ACTIONS, actions)
51+
builder.endObject()
52+
}
53+
54+
override fun populateAction(out: StreamOutput) {
55+
out.writeList(actions)
56+
out.writeInt(actionIndex)
57+
}
58+
59+
companion object {
60+
const val name = "alias"
61+
const val ACTIONS = "actions"
62+
}
63+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.indexmanagement.indexstatemanagement.action
7+
8+
import org.apache.logging.log4j.LogManager
9+
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest
10+
import org.opensearch.common.io.stream.StreamInput
11+
import org.opensearch.common.xcontent.XContentParser
12+
import org.opensearch.common.xcontent.XContentParser.Token
13+
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
14+
import org.opensearch.indexmanagement.indexstatemanagement.action.AliasAction.Companion.ACTIONS
15+
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
16+
import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser
17+
18+
class AliasActionParser : ActionParser() {
19+
20+
private val logger = LogManager.getLogger(javaClass)
21+
override fun fromStreamInput(sin: StreamInput): Action {
22+
val actions = sin.readList(IndicesAliasesRequest::AliasActions)
23+
val index = sin.readInt()
24+
return AliasAction(actions, index)
25+
}
26+
27+
override fun fromXContent(xcp: XContentParser, index: Int): Action {
28+
val actions: MutableList<IndicesAliasesRequest.AliasActions> = mutableListOf()
29+
30+
ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
31+
while (xcp.nextToken() != Token.END_OBJECT) {
32+
val fieldName = xcp.currentName()
33+
xcp.nextToken()
34+
when (fieldName) {
35+
ACTIONS -> {
36+
ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp)
37+
while (xcp.nextToken() != Token.END_ARRAY) {
38+
actions.add(IndicesAliasesRequest.AliasActions.fromXContent(xcp))
39+
}
40+
}
41+
else -> {
42+
logger.error("Invalid field: [$fieldName] found in AliasAction.")
43+
throw IllegalArgumentException("Invalid field: [$fieldName] found in AliasAction.")
44+
}
45+
}
46+
}
47+
return AliasAction(actions, index)
48+
}
49+
50+
override fun getActionType(): String = AliasAction.name
51+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.indexmanagement.indexstatemanagement.step.alias
7+
8+
import org.apache.logging.log4j.LogManager
9+
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest
10+
import org.opensearch.action.support.master.AcknowledgedResponse
11+
import org.opensearch.indexmanagement.indexstatemanagement.action.AliasAction
12+
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
13+
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
14+
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
15+
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
16+
17+
class AttemptAliasActionsStep(private val action: AliasAction) : Step(name) {
18+
19+
private val logger = LogManager.getLogger(javaClass)
20+
private var stepStatus = StepStatus.STARTING
21+
private var info: Map<String, Any>? = null
22+
23+
override suspend fun execute(): Step {
24+
val context = this.context ?: return this
25+
val indexName = context.metadata.index
26+
try {
27+
val request = IndicesAliasesRequest()
28+
action.actions.forEach {
29+
// Applying the actions on the managed index.
30+
it.indices(indexName)
31+
request.addAliasAction(it)
32+
}
33+
val response: AcknowledgedResponse = context.client.admin().indices().suspendUntil { aliases(request, it) }
34+
handleResponse(response, indexName, action.actions)
35+
} catch (e: Exception) {
36+
handleException(e, indexName, action.actions)
37+
}
38+
return this
39+
}
40+
41+
private fun handleException(e: Exception, indexName: String, actions: List<IndicesAliasesRequest.AliasActions>) {
42+
val message = getFailedMessage(indexName, actions)
43+
logger.error(message, e)
44+
stepStatus = StepStatus.FAILED
45+
val mutableInfo = mutableMapOf("message" to message)
46+
val errorMessage = e.message
47+
if (errorMessage != null) mutableInfo["cause"] = errorMessage
48+
info = mutableInfo.toMap()
49+
}
50+
51+
private fun handleResponse(
52+
response: AcknowledgedResponse,
53+
indexName: String,
54+
actions: List<IndicesAliasesRequest.AliasActions>
55+
) {
56+
if (response.isAcknowledged) {
57+
stepStatus = StepStatus.COMPLETED
58+
info = mapOf("message" to getSuccessMessage(indexName))
59+
} else {
60+
stepStatus = StepStatus.FAILED
61+
info = mapOf("message" to getFailedMessage(indexName, actions))
62+
}
63+
}
64+
65+
override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData {
66+
return currentMetadata.copy(
67+
stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus),
68+
transitionTo = null,
69+
info = info
70+
)
71+
}
72+
73+
override fun isIdempotent() = true
74+
75+
companion object {
76+
val validTopContextFields = setOf("index")
77+
const val name = "attempt_alias"
78+
fun getFailedMessage(
79+
index: String,
80+
actions: List<IndicesAliasesRequest.AliasActions>
81+
) = "Failed to update alias [index=$index] for actions: [actions=$actions]"
82+
83+
fun getSuccessMessage(index: String) = "Successfully updated alias [index=$index]"
84+
}
85+
}

src/main/resources/mappings/opendistro-ism-config.json

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"_meta" : {
3-
"schema_version": 16
3+
"schema_version": 17
44
},
55
"dynamic": "strict",
66
"properties": {
@@ -159,6 +159,14 @@
159159
}
160160
}
161161
},
162+
"alias": {
163+
"properties": {
164+
"actions": {
165+
"type": "object",
166+
"enabled": false
167+
}
168+
}
169+
},
162170
"delete": {
163171
"type": "object"
164172
},

src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import javax.management.remote.JMXServiceURL
2727

2828
abstract class IndexManagementRestTestCase : ODFERestTestCase() {
2929

30-
val configSchemaVersion = 16
30+
val configSchemaVersion = 17
3131
val historySchemaVersion = 5
3232

3333
// Having issues with tests leaking into other tests and mappings being incorrect and they are not caught by any pending task wait check as

src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,14 @@
66
package org.opensearch.indexmanagement.indexstatemanagement
77

88
import org.opensearch.action.admin.indices.alias.Alias
9+
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest
910
import org.opensearch.common.unit.ByteSizeValue
1011
import org.opensearch.common.unit.TimeValue
1112
import org.opensearch.common.xcontent.ToXContent
1213
import org.opensearch.common.xcontent.XContentFactory
1314
import org.opensearch.index.RandomCreateIndexGenerator.randomAlias
1415
import org.opensearch.index.seqno.SequenceNumbers
16+
import org.opensearch.indexmanagement.indexstatemanagement.action.AliasAction
1517
import org.opensearch.indexmanagement.indexstatemanagement.action.AllocationAction
1618
import org.opensearch.indexmanagement.indexstatemanagement.action.CloseAction
1719
import org.opensearch.indexmanagement.indexstatemanagement.action.DeleteAction
@@ -206,6 +208,24 @@ fun randomOpenActionConfig(): OpenAction {
206208
return OpenAction(index = 0)
207209
}
208210

211+
fun randomAliasAction(includeIndices: Boolean = false): AliasAction {
212+
val actions = List(OpenSearchRestTestCase.randomIntBetween(1, 10)) { if (includeIndices) randomAliasActionWithIndices() else randomAliasActions() }
213+
return AliasAction(actions = actions, index = 0)
214+
}
215+
216+
fun randomAliasActions(): IndicesAliasesRequest.AliasActions {
217+
val types = listOf(IndicesAliasesRequest.AliasActions.Type.ADD, IndicesAliasesRequest.AliasActions.Type.REMOVE)
218+
return IndicesAliasesRequest.AliasActions(OpenSearchRestTestCase.randomSubsetOf(1, types).first())
219+
.alias(OpenSearchRestTestCase.randomAlphaOfLength(10))
220+
}
221+
222+
fun randomAliasActionWithIndices(): IndicesAliasesRequest.AliasActions {
223+
val types = listOf(IndicesAliasesRequest.AliasActions.Type.ADD, IndicesAliasesRequest.AliasActions.Type.REMOVE)
224+
return IndicesAliasesRequest.AliasActions(OpenSearchRestTestCase.randomSubsetOf(1, types).first())
225+
.alias(OpenSearchRestTestCase.randomAlphaOfLength(10))
226+
.indices(OpenSearchRestTestCase.randomAlphaOfLength(10))
227+
}
228+
209229
fun randomDestination(type: DestinationType = randomDestinationType()): Destination {
210230
return Destination(
211231
type = type,
@@ -478,6 +498,11 @@ fun OpenAction.toJsonString(): String {
478498
return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string()
479499
}
480500

501+
fun AliasAction.toJsonString(): String {
502+
val builder = XContentFactory.jsonBuilder()
503+
return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string()
504+
}
505+
481506
fun ISMTemplate.toJsonString(): String {
482507
val builder = XContentFactory.jsonBuilder()
483508
return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string()

0 commit comments

Comments
 (0)