Skip to content

Commit 23b3e9d

Browse files
authored
Choose the rollup settings to use based on cluster configuration (opensearch-project#91)
1 parent 4fab91a commit 23b3e9d

File tree

4 files changed

+44
-15
lines changed

4 files changed

+44
-15
lines changed

src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,13 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
299299
): Collection<Any> {
300300
val settings = environment.settings()
301301
this.clusterService = clusterService
302+
rollupInterceptor = RollupInterceptor(clusterService, settings, indexNameExpressionResolver)
303+
val jvmService = JvmService(environment.settings())
304+
val transformRunner = TransformRunner.initialize(client, clusterService, xContentRegistry, settings, indexNameExpressionResolver, jvmService)
305+
fieldCapsFilter = FieldCapsFilter(clusterService, settings, indexNameExpressionResolver)
306+
this.indexNameExpressionResolver = indexNameExpressionResolver
307+
308+
val skipFlag = SkipExecution(client, clusterService)
302309
val rollupRunner = RollupRunner
303310
.registerClient(client)
304311
.registerClusterService(clusterService)
@@ -311,13 +318,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
311318
.registerSearcher(RollupSearchService(settings, clusterService, client))
312319
.registerMetadataServices(RollupMetadataService(client, xContentRegistry))
313320
.registerConsumers()
314-
rollupInterceptor = RollupInterceptor(clusterService, settings, indexNameExpressionResolver)
315-
val jvmService = JvmService(environment.settings())
316-
val transformRunner = TransformRunner.initialize(client, clusterService, xContentRegistry, settings, indexNameExpressionResolver, jvmService)
317-
fieldCapsFilter = FieldCapsFilter(clusterService, settings, indexNameExpressionResolver)
318-
this.indexNameExpressionResolver = indexNameExpressionResolver
319-
320-
val skipFlag = SkipExecution(client, clusterService)
321+
.registerClusterConfigurationProvider(skipFlag)
321322
indexManagementIndices = IndexManagementIndices(settings, client.admin().indices(), clusterService)
322323
val indexStateManagementHistory =
323324
IndexStateManagementHistory(

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/SkipExecution.kt

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ class SkipExecution(
4949

5050
@Volatile final var flag: Boolean = false
5151
private set
52+
// To track if there are any legacy IM plugin nodes part of the cluster
53+
@Volatile final var hasLegacyPlugin: Boolean = false
54+
private set
5255

5356
init {
5457
clusterService.addListener(this)
@@ -68,6 +71,7 @@ class SkipExecution(
6871
object : ActionListener<NodesInfoResponse> {
6972
override fun onResponse(response: NodesInfoResponse) {
7073
val versionSet = mutableSetOf<String>()
74+
val legacyVersionSet = mutableSetOf<String>()
7175

7276
response.nodes.map { it.getInfo(PluginsAndModules::class.java).pluginInfos }
7377
.forEach {
@@ -77,13 +81,24 @@ class SkipExecution(
7781
) {
7882
versionSet.add(nodePlugin.version)
7983
}
84+
85+
if (nodePlugin.name == "opendistro-index-management" ||
86+
nodePlugin.name == "opendistro_index_management"
87+
) {
88+
legacyVersionSet.add(nodePlugin.version)
89+
}
8090
}
8191
}
8292

83-
if (versionSet.size > 1) {
93+
if ((versionSet.size + legacyVersionSet.size) > 1) {
8494
flag = true
85-
logger.info("There are multiple versions of Index Management plugins in the cluster: $versionSet")
95+
logger.info("There are multiple versions of Index Management plugins in the cluster: [$versionSet, $legacyVersionSet]")
8696
} else flag = false
97+
98+
if (versionSet.isNotEmpty() && legacyVersionSet.isNotEmpty()) {
99+
hasLegacyPlugin = true
100+
logger.info("Found legacy plugin versions [$legacyVersionSet] and opensearch plugins versions [$versionSet] in the cluster")
101+
} else hasLegacyPlugin = false
87102
}
88103

89104
override fun onFailure(e: Exception) {

src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMapperService.kt

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ import org.opensearch.indexmanagement.rollup.action.mapping.UpdateRollupMappingA
5151
import org.opensearch.indexmanagement.rollup.action.mapping.UpdateRollupMappingRequest
5252
import org.opensearch.indexmanagement.rollup.model.Rollup
5353
import org.opensearch.indexmanagement.rollup.model.RollupJobValidationResult
54+
import org.opensearch.indexmanagement.rollup.settings.LegacyOpenDistroRollupSettings
5455
import org.opensearch.indexmanagement.rollup.settings.RollupSettings
5556
import org.opensearch.indexmanagement.rollup.util.isRollupIndex
5657
import org.opensearch.indexmanagement.util.IndexUtils.Companion._META
@@ -89,13 +90,13 @@ class RollupMapperService(
8990
// If the target index mappings doesn't contain rollup job attempts to update the mappings.
9091
// TODO: error handling
9192
@Suppress("ReturnCount")
92-
suspend fun attemptCreateRollupTargetIndex(job: Rollup): RollupJobValidationResult {
93+
suspend fun attemptCreateRollupTargetIndex(job: Rollup, hasLegacyPlugin: Boolean): RollupJobValidationResult {
9394
if (indexExists(job.targetIndex)) {
9495
return validateAndAttemptToUpdateTargetIndex(job)
9596
} else {
9697
val errorMessage = "Failed to create target index [${job.targetIndex}]"
9798
return try {
98-
val response = createTargetIndex(job)
99+
val response = createTargetIndex(job, hasLegacyPlugin)
99100
if (response.isAcknowledged) {
100101
updateRollupIndexMappings(job)
101102
} else {
@@ -112,9 +113,14 @@ class RollupMapperService(
112113
}
113114
}
114115

115-
private suspend fun createTargetIndex(job: Rollup): CreateIndexResponse {
116+
private suspend fun createTargetIndex(job: Rollup, hasLegacyPlugin: Boolean): CreateIndexResponse {
117+
val settings = if (hasLegacyPlugin) {
118+
Settings.builder().put(LegacyOpenDistroRollupSettings.ROLLUP_INDEX.key, true).build()
119+
} else {
120+
Settings.builder().put(RollupSettings.ROLLUP_INDEX.key, true).build()
121+
}
116122
val request = CreateIndexRequest(job.targetIndex)
117-
.settings(Settings.builder().put(RollupSettings.ROLLUP_INDEX.key, true).build())
123+
.settings(settings)
118124
.mapping(_DOC, IndexManagementIndices.rollupTargetMappings, XContentType.JSON)
119125
// TODO: Perhaps we can do better than this for mappings... as it'll be dynamic for rest
120126
// Can we read in the actual mappings from the source index and use that?

src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupRunner.kt

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import org.opensearch.cluster.service.ClusterService
4040
import org.opensearch.common.settings.Settings
4141
import org.opensearch.common.unit.TimeValue
4242
import org.opensearch.common.xcontent.NamedXContentRegistry
43+
import org.opensearch.indexmanagement.indexstatemanagement.SkipExecution
4344
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
4445
import org.opensearch.indexmanagement.rollup.action.get.GetRollupAction
4546
import org.opensearch.indexmanagement.rollup.action.get.GetRollupRequest
@@ -86,6 +87,7 @@ object RollupRunner :
8687
private lateinit var rollupIndexer: RollupIndexer
8788
private lateinit var rollupSearchService: RollupSearchService
8889
private lateinit var rollupMetadataService: RollupMetadataService
90+
private lateinit var clusterConfigurationProvider: SkipExecution
8991

9092
fun registerClusterService(clusterService: ClusterService): RollupRunner {
9193
this.clusterService = clusterService
@@ -139,6 +141,11 @@ object RollupRunner :
139141
return this
140142
}
141143

144+
fun registerClusterConfigurationProvider(clusterConfigurationProvider: SkipExecution): RollupRunner {
145+
this.clusterConfigurationProvider = clusterConfigurationProvider
146+
return this
147+
}
148+
142149
fun registerConsumers(): RollupRunner {
143150
return this
144151
}
@@ -250,7 +257,7 @@ object RollupRunner :
250257
}
251258
}
252259

253-
when (val result = rollupMapperService.attemptCreateRollupTargetIndex(updatableJob)) {
260+
when (val result = rollupMapperService.attemptCreateRollupTargetIndex(updatableJob, clusterConfigurationProvider.hasLegacyPlugin)) {
254261
is RollupJobValidationResult.Failure -> {
255262
setFailedMetadataAndDisableJob(updatableJob, result.message, metadata)
256263
return
@@ -390,7 +397,7 @@ object RollupRunner :
390397
// we validate target index only if there is metadata document in the rollup
391398
if (metadata != null) {
392399
logger.debug("Attempting to create/validate target index [${job.targetIndex}] for rollup job [${job.id}]")
393-
return rollupMapperService.attemptCreateRollupTargetIndex(job)
400+
return rollupMapperService.attemptCreateRollupTargetIndex(job, clusterConfigurationProvider.hasLegacyPlugin)
394401
}
395402

396403
return RollupJobValidationResult.Valid

0 commit comments

Comments
 (0)