@@ -14,13 +14,16 @@ import org.opensearch.action.admin.indices.create.CreateIndexRequest
1414import org.opensearch.action.admin.indices.create.CreateIndexResponse
1515import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest
1616import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse
17+ import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
18+ import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest
1719import org.opensearch.action.support.IndicesOptions
1820import org.opensearch.action.support.master.AcknowledgedResponse
1921import org.opensearch.client.Client
2022import org.opensearch.cluster.metadata.IndexNameExpressionResolver
2123import org.opensearch.cluster.metadata.MappingMetadata
2224import org.opensearch.cluster.service.ClusterService
2325import org.opensearch.common.settings.Settings
26+ import org.opensearch.common.xcontent.XContentType
2427import org.opensearch.indexmanagement.IndexManagementIndices
2528import org.opensearch.indexmanagement.common.model.dimension.DateHistogram
2629import org.opensearch.indexmanagement.common.model.dimension.Histogram
@@ -33,7 +36,9 @@ import org.opensearch.indexmanagement.rollup.model.RollupJobValidationResult
3336import org.opensearch.indexmanagement.rollup.settings.LegacyOpenDistroRollupSettings
3437import org.opensearch.indexmanagement.rollup.settings.RollupSettings
3538import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver
39+ import org.opensearch.indexmanagement.rollup.util.getRollupJobs
3640import org.opensearch.indexmanagement.rollup.util.isRollupIndex
41+ import org.opensearch.indexmanagement.rollup.util.isTargetIndexAlias
3742import org.opensearch.indexmanagement.util.IndexUtils.Companion._META
3843import org.opensearch.indexmanagement.util.IndexUtils.Companion.getFieldFromMappings
3944import org.opensearch.transport.RemoteTransportException
@@ -50,29 +55,105 @@ class RollupMapperService(
5055
5156 private val logger = LogManager .getLogger(javaClass)
5257
53- // If the index already exists we need to verify it's a rollup index,
54- // confirm it does not conflict with existing jobs and is a valid job
58+ /* *
59+ * If the index already exists we need to verify it's a rollup index,
60+ * confirm it does not conflict with existing jobs and is a valid job.
61+ * If
62+ *
63+ * @param rollup Rollup job we're currently executing
64+ * @param targetIndexResolvedName concrete index name
65+ * @param hasLegacyPlugin flag to indicate if we're running legacy plugin
66+ * @return RollupJobValidationResult indicating success or failure with appropriate error message included.
67+ */
5568 @Suppress(" ReturnCount" )
56- private suspend fun validateAndAttemptToUpdateTargetIndex (rollup : Rollup , targetIndexResolvedName : String ): RollupJobValidationResult {
57- if (! isRollupIndex(targetIndexResolvedName, clusterService.state())) {
69+ private suspend fun validateAndAttemptToUpdateTargetIndex (
70+ rollup : Rollup ,
71+ targetIndexResolvedName : String ,
72+ hasLegacyPlugin : Boolean
73+ ): RollupJobValidationResult {
74+ if (rollup.isTargetIndexAlias()) {
75+ val aliasValidationResult = validateTargetIndexAlias(rollup, targetIndexResolvedName)
76+ if (aliasValidationResult !is RollupJobValidationResult .Valid ) {
77+ return aliasValidationResult
78+ } else if (! isRollupIndex(targetIndexResolvedName, clusterService.state())) {
79+ return prepareTargetIndex(rollup, targetIndexResolvedName, hasLegacyPlugin)
80+ }
81+ } else if (! isRollupIndex(targetIndexResolvedName, clusterService.state())) {
5882 return RollupJobValidationResult .Invalid (" Target index [$targetIndexResolvedName ] is a non rollup index" )
5983 }
60-
6184 return when (val jobExistsResult = jobExistsInRollupIndex(rollup, targetIndexResolvedName)) {
6285 is RollupJobValidationResult .Valid -> jobExistsResult
6386 is RollupJobValidationResult .Invalid -> updateRollupIndexMappings(rollup, targetIndexResolvedName)
6487 is RollupJobValidationResult .Failure -> jobExistsResult
6588 }
6689 }
6790
91+ /* *
92+ * Target Index is valid alias if either all backing indices have this job in _meta
93+ * or there isn't any rollup job present in _meta
94+ */
95+ @Suppress(" ReturnCount" )
96+ suspend fun validateTargetIndexAlias (rollup : Rollup , targetIndexResolvedName : String ): RollupJobValidationResult {
97+
98+ var errorMessage: String
99+
100+ if (! RollupFieldValueExpressionResolver .indexAliasUtils.hasAlias(targetIndexResolvedName)) {
101+ logger.error(" [${rollup.targetIndex} ] is not an alias!" )
102+ return RollupJobValidationResult .Failure (" [${rollup.targetIndex} ] is not an alias!" )
103+ }
104+
105+ val rollupJobs = clusterService.state().metadata.index(targetIndexResolvedName).getRollupJobs()
106+ if (rollupJobs != null &&
107+ (rollupJobs.size > 1 || rollupJobs[0 ].id != rollup.id)
108+ ) {
109+ errorMessage = " More than one rollup job present on the backing index, cannot add alias for target index: [$targetIndexResolvedName ]"
110+ logger.error(errorMessage)
111+ return RollupJobValidationResult .Failure (errorMessage)
112+ }
113+
114+ // All other backing indices have to have this rollup job in _META field and it has to be the only one!
115+ val backingIndices = RollupFieldValueExpressionResolver .indexAliasUtils.getBackingIndicesForAlias(rollup.targetIndex)
116+ backingIndices?.forEach {
117+ if (it.index.name != targetIndexResolvedName) {
118+ val allRollupJobs = clusterService.state().metadata.index(it.index.name).getRollupJobs()
119+ val validationResult = validateNonWriteBackingIndex(it.index.name, rollup, allRollupJobs)
120+ if (validationResult !is RollupJobValidationResult .Valid ) {
121+ return validationResult
122+ }
123+ }
124+ }
125+ return RollupJobValidationResult .Valid
126+ }
127+
128+ suspend fun validateNonWriteBackingIndex (backingIndex : String , currentRollupJob : Rollup , rollupJobs : List <Rollup >? ): RollupJobValidationResult {
129+ var errorMessage = " "
130+ if (rollupJobs == null ) {
131+ errorMessage = " Backing index [$backingIndex ] has to have owner rollup job with id:[${currentRollupJob.id} ]"
132+ } else if (rollupJobs.size == 1 && rollupJobs[0 ].id != currentRollupJob.id) {
133+ errorMessage = " Backing index [$backingIndex ] has to have owner rollup job with id:[${currentRollupJob.id} ]"
134+ } else if (rollupJobs.size > 1 ) {
135+ errorMessage = " Backing index [$backingIndex ] has multiple rollup job owners"
136+ }
137+ if (errorMessage.isNotEmpty()) {
138+ logger.error(errorMessage)
139+ return RollupJobValidationResult .Failure (errorMessage)
140+ }
141+ return RollupJobValidationResult .Valid
142+ }
143+
68144 // This creates the target index if it doesn't already else validate the target index is rollup index
69145 // If the target index mappings doesn't contain rollup job attempts to update the mappings.
70146 // TODO: error handling
71147 @Suppress(" ReturnCount" )
72148 suspend fun attemptCreateRollupTargetIndex (job : Rollup , hasLegacyPlugin : Boolean ): RollupJobValidationResult {
73149 val targetIndexResolvedName = RollupFieldValueExpressionResolver .resolve(job, job.targetIndex)
74150 if (indexExists(targetIndexResolvedName)) {
75- return validateAndAttemptToUpdateTargetIndex(job, targetIndexResolvedName)
151+ val validationResult = validateAndAttemptToUpdateTargetIndex(job, targetIndexResolvedName, hasLegacyPlugin)
152+ when (validationResult) {
153+ is RollupJobValidationResult .Failure -> logger.error(validationResult.message)
154+ is RollupJobValidationResult .Invalid -> logger.error(validationResult.reason)
155+ }
156+ return validationResult
76157 } else {
77158 val errorMessage = " Failed to create target index [$targetIndexResolvedName ]"
78159 return try {
@@ -96,6 +177,53 @@ class RollupMapperService(
96177 }
97178 }
98179
180+ suspend fun addRollupSettingToIndex (targetIndexResolvedName : String , hasLegacyPlugin : Boolean ): Boolean {
181+ val settings = if (hasLegacyPlugin) {
182+ Settings .builder().put(LegacyOpenDistroRollupSettings .ROLLUP_INDEX .key, true ).build()
183+ } else {
184+ Settings .builder().put(RollupSettings .ROLLUP_INDEX .key, true ).build()
185+ }
186+ val resp: AcknowledgedResponse = client.admin().indices().suspendUntil {
187+ updateSettings(UpdateSettingsRequest (settings, targetIndexResolvedName), it)
188+ }
189+ return resp.isAcknowledged
190+ }
191+ @Suppress(" ReturnCount" )
192+ suspend fun prepareTargetIndex (rollup : Rollup , targetIndexResolvedName : String , hasLegacyPlugin : Boolean ): RollupJobValidationResult {
193+ var errorMessage = " "
194+ try {
195+ // 1. First we need to add index.plugins.rollup_index setting to index
196+ if (addRollupSettingToIndex(targetIndexResolvedName, hasLegacyPlugin) == false ) {
197+ logger.error(" Failed to update rollup settings for target index: [$targetIndexResolvedName ]" )
198+ return RollupJobValidationResult .Invalid (" Failed to update rollup settings for target index: [$targetIndexResolvedName ]" )
199+ }
200+
201+ // 2. Put rollup target_index mappings
202+ val putMappingRequest: PutMappingRequest =
203+ PutMappingRequest (targetIndexResolvedName).source(IndexManagementIndices .rollupTargetMappings, XContentType .JSON )
204+ val respMappings: AcknowledgedResponse = client.admin().indices().suspendUntil {
205+ putMapping(putMappingRequest, it)
206+ }
207+ if (! respMappings.isAcknowledged) {
208+ return RollupJobValidationResult .Invalid (" Failed to put initial rollup mappings for target index [$targetIndexResolvedName ]" )
209+ }
210+ // 3. Add this rollup job to target_index's _meta
211+ errorMessage = " Failed to update mappings for target index [$targetIndexResolvedName ]"
212+ updateRollupIndexMappings(rollup, targetIndexResolvedName)
213+ } catch (e: RemoteTransportException ) {
214+ val unwrappedException = ExceptionsHelper .unwrapCause(e) as Exception
215+ logger.error(errorMessage, unwrappedException)
216+ RollupJobValidationResult .Failure (errorMessage, unwrappedException)
217+ } catch (e: OpenSearchSecurityException ) {
218+ logger.error(" $errorMessage because " , e)
219+ RollupJobValidationResult .Failure (" $errorMessage - missing required cluster permissions: ${e.localizedMessage} " , e)
220+ } catch (e: Exception ) {
221+ logger.error(" $errorMessage because " , e)
222+ RollupJobValidationResult .Failure (errorMessage, e)
223+ }
224+ return RollupJobValidationResult .Valid
225+ }
226+
99227 private suspend fun createTargetIndex (targetIndexName : String , hasLegacyPlugin : Boolean ): CreateIndexResponse {
100228 val settings = if (hasLegacyPlugin) {
101229 Settings .builder().put(LegacyOpenDistroRollupSettings .ROLLUP_INDEX .key, true ).build()
0 commit comments