@@ -55,20 +55,24 @@ class RollupMapperService(
5555
5656 private val logger = LogManager .getLogger(javaClass)
5757
58- // If the index already exists we need to verify it's a rollup index,
59- // confirm it does not conflict with existing jobs and is a valid job
58+ /* *
59+ * If the index already exists we need to verify it's a rollup index,
60+ * confirm it does not conflict with existing jobs and is a valid job.
61+ * If
62+ *
63+ * @param rollup Rollup job we're currently executing
64+ * @param targetIndexResolvedName concrete index name
65+ * @param hasLegacyPlugin flag to indicate if we're running legacy plugin
66+ * @return RollupJobValidationResult indicating success or failure with appropriate error message included.
67+ */
6068 @Suppress(" ReturnCount" )
6169 private suspend fun validateAndAttemptToUpdateTargetIndex (
6270 rollup : Rollup ,
6371 targetIndexResolvedName : String ,
6472 hasLegacyPlugin : Boolean
6573 ): RollupJobValidationResult {
66- /* *
67- * Target Index is valid alias if either all backing indices have this job in _meta
68- * or there isn't any rollup job present in _meta
69- */
70- val aliasValidationResult = validateTargetIndexAlias(rollup, targetIndexResolvedName)
7174 if (rollup.isTargetIndexAlias()) {
75+ val aliasValidationResult = validateTargetIndexAlias(rollup, targetIndexResolvedName)
7276 if (aliasValidationResult !is RollupJobValidationResult .Valid ) {
7377 return aliasValidationResult
7478 } else if (! isRollupIndex(targetIndexResolvedName, clusterService.state())) {
@@ -84,20 +88,25 @@ class RollupMapperService(
8488 }
8589 }
8690
91+ /* *
92+ * Target Index is valid alias if either all backing indices have this job in _meta
93+ * or there isn't any rollup job present in _meta
94+ */
8795 @Suppress(" ReturnCount" )
8896 suspend fun validateTargetIndexAlias (rollup : Rollup , targetIndexResolvedName : String ): RollupJobValidationResult {
8997
9098 var errorMessage: String
9199
92100 if (! RollupFieldValueExpressionResolver .indexAliasUtils.hasAlias(targetIndexResolvedName)) {
101+ logger.error(" [${rollup.targetIndex} ] is not an alias!" )
93102 return RollupJobValidationResult .Failure (" [${rollup.targetIndex} ] is not an alias!" )
94103 }
95104
96105 val rollupJobs = clusterService.state().metadata.index(targetIndexResolvedName).getRollupJobs()
97106 if (rollupJobs != null &&
98107 (rollupJobs.size > 1 || rollupJobs[0 ].id != rollup.id)
99108 ) {
100- errorMessage = " If target_index is alias, write backing index must be used only by this rollup job : [$targetIndexResolvedName ]"
109+ errorMessage = " More than one rollup job present on the backing index, cannot add alias for target index : [$targetIndexResolvedName ]"
101110 logger.error(errorMessage)
102111 return RollupJobValidationResult .Failure (errorMessage)
103112 }
@@ -106,8 +115,8 @@ class RollupMapperService(
106115 val backingIndices = RollupFieldValueExpressionResolver .indexAliasUtils.getBackingIndicesForAlias(rollup.targetIndex)
107116 backingIndices?.forEach {
108117 if (it.index.name != targetIndexResolvedName) {
109- val rollupJobs = clusterService.state().metadata.index(it.index.name).getRollupJobs()
110- val validationResult = validateNonWriteBackingIndex(it.index.name, rollup, rollupJobs )
118+ val allRollupJobs = clusterService.state().metadata.index(it.index.name).getRollupJobs()
119+ val validationResult = validateNonWriteBackingIndex(it.index.name, rollup, allRollupJobs )
111120 if (validationResult !is RollupJobValidationResult .Valid ) {
112121 return validationResult
113122 }
@@ -139,7 +148,12 @@ class RollupMapperService(
139148 suspend fun attemptCreateRollupTargetIndex (job : Rollup , hasLegacyPlugin : Boolean ): RollupJobValidationResult {
140149 val targetIndexResolvedName = RollupFieldValueExpressionResolver .resolve(job, job.targetIndex)
141150 if (indexExists(targetIndexResolvedName)) {
142- return validateAndAttemptToUpdateTargetIndex(job, targetIndexResolvedName, hasLegacyPlugin)
151+ val validationResult = validateAndAttemptToUpdateTargetIndex(job, targetIndexResolvedName, hasLegacyPlugin)
152+ when (validationResult) {
153+ is RollupJobValidationResult .Failure -> logger.error(validationResult.message)
154+ is RollupJobValidationResult .Invalid -> logger.error(validationResult.reason)
155+ }
156+ return validationResult
143157 } else {
144158 val errorMessage = " Failed to create target index [$targetIndexResolvedName ]"
145159 return try {
@@ -180,10 +194,11 @@ class RollupMapperService(
180194 try {
181195 // 1. First we need to add index.plugins.rollup_index setting to index
182196 if (addRollupSettingToIndex(targetIndexResolvedName, hasLegacyPlugin) == false ) {
197+ logger.error(" Failed to update rollup settings for target index: [$targetIndexResolvedName ]" )
183198 return RollupJobValidationResult .Invalid (" Failed to update rollup settings for target index: [$targetIndexResolvedName ]" )
184199 }
185200
186- // 2. Put rollup mappings
201+ // 2. Put rollup target_index mappings
187202 val putMappingRequest: PutMappingRequest =
188203 PutMappingRequest (targetIndexResolvedName).source(IndexManagementIndices .rollupTargetMappings, XContentType .JSON )
189204 val respMappings: AcknowledgedResponse = client.admin().indices().suspendUntil {
@@ -192,7 +207,7 @@ class RollupMapperService(
192207 if (! respMappings.isAcknowledged) {
193208 return RollupJobValidationResult .Invalid (" Failed to put initial rollup mappings for target index [$targetIndexResolvedName ]" )
194209 }
195- // 3.
210+ // 3. Add this rollup job to target_index's _meta
196211 errorMessage = " Failed to update mappings for target index [$targetIndexResolvedName ]"
197212 updateRollupIndexMappings(rollup, targetIndexResolvedName)
198213 } catch (e: RemoteTransportException ) {
0 commit comments