Skip to content

Commit e858ab2

Browse files
authored
added support for mustache scripting of rollup.target_index field (opensearch-project#435)
* added support for mustache scripting of rollup.target_index field Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com> * defekt fixes Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com> * tests Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com> * small refactor/improvements Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com> * added wildcard check when creating rollup job; removed resolving targetIndex on Rollup init; added test for wildcards Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com> * lint fixes Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com> * moved target_index validation in getRollup resp handler Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com> * added using toMap() Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com> * removed catch block Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com> * exception fix Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com> * linter fix Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com> * fixed IT fail Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com> * added Exception catch block Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>
1 parent 1f581dd commit e858ab2

File tree

9 files changed

+266
-23
lines changed

9 files changed

+266
-23
lines changed

src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ import org.opensearch.indexmanagement.rollup.resthandler.RestStartRollupAction
110110
import org.opensearch.indexmanagement.rollup.resthandler.RestStopRollupAction
111111
import org.opensearch.indexmanagement.rollup.settings.LegacyOpenDistroRollupSettings
112112
import org.opensearch.indexmanagement.rollup.settings.RollupSettings
113+
import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver
113114
import org.opensearch.indexmanagement.settings.IndexManagementSettings
114115
import org.opensearch.indexmanagement.snapshotmanagement.api.resthandler.RestCreateSMPolicyHandler
115116
import org.opensearch.indexmanagement.snapshotmanagement.api.resthandler.RestDeleteSMPolicyHandler
@@ -370,6 +371,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
370371
this.indexNameExpressionResolver = indexNameExpressionResolver
371372

372373
val skipFlag = SkipExecution(client, clusterService)
374+
RollupFieldValueExpressionResolver.registerScriptService(scriptService)
373375
val rollupRunner = RollupRunner
374376
.registerClient(client)
375377
.registerClusterService(clusterService)

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollup/AttemptCreateRollupJobStep.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package org.opensearch.indexmanagement.indexstatemanagement.step.rollup
77

88
import org.apache.logging.log4j.LogManager
99
import org.opensearch.ExceptionsHelper
10+
import org.opensearch.OpenSearchException
1011
import org.opensearch.action.support.WriteRequest
1112
import org.opensearch.action.support.master.AcknowledgedResponse
1213
import org.opensearch.index.engine.VersionConflictEngineException
@@ -62,7 +63,9 @@ class AttemptCreateRollupJobStep(private val action: RollupAction) : Step(name)
6263
}
6364
} catch (e: RemoteTransportException) {
6465
processFailure(rollup.id, indexName, ExceptionsHelper.unwrapCause(e) as Exception)
65-
} catch (e: RemoteTransportException) {
66+
} catch (e: OpenSearchException) {
67+
processFailure(rollup.id, indexName, e)
68+
} catch (e: Exception) {
6669
processFailure(rollup.id, indexName, e)
6770
}
6871

src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupIndexer.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import org.opensearch.indexmanagement.rollup.model.Rollup
2323
import org.opensearch.indexmanagement.rollup.model.RollupStats
2424
import org.opensearch.indexmanagement.rollup.settings.RollupSettings.Companion.ROLLUP_INGEST_BACKOFF_COUNT
2525
import org.opensearch.indexmanagement.rollup.settings.RollupSettings.Companion.ROLLUP_INGEST_BACKOFF_MILLIS
26+
import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver
2627
import org.opensearch.indexmanagement.rollup.util.getInitialDocValues
2728
import org.opensearch.indexmanagement.util.IndexUtils.Companion.ODFE_MAGIC_NULL
2829
import org.opensearch.indexmanagement.util.IndexUtils.Companion.hashToFixedSize
@@ -123,7 +124,8 @@ class RollupIndexer(
123124
}
124125
}
125126
mapOfKeyValues.putAll(aggResults)
126-
val indexRequest = IndexRequest(job.targetIndex)
127+
val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(job, job.targetIndex)
128+
val indexRequest = IndexRequest(targetIndexResolvedName)
127129
.id(documentId)
128130
.source(mapOfKeyValues, XContentType.JSON)
129131
requests.add(indexRequest)

src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMapperService.kt

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import org.opensearch.indexmanagement.rollup.model.Rollup
3232
import org.opensearch.indexmanagement.rollup.model.RollupJobValidationResult
3333
import org.opensearch.indexmanagement.rollup.settings.LegacyOpenDistroRollupSettings
3434
import org.opensearch.indexmanagement.rollup.settings.RollupSettings
35+
import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver
3536
import org.opensearch.indexmanagement.rollup.util.isRollupIndex
3637
import org.opensearch.indexmanagement.util.IndexUtils.Companion._META
3738
import org.opensearch.indexmanagement.util.IndexUtils.Companion.getFieldFromMappings
@@ -52,14 +53,14 @@ class RollupMapperService(
5253
// If the index already exists we need to verify it's a rollup index,
5354
// confirm it does not conflict with existing jobs and is a valid job
5455
@Suppress("ReturnCount")
55-
private suspend fun validateAndAttemptToUpdateTargetIndex(rollup: Rollup): RollupJobValidationResult {
56-
if (!isRollupIndex(rollup.targetIndex, clusterService.state())) {
57-
return RollupJobValidationResult.Invalid("Target index [${rollup.targetIndex}] is a non rollup index")
56+
private suspend fun validateAndAttemptToUpdateTargetIndex(rollup: Rollup, targetIndexResolvedName: String): RollupJobValidationResult {
57+
if (!isRollupIndex(targetIndexResolvedName, clusterService.state())) {
58+
return RollupJobValidationResult.Invalid("Target index [$targetIndexResolvedName] is a non rollup index")
5859
}
5960

60-
return when (val jobExistsResult = jobExistsInRollupIndex(rollup)) {
61+
return when (val jobExistsResult = jobExistsInRollupIndex(rollup, targetIndexResolvedName)) {
6162
is RollupJobValidationResult.Valid -> jobExistsResult
62-
is RollupJobValidationResult.Invalid -> updateRollupIndexMappings(rollup)
63+
is RollupJobValidationResult.Invalid -> updateRollupIndexMappings(rollup, targetIndexResolvedName)
6364
is RollupJobValidationResult.Failure -> jobExistsResult
6465
}
6566
}
@@ -69,14 +70,15 @@ class RollupMapperService(
6970
// TODO: error handling
7071
@Suppress("ReturnCount")
7172
suspend fun attemptCreateRollupTargetIndex(job: Rollup, hasLegacyPlugin: Boolean): RollupJobValidationResult {
72-
if (indexExists(job.targetIndex)) {
73-
return validateAndAttemptToUpdateTargetIndex(job)
73+
val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(job, job.targetIndex)
74+
if (indexExists(targetIndexResolvedName)) {
75+
return validateAndAttemptToUpdateTargetIndex(job, targetIndexResolvedName)
7476
} else {
75-
val errorMessage = "Failed to create target index [${job.targetIndex}]"
77+
val errorMessage = "Failed to create target index [$targetIndexResolvedName]"
7678
return try {
77-
val response = createTargetIndex(job, hasLegacyPlugin)
79+
val response = createTargetIndex(targetIndexResolvedName, hasLegacyPlugin)
7880
if (response.isAcknowledged) {
79-
updateRollupIndexMappings(job)
81+
updateRollupIndexMappings(job, targetIndexResolvedName)
8082
} else {
8183
RollupJobValidationResult.Failure(errorMessage)
8284
}
@@ -94,13 +96,13 @@ class RollupMapperService(
9496
}
9597
}
9698

97-
private suspend fun createTargetIndex(job: Rollup, hasLegacyPlugin: Boolean): CreateIndexResponse {
99+
private suspend fun createTargetIndex(targetIndexName: String, hasLegacyPlugin: Boolean): CreateIndexResponse {
98100
val settings = if (hasLegacyPlugin) {
99101
Settings.builder().put(LegacyOpenDistroRollupSettings.ROLLUP_INDEX.key, true).build()
100102
} else {
101103
Settings.builder().put(RollupSettings.ROLLUP_INDEX.key, true).build()
102104
}
103-
val request = CreateIndexRequest(job.targetIndex)
105+
val request = CreateIndexRequest(targetIndexName)
104106
.settings(settings)
105107
.mapping(IndexManagementIndices.rollupTargetMappings)
106108
// TODO: Perhaps we can do better than this for mappings... as it'll be dynamic for rest
@@ -204,19 +206,19 @@ class RollupMapperService(
204206
return field != null
205207
}
206208

207-
private suspend fun jobExistsInRollupIndex(rollup: Rollup): RollupJobValidationResult {
208-
val res = when (val getMappingsResult = getMappings(rollup.targetIndex)) {
209+
private suspend fun jobExistsInRollupIndex(rollup: Rollup, targetIndexResolvedName: String): RollupJobValidationResult {
210+
val res = when (val getMappingsResult = getMappings(targetIndexResolvedName)) {
209211
is GetMappingsResult.Success -> getMappingsResult.response
210212
is GetMappingsResult.Failure ->
211213
return RollupJobValidationResult.Failure(getMappingsResult.message, getMappingsResult.cause)
212214
}
213215

214-
val indexMapping: MappingMetadata = res.mappings[rollup.targetIndex]
216+
val indexMapping: MappingMetadata = res.mappings[targetIndexResolvedName]
215217

216218
return if (((indexMapping.sourceAsMap?.get(_META) as Map<*, *>?)?.get(ROLLUPS) as Map<*, *>?)?.containsKey(rollup.id) == true) {
217219
RollupJobValidationResult.Valid
218220
} else {
219-
RollupJobValidationResult.Invalid("Rollup job [${rollup.id}] does not exist in rollup index [${rollup.targetIndex}]")
221+
RollupJobValidationResult.Invalid("Rollup job [${rollup.id}] does not exist in rollup index [$targetIndexResolvedName]")
220222
}
221223
}
222224

@@ -254,8 +256,8 @@ class RollupMapperService(
254256
// where they can both get the same mapping state and only add their own job, meaning one
255257
// of the jobs won't be added to the target index _meta
256258
@Suppress("BlockingMethodInNonBlockingContext", "ReturnCount")
257-
private suspend fun updateRollupIndexMappings(rollup: Rollup): RollupJobValidationResult {
258-
val errorMessage = "Failed to update mappings of target index [${rollup.targetIndex}] with rollup job"
259+
private suspend fun updateRollupIndexMappings(rollup: Rollup, targetIndexResolvedName: String): RollupJobValidationResult {
260+
val errorMessage = "Failed to update mappings of target index [$targetIndexResolvedName] with rollup job"
259261
try {
260262
val response = withContext(Dispatchers.IO) {
261263
val resp: AcknowledgedResponse = client.suspendUntil {

src/main/kotlin/org/opensearch/indexmanagement/rollup/action/index/TransportIndexRollupAction.kt

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import org.opensearch.commons.authuser.User
2828
import org.opensearch.indexmanagement.IndexManagementIndices
2929
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
3030
import org.opensearch.indexmanagement.rollup.model.Rollup
31+
import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver
3132
import org.opensearch.indexmanagement.rollup.util.parseRollup
3233
import org.opensearch.indexmanagement.settings.IndexManagementSettings
3334
import org.opensearch.indexmanagement.util.IndexUtils
@@ -91,6 +92,14 @@ class TransportIndexRollupAction @Inject constructor(
9192
if (response.isAcknowledged) {
9293
log.info("Successfully created or updated $INDEX_MANAGEMENT_INDEX with newest mappings.")
9394
if (request.opType() == DocWriteRequest.OpType.CREATE) {
95+
if (!validateTargetIndexName()) {
96+
return actionListener.onFailure(
97+
OpenSearchStatusException(
98+
"target_index value is invalid: ${request.rollup.targetIndex}",
99+
RestStatus.BAD_REQUEST
100+
)
101+
)
102+
}
94103
putRollup()
95104
} else {
96105
getRollup()
@@ -128,6 +137,14 @@ class TransportIndexRollupAction @Inject constructor(
128137
if (modified.isNotEmpty()) {
129138
return actionListener.onFailure(OpenSearchStatusException("Not allowed to modify $modified", RestStatus.BAD_REQUEST))
130139
}
140+
if (!validateTargetIndexName()) {
141+
return actionListener.onFailure(
142+
OpenSearchStatusException(
143+
"target_index value is invalid: ${request.rollup.targetIndex}",
144+
RestStatus.BAD_REQUEST
145+
)
146+
)
147+
}
131148
putRollup()
132149
}
133150

@@ -172,5 +189,10 @@ class TransportIndexRollupAction @Inject constructor(
172189
}
173190
)
174191
}
192+
193+
private fun validateTargetIndexName(): Boolean {
194+
val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(request.rollup, request.rollup.targetIndex)
195+
return targetIndexResolvedName.contains("*") == false && targetIndexResolvedName.contains("?") == false
196+
}
175197
}
176198
}

src/main/kotlin/org/opensearch/indexmanagement/rollup/action/mapping/TransportUpdateRollupMappingAction.kt

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import org.opensearch.common.xcontent.XContentFactory
2525
import org.opensearch.common.xcontent.XContentHelper
2626
import org.opensearch.common.xcontent.XContentType
2727
import org.opensearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_TYPE
28+
import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver
2829
import org.opensearch.indexmanagement.util.IndexUtils.Companion._META
2930
import org.opensearch.threadpool.ThreadPool
3031
import org.opensearch.transport.TransportService
@@ -50,7 +51,8 @@ class TransportUpdateRollupMappingAction @Inject constructor(
5051
private val log = LogManager.getLogger(javaClass)
5152

5253
override fun checkBlock(request: UpdateRollupMappingRequest, state: ClusterState): ClusterBlockException? {
53-
return state.blocks.indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, arrayOf(request.rollup.targetIndex))
54+
val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(request.rollup, request.rollup.targetIndex)
55+
return state.blocks.indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, arrayOf(targetIndexResolvedName))
5456
}
5557

5658
@Suppress("ReturnCount", "LongMethod")
@@ -59,7 +61,8 @@ class TransportUpdateRollupMappingAction @Inject constructor(
5961
state: ClusterState,
6062
listener: ActionListener<AcknowledgedResponse>
6163
) {
62-
val index = state.metadata.index(request.rollup.targetIndex)
64+
val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(request.rollup, request.rollup.targetIndex)
65+
val index = state.metadata.index(targetIndexResolvedName)
6366
if (index == null) {
6467
log.debug("Could not find index [$index]")
6568
return listener.onFailure(IllegalStateException("Could not find index [$index]"))
@@ -113,7 +116,7 @@ class TransportUpdateRollupMappingAction @Inject constructor(
113116
}
114117

115118
// TODO: Does schema_version get overwritten?
116-
val putMappingRequest = PutMappingRequest(request.rollup.targetIndex).source(metaMappings)
119+
val putMappingRequest = PutMappingRequest(targetIndexResolvedName).source(metaMappings)
117120
client.admin().indices().putMapping(
118121
putMappingRequest,
119122
object : ActionListener<AcknowledgedResponse> {
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.indexmanagement.rollup.util
7+
8+
import org.opensearch.common.xcontent.XContentFactory
9+
import org.opensearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_TYPE
10+
import org.opensearch.indexmanagement.opensearchapi.toMap
11+
import org.opensearch.indexmanagement.rollup.model.Rollup
12+
import org.opensearch.script.Script
13+
import org.opensearch.script.ScriptService
14+
import org.opensearch.script.ScriptType
15+
import org.opensearch.script.TemplateScript
16+
17+
object RollupFieldValueExpressionResolver {
18+
19+
private val validTopContextFields = setOf(Rollup.SOURCE_INDEX_FIELD)
20+
21+
private lateinit var scriptService: ScriptService
22+
23+
fun resolve(rollup: Rollup, fieldValue: String): String {
24+
val script = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, fieldValue, mapOf())
25+
26+
val contextMap = rollup.toXContent(XContentFactory.jsonBuilder(), XCONTENT_WITHOUT_TYPE)
27+
.toMap()
28+
.filterKeys { key -> key in validTopContextFields }
29+
30+
val compiledValue = scriptService.compile(script, TemplateScript.CONTEXT)
31+
.newInstance(script.params + mapOf("ctx" to contextMap))
32+
.execute()
33+
34+
return if (compiledValue.isBlank()) fieldValue else compiledValue
35+
}
36+
37+
fun registerScriptService(scriptService: ScriptService) {
38+
this.scriptService = scriptService
39+
}
40+
}

0 commit comments

Comments
 (0)