Skip to content

Commit fad78ae

Browse files
petardzepinapetardz
authored andcommitted
initial commit
Signed-off-by: petar.dzepina <petar.dzepina@gmail.com>
1 parent 1f581dd commit fad78ae

File tree

4 files changed

+159
-20
lines changed

4 files changed

+159
-20
lines changed

src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ import org.opensearch.indexmanagement.rollup.resthandler.RestStartRollupAction
110110
import org.opensearch.indexmanagement.rollup.resthandler.RestStopRollupAction
111111
import org.opensearch.indexmanagement.rollup.settings.LegacyOpenDistroRollupSettings
112112
import org.opensearch.indexmanagement.rollup.settings.RollupSettings
113+
import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver
113114
import org.opensearch.indexmanagement.settings.IndexManagementSettings
114115
import org.opensearch.indexmanagement.snapshotmanagement.api.resthandler.RestCreateSMPolicyHandler
115116
import org.opensearch.indexmanagement.snapshotmanagement.api.resthandler.RestDeleteSMPolicyHandler
@@ -370,6 +371,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
370371
this.indexNameExpressionResolver = indexNameExpressionResolver
371372

372373
val skipFlag = SkipExecution(client, clusterService)
374+
RollupFieldValueExpressionResolver.registerServices(scriptService, clusterService)
373375
val rollupRunner = RollupRunner
374376
.registerClient(client)
375377
.registerClusterService(clusterService)

src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMapperService.kt

Lines changed: 87 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,16 @@ import org.opensearch.action.admin.indices.create.CreateIndexRequest
1414
import org.opensearch.action.admin.indices.create.CreateIndexResponse
1515
import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest
1616
import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse
17+
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
18+
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest
1719
import org.opensearch.action.support.IndicesOptions
1820
import org.opensearch.action.support.master.AcknowledgedResponse
1921
import org.opensearch.client.Client
2022
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
2123
import org.opensearch.cluster.metadata.MappingMetadata
2224
import org.opensearch.cluster.service.ClusterService
2325
import org.opensearch.common.settings.Settings
26+
import org.opensearch.common.xcontent.XContentType
2427
import org.opensearch.indexmanagement.IndexManagementIndices
2528
import org.opensearch.indexmanagement.common.model.dimension.DateHistogram
2629
import org.opensearch.indexmanagement.common.model.dimension.Histogram
@@ -32,6 +35,7 @@ import org.opensearch.indexmanagement.rollup.model.Rollup
3235
import org.opensearch.indexmanagement.rollup.model.RollupJobValidationResult
3336
import org.opensearch.indexmanagement.rollup.settings.LegacyOpenDistroRollupSettings
3437
import org.opensearch.indexmanagement.rollup.settings.RollupSettings
38+
import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver
3539
import org.opensearch.indexmanagement.rollup.util.isRollupIndex
3640
import org.opensearch.indexmanagement.util.IndexUtils.Companion._META
3741
import org.opensearch.indexmanagement.util.IndexUtils.Companion.getFieldFromMappings
@@ -52,14 +56,33 @@ class RollupMapperService(
5256
// If the index already exists we need to verify it's a rollup index,
5357
// confirm it does not conflict with existing jobs and is a valid job
5458
@Suppress("ReturnCount")
55-
private suspend fun validateAndAttemptToUpdateTargetIndex(rollup: Rollup): RollupJobValidationResult {
56-
if (!isRollupIndex(rollup.targetIndex, clusterService.state())) {
57-
return RollupJobValidationResult.Invalid("Target index [${rollup.targetIndex}] is a non rollup index")
58-
}
59+
private suspend fun validateAndAttemptToUpdateTargetIndex(rollup: Rollup, targetIndexResolvedName: String, hasLegacyPlugin: Boolean): RollupJobValidationResult {
60+
if (!isRollupIndex(targetIndexResolvedName, clusterService.state()) &&
61+
RollupFieldValueExpressionResolver.hasAlias(targetIndexResolvedName)) {
5962

60-
return when (val jobExistsResult = jobExistsInRollupIndex(rollup)) {
63+
val backingIndices = RollupFieldValueExpressionResolver.getBackingIndicesForAlias(targetIndexResolvedName)
64+
backingIndices?.forEach {
65+
if (it.index.name != targetIndexResolvedName) {
66+
when (val jobExistsResult = jobExistsInRollupIndex(rollup, it.index.name)) {
67+
is RollupJobValidationResult.Invalid, is RollupJobValidationResult.Failure -> return jobExistsResult
68+
}
69+
}
70+
}
71+
val mappings = getMappings(targetIndexResolvedName)
72+
if (mappings is GetMappingsResult.Failure) {
73+
return RollupJobValidationResult.Failure("Failed to get mappings for target index: $targetIndexResolvedName")
74+
} else if (mappings is GetMappingsResult.Success && mappings.response.mappings()?.get(targetIndexResolvedName)?.sourceAsMap().isNullOrEmpty() == false) {
75+
return RollupJobValidationResult.Failure("If target_index is alias, backing index must be empty: $targetIndexResolvedName")
76+
}
77+
return prepareTargetIndex(rollup, targetIndexResolvedName, hasLegacyPlugin)
78+
} else {
79+
if (!isRollupIndex(targetIndexResolvedName, clusterService.state())) {
80+
return RollupJobValidationResult.Invalid("Target index [$targetIndexResolvedName] is a non rollup index")
81+
}
82+
}
83+
return when (val jobExistsResult = jobExistsInRollupIndex(rollup, targetIndexResolvedName)) {
6184
is RollupJobValidationResult.Valid -> jobExistsResult
62-
is RollupJobValidationResult.Invalid -> updateRollupIndexMappings(rollup)
85+
is RollupJobValidationResult.Invalid -> updateRollupIndexMappings(rollup, targetIndexResolvedName)
6386
is RollupJobValidationResult.Failure -> jobExistsResult
6487
}
6588
}
@@ -69,14 +92,15 @@ class RollupMapperService(
6992
// TODO: error handling
7093
@Suppress("ReturnCount")
7194
suspend fun attemptCreateRollupTargetIndex(job: Rollup, hasLegacyPlugin: Boolean): RollupJobValidationResult {
72-
if (indexExists(job.targetIndex)) {
73-
return validateAndAttemptToUpdateTargetIndex(job)
95+
val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(job, job.targetIndex)
96+
if (indexExists(targetIndexResolvedName)) {
97+
return validateAndAttemptToUpdateTargetIndex(job, targetIndexResolvedName, hasLegacyPlugin)
7498
} else {
75-
val errorMessage = "Failed to create target index [${job.targetIndex}]"
99+
val errorMessage = "Failed to create target index [$targetIndexResolvedName]"
76100
return try {
77-
val response = createTargetIndex(job, hasLegacyPlugin)
101+
val response = createTargetIndex(targetIndexResolvedName, hasLegacyPlugin)
78102
if (response.isAcknowledged) {
79-
updateRollupIndexMappings(job)
103+
updateRollupIndexMappings(job, targetIndexResolvedName)
80104
} else {
81105
RollupJobValidationResult.Failure(errorMessage)
82106
}
@@ -94,13 +118,57 @@ class RollupMapperService(
94118
}
95119
}
96120

97-
private suspend fun createTargetIndex(job: Rollup, hasLegacyPlugin: Boolean): CreateIndexResponse {
121+
suspend fun prepareTargetIndex(rollup: Rollup, targetIndexResolvedName: String, hasLegacyPlugin: Boolean): RollupJobValidationResult {
122+
var errorMessage = ""
123+
try {
124+
// 1. First we need to add index.plugins.rollup_index setting to index
125+
val settings = if (hasLegacyPlugin) {
126+
Settings.builder().put(LegacyOpenDistroRollupSettings.ROLLUP_INDEX.key, true).build()
127+
} else {
128+
Settings.builder().put(RollupSettings.ROLLUP_INDEX.key, true).build()
129+
}
130+
errorMessage = "Failed to update settings for target index [$targetIndexResolvedName]"
131+
val resp: AcknowledgedResponse = client.admin().indices().suspendUntil {
132+
updateSettings(UpdateSettingsRequest(settings, targetIndexResolvedName), it)
133+
}
134+
if (!resp.isAcknowledged) {
135+
return RollupJobValidationResult.Invalid("Failed to update rollup settings for target index: [$targetIndexResolvedName]")
136+
}
137+
138+
// 2. Put rollup mappings
139+
val putMappingRequest: PutMappingRequest =
140+
PutMappingRequest(targetIndexResolvedName).source(IndexManagementIndices.rollupTargetMappings, XContentType.JSON)
141+
errorMessage = "Failed to put initial rollup mappings for target index [$targetIndexResolvedName]"
142+
val respMappings: AcknowledgedResponse = client.admin().indices().suspendUntil {
143+
putMapping(putMappingRequest, it)
144+
}
145+
if (!respMappings.isAcknowledged) {
146+
return RollupJobValidationResult.Invalid("Failed to update rollup settings for target index: [$targetIndexResolvedName]")
147+
}
148+
// 3.
149+
errorMessage = "Failed to update mappings for target index [$targetIndexResolvedName]"
150+
updateRollupIndexMappings(rollup, targetIndexResolvedName)
151+
} catch (e: RemoteTransportException) {
152+
val unwrappedException = ExceptionsHelper.unwrapCause(e) as Exception
153+
logger.error(errorMessage, unwrappedException)
154+
RollupJobValidationResult.Failure(errorMessage, unwrappedException)
155+
} catch (e: OpenSearchSecurityException) {
156+
logger.error("$errorMessage because ", e)
157+
RollupJobValidationResult.Failure("$errorMessage - missing required cluster permissions: ${e.localizedMessage}", e)
158+
} catch (e: Exception) {
159+
logger.error("$errorMessage because ", e)
160+
RollupJobValidationResult.Failure(errorMessage, e)
161+
}
162+
return RollupJobValidationResult.Valid
163+
}
164+
165+
private suspend fun createTargetIndex(targetIndexName: String, hasLegacyPlugin: Boolean): CreateIndexResponse {
98166
val settings = if (hasLegacyPlugin) {
99167
Settings.builder().put(LegacyOpenDistroRollupSettings.ROLLUP_INDEX.key, true).build()
100168
} else {
101169
Settings.builder().put(RollupSettings.ROLLUP_INDEX.key, true).build()
102170
}
103-
val request = CreateIndexRequest(job.targetIndex)
171+
val request = CreateIndexRequest(targetIndexName)
104172
.settings(settings)
105173
.mapping(IndexManagementIndices.rollupTargetMappings)
106174
// TODO: Perhaps we can do better than this for mappings... as it'll be dynamic for rest
@@ -204,19 +272,19 @@ class RollupMapperService(
204272
return field != null
205273
}
206274

207-
private suspend fun jobExistsInRollupIndex(rollup: Rollup): RollupJobValidationResult {
208-
val res = when (val getMappingsResult = getMappings(rollup.targetIndex)) {
275+
private suspend fun jobExistsInRollupIndex(rollup: Rollup, targetIndexResolvedName: String): RollupJobValidationResult {
276+
val res = when (val getMappingsResult = getMappings(targetIndexResolvedName)) {
209277
is GetMappingsResult.Success -> getMappingsResult.response
210278
is GetMappingsResult.Failure ->
211279
return RollupJobValidationResult.Failure(getMappingsResult.message, getMappingsResult.cause)
212280
}
213281

214-
val indexMapping: MappingMetadata = res.mappings[rollup.targetIndex]
282+
val indexMapping: MappingMetadata = res.mappings[targetIndexResolvedName]
215283

216284
return if (((indexMapping.sourceAsMap?.get(_META) as Map<*, *>?)?.get(ROLLUPS) as Map<*, *>?)?.containsKey(rollup.id) == true) {
217285
RollupJobValidationResult.Valid
218286
} else {
219-
RollupJobValidationResult.Invalid("Rollup job [${rollup.id}] does not exist in rollup index [${rollup.targetIndex}]")
287+
RollupJobValidationResult.Invalid("Rollup job [${rollup.id}] does not exist in rollup index [$targetIndexResolvedName]")
220288
}
221289
}
222290

@@ -254,8 +322,8 @@ class RollupMapperService(
254322
// where they can both get the same mapping state and only add their own job, meaning one
255323
// of the jobs won't be added to the target index _meta
256324
@Suppress("BlockingMethodInNonBlockingContext", "ReturnCount")
257-
private suspend fun updateRollupIndexMappings(rollup: Rollup): RollupJobValidationResult {
258-
val errorMessage = "Failed to update mappings of target index [${rollup.targetIndex}] with rollup job"
325+
private suspend fun updateRollupIndexMappings(rollup: Rollup, targetIndexResolvedName: String): RollupJobValidationResult {
326+
val errorMessage = "Failed to update mappings of target index [$targetIndexResolvedName] with rollup job"
259327
try {
260328
val response = withContext(Dispatchers.IO) {
261329
val resp: AcknowledgedResponse = client.suspendUntil {

src/main/kotlin/org/opensearch/indexmanagement/rollup/settings/RollupSettings.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ class RollupSettings {
3939
"index.plugins.rollup_index",
4040
LegacyOpenDistroRollupSettings.ROLLUP_INDEX,
4141
Setting.Property.IndexScope,
42-
Setting.Property.InternalIndex
42+
Setting.Property.Dynamic
4343
)
4444

4545
val ROLLUP_INGEST_BACKOFF_MILLIS: Setting<TimeValue> = Setting.positiveTimeSetting(
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.indexmanagement.rollup.util
7+
8+
import org.opensearch.cluster.metadata.IndexAbstraction
9+
import org.opensearch.cluster.metadata.IndexMetadata
10+
import org.opensearch.cluster.service.ClusterService
11+
import org.opensearch.common.xcontent.XContentFactory
12+
import org.opensearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_TYPE
13+
import org.opensearch.indexmanagement.opensearchapi.toMap
14+
import org.opensearch.indexmanagement.rollup.model.Rollup
15+
import org.opensearch.script.Script
16+
import org.opensearch.script.ScriptService
17+
import org.opensearch.script.ScriptType
18+
import org.opensearch.script.TemplateScript
19+
20+
object RollupFieldValueExpressionResolver {
21+
22+
private val validTopContextFields = setOf(Rollup.SOURCE_INDEX_FIELD)
23+
24+
private lateinit var scriptService: ScriptService
25+
private lateinit var clusterService: ClusterService
26+
fun resolve(rollup: Rollup, fieldValue: String): String {
27+
val script = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, fieldValue, mapOf())
28+
29+
val contextMap = rollup.toXContent(XContentFactory.jsonBuilder(), XCONTENT_WITHOUT_TYPE)
30+
.toMap()
31+
.filterKeys { key -> key in validTopContextFields }
32+
33+
var compiledValue = scriptService.compile(script, TemplateScript.CONTEXT)
34+
.newInstance(script.params + mapOf("ctx" to contextMap))
35+
.execute()
36+
37+
if (isAlias(compiledValue)) {
38+
compiledValue = getWriteIndexNameForAlias(compiledValue)
39+
}
40+
41+
return if (compiledValue.isNullOrBlank()) fieldValue else compiledValue
42+
}
43+
44+
fun registerScriptService(scriptService: ScriptService) {
45+
this.scriptService = scriptService
46+
}
47+
fun hasAlias(index: String): Boolean {
48+
val aliases = clusterService.state().metadata().indices.get(index)?.aliases
49+
if (aliases != null) {
50+
return aliases.size() > 0
51+
}
52+
return false
53+
}
54+
fun isAlias(index: String): Boolean {
55+
return clusterService.state().metadata().indicesLookup?.get(index) is IndexAbstraction.Alias
56+
}
57+
fun getWriteIndexNameForAlias(alias: String): String? {
58+
return clusterService.state().metadata().indicesLookup?.get(alias)?.writeIndex?.index?.name
59+
}
60+
61+
fun getBackingIndicesForAlias(alias: String): MutableList<IndexMetadata>? {
62+
return clusterService.state().metadata().indicesLookup?.get(alias)?.indices
63+
}
64+
65+
fun registerServices(scriptService: ScriptService, clusterService: ClusterService) {
66+
this.scriptService = scriptService
67+
this.clusterService = clusterService
68+
}
69+
}

0 commit comments

Comments
 (0)