Skip to content

Commit 40174cc

Browse files
committed
Remove the old stale cluster state ism metadata logic
Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
1 parent a2dd769 commit 40174cc

File tree

18 files changed

+35
-514
lines changed

18 files changed

+35
-514
lines changed

src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,6 @@ import org.opensearch.indexmanagement.indexstatemanagement.transport.action.remo
8282
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.removepolicy.TransportRemovePolicyAction
8383
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.retryfailedmanagedindex.RetryFailedManagedIndexAction
8484
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.retryfailedmanagedindex.TransportRetryFailedManagedIndexAction
85-
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.TransportUpdateManagedIndexMetaDataAction
86-
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataAction
8785
import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_INDEX_TYPE
8886
import org.opensearch.indexmanagement.indexstatemanagement.validation.ActionValidation
8987
import org.opensearch.indexmanagement.refreshanalyzer.RefreshSearchAnalyzerAction
@@ -500,10 +498,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
500498
ManagedIndexSettings.ROLLOVER_SKIP,
501499
ManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED,
502500
ManagedIndexSettings.ACTION_VALIDATION_ENABLED,
503-
ManagedIndexSettings.METADATA_SERVICE_ENABLED,
504501
ManagedIndexSettings.AUTO_MANAGE,
505-
ManagedIndexSettings.METADATA_SERVICE_STATUS,
506-
ManagedIndexSettings.TEMPLATE_MIGRATION_CONTROL,
507502
ManagedIndexSettings.JITTER,
508503
ManagedIndexSettings.JOB_INTERVAL,
509504
ManagedIndexSettings.SWEEP_PERIOD,
@@ -540,16 +535,13 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
540535
LegacyOpenDistroManagedIndexSettings.ROLLOVER_ALIAS,
541536
LegacyOpenDistroManagedIndexSettings.ROLLOVER_SKIP,
542537
LegacyOpenDistroManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED,
543-
LegacyOpenDistroManagedIndexSettings.METADATA_SERVICE_ENABLED,
544538
LegacyOpenDistroManagedIndexSettings.JOB_INTERVAL,
545539
LegacyOpenDistroManagedIndexSettings.SWEEP_PERIOD,
546540
LegacyOpenDistroManagedIndexSettings.COORDINATOR_BACKOFF_COUNT,
547541
LegacyOpenDistroManagedIndexSettings.COORDINATOR_BACKOFF_MILLIS,
548542
LegacyOpenDistroManagedIndexSettings.ALLOW_LIST,
549543
LegacyOpenDistroManagedIndexSettings.SNAPSHOT_DENY_LIST,
550544
LegacyOpenDistroManagedIndexSettings.AUTO_MANAGE,
551-
LegacyOpenDistroManagedIndexSettings.METADATA_SERVICE_STATUS,
552-
LegacyOpenDistroManagedIndexSettings.TEMPLATE_MIGRATION_CONTROL,
553545
LegacyOpenDistroManagedIndexSettings.RESTRICTED_INDEX_PATTERN,
554546
LegacyOpenDistroRollupSettings.ROLLUP_INGEST_BACKOFF_COUNT,
555547
LegacyOpenDistroRollupSettings.ROLLUP_INGEST_BACKOFF_MILLIS,
@@ -565,7 +557,6 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
565557

566558
override fun getActions(): List<ActionPlugin.ActionHandler<out ActionRequest, out ActionResponse>> {
567559
return listOf(
568-
ActionPlugin.ActionHandler(UpdateManagedIndexMetaDataAction.INSTANCE, TransportUpdateManagedIndexMetaDataAction::class.java),
569560
ActionPlugin.ActionHandler(RemovePolicyAction.INSTANCE, TransportRemovePolicyAction::class.java),
570561
ActionPlugin.ActionHandler(RefreshSearchAnalyzerAction.INSTANCE, TransportRefreshSearchAnalyzerAction::class.java),
571562
ActionPlugin.ActionHandler(AddPolicyAction.INSTANCE, TransportAddPolicyAction::class.java),

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,6 @@ class ManagedIndexCoordinator(
119119
private val ismIndices = indexManagementIndices
120120

121121
private var scheduledFullSweep: Scheduler.Cancellable? = null
122-
private var scheduledMoveMetadata: Scheduler.Cancellable? = null
123-
private var scheduledTemplateMigration: Scheduler.Cancellable? = null
124122

125123
@Volatile private var lastFullSweepTimeNano = System.nanoTime()
126124
@Volatile private var indexStateManagementEnabled = INDEX_STATE_MANAGEMENT_ENABLED.get(settings)
@@ -170,10 +168,6 @@ class ManagedIndexCoordinator(
170168
fun offClusterManager() {
171169
// Cancel background sweep when demoted from being cluster manager
172170
scheduledFullSweep?.cancel()
173-
174-
scheduledMoveMetadata?.cancel()
175-
176-
scheduledTemplateMigration?.cancel()
177171
}
178172

179173
override fun clusterChanged(event: ClusterChangedEvent) {
@@ -206,8 +200,6 @@ class ManagedIndexCoordinator(
206200

207201
override fun beforeStop() {
208202
scheduledFullSweep?.cancel()
209-
210-
scheduledMoveMetadata?.cancel()
211203
}
212204

213205
private fun enable() {
@@ -229,8 +221,6 @@ class ManagedIndexCoordinator(
229221
private fun disable() {
230222
scheduledFullSweep?.cancel()
231223
indexStateManagementEnabled = false
232-
233-
scheduledMoveMetadata?.cancel()
234224
}
235225

236226
private suspend fun reenableJobs() {

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -56,15 +56,12 @@ import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndex
5656
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.JOB_INTERVAL
5757
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.ACTION_VALIDATION_ENABLED
5858
import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_INDEX_TYPE
59-
import org.opensearch.indexmanagement.indexstatemanagement.util.MetadataCheck
60-
import org.opensearch.indexmanagement.indexstatemanagement.util.checkMetadata
6159
import org.opensearch.indexmanagement.indexstatemanagement.util.deleteManagedIndexMetadataRequest
6260
import org.opensearch.indexmanagement.indexstatemanagement.util.deleteManagedIndexRequest
6361
import org.opensearch.indexmanagement.indexstatemanagement.util.getCompletedManagedIndexMetaData
6462
import org.opensearch.indexmanagement.indexstatemanagement.util.getStartingManagedIndexMetaData
6563
import org.opensearch.indexmanagement.indexstatemanagement.util.hasDifferentJobInterval
6664
import org.opensearch.indexmanagement.indexstatemanagement.util.hasTimedOut
67-
import org.opensearch.indexmanagement.indexstatemanagement.util.hasVersionConflict
6865
import org.opensearch.indexmanagement.indexstatemanagement.util.isAllowed
6966
import org.opensearch.indexmanagement.indexstatemanagement.util.isFailed
7067
import org.opensearch.indexmanagement.indexstatemanagement.util.isSafeToChange
@@ -98,6 +95,7 @@ import org.opensearch.jobscheduler.spi.ScheduledJobParameter
9895
import org.opensearch.jobscheduler.spi.ScheduledJobRunner
9996
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule
10097
import org.opensearch.core.rest.RestStatus
98+
import org.opensearch.indexmanagement.indexstatemanagement.util.hasVersionConflict
10199
import org.opensearch.script.Script
102100
import org.opensearch.script.ScriptService
103101
import org.opensearch.script.TemplateScript
@@ -281,13 +279,6 @@ object ManagedIndexRunner :
281279
logger.warn("Failed to find IndexMetadata for ${managedIndexConfig.index}.")
282280
return
283281
}
284-
} else {
285-
val clusterStateMetadata = clusterStateIndexMetadata.getManagedIndexMetadata()
286-
val metadataCheck = checkMetadata(clusterStateMetadata, managedIndexMetaData, managedIndexConfig.indexUuid, logger)
287-
if (metadataCheck != MetadataCheck.SUCCESS) {
288-
logger.info("Skipping execution while metadata status is $metadataCheck")
289-
return
290-
}
291282
}
292283

293284
// If policy or managedIndexMetaData is null then initialize

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -61,15 +61,6 @@ fun IndexMetadata.getRolloverSkip(): Boolean {
6161
return this.settings.getAsBoolean(ManagedIndexSettings.ROLLOVER_SKIP.key, false)
6262
}
6363

64-
fun IndexMetadata.getManagedIndexMetadata(): ManagedIndexMetaData? {
65-
val existingMetaDataMap = this.getCustomData(ManagedIndexMetaData.MANAGED_INDEX_METADATA_TYPE)
66-
67-
if (existingMetaDataMap != null) {
68-
return ManagedIndexMetaData.fromMap(existingMetaDataMap)
69-
}
70-
return null
71-
}
72-
7364
fun getUuidsForClosedIndices(state: ClusterState, defaultIndexMetadataService: DefaultIndexMetadataService): MutableList<String> {
7465
val indexMetadatas = state.metadata.indices
7566
val closeList = mutableListOf<String>()

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/LegacyOpenDistroManagedIndexSettings.kt

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@ import java.util.function.Function
1515
class LegacyOpenDistroManagedIndexSettings {
1616
companion object {
1717
const val DEFAULT_ISM_ENABLED = true
18-
const val DEFAULT_METADATA_SERVICE_STATUS = 0
19-
const val DEFAULT_METADATA_SERVICE_ENABLED = true
2018
const val DEFAULT_JOB_INTERVAL = 5
2119
private val ALLOW_LIST_ALL = ISMActionsParser.instance.parsers.map { it.getActionType() }.toList()
2220
val ALLOW_LIST_NONE = emptyList<String>()
@@ -30,36 +28,6 @@ class LegacyOpenDistroManagedIndexSettings {
3028
Setting.Property.Deprecated
3129
)
3230

33-
// 0: migration is going on
34-
// 1: migration succeed
35-
// -1: migration failed
36-
val METADATA_SERVICE_STATUS: Setting<Int> = Setting.intSetting(
37-
"opendistro.index_state_management.metadata_migration.status",
38-
DEFAULT_METADATA_SERVICE_STATUS,
39-
Setting.Property.NodeScope,
40-
Setting.Property.Dynamic
41-
)
42-
43-
// 0: enabled, use onClusterManager time as ISM template last_updated_time
44-
// -1: migration ended successfully
45-
// -2: migration ended unsuccessfully
46-
// >0: use this setting (epoch millis) as ISM template last_updated_time
47-
val TEMPLATE_MIGRATION_CONTROL: Setting<Long> = Setting.longSetting(
48-
"opendistro.index_state_management.template_migration.control",
49-
ManagedIndexSettings.DEFAULT_TEMPLATE_MIGRATION_TIMESTAMP,
50-
-2L,
51-
Setting.Property.NodeScope,
52-
Setting.Property.Dynamic
53-
)
54-
55-
val METADATA_SERVICE_ENABLED: Setting<Boolean> = Setting.boolSetting(
56-
"opendistro.index_state_management.metadata_service.enabled",
57-
DEFAULT_METADATA_SERVICE_ENABLED,
58-
Setting.Property.NodeScope,
59-
Setting.Property.Dynamic,
60-
Setting.Property.Deprecated
61-
)
62-
6331
val POLICY_ID: Setting<String> = Setting.simpleString(
6432
"index.opendistro.index_state_management.policy_id",
6533
Setting.Property.IndexScope,

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/ManagedIndexSettings.kt

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ class ManagedIndexSettings {
1515
companion object {
1616
const val DEFAULT_ISM_ENABLED = true
1717
const val DEFAULT_ACTION_VALIDATION_ENABLED = false
18-
const val DEFAULT_TEMPLATE_MIGRATION_TIMESTAMP = 0L
1918
const val DEFAULT_JOB_INTERVAL = 5
2019
const val DEFAULT_JITTER = 0.6
2120
const val DEFAULT_RESTRICTED_PATTERN = "\\.opendistro_security|\\.kibana.*|\\$INDEX_MANAGEMENT_INDEX"
@@ -36,35 +35,6 @@ class ManagedIndexSettings {
3635
Setting.Property.Dynamic
3736
)
3837

39-
// 0: migration is going on
40-
// 1: migration succeed
41-
// -1: migration failed
42-
val METADATA_SERVICE_STATUS: Setting<Int> = Setting.intSetting(
43-
"plugins.index_state_management.metadata_migration.status",
44-
LegacyOpenDistroManagedIndexSettings.METADATA_SERVICE_STATUS,
45-
Setting.Property.NodeScope,
46-
Setting.Property.Dynamic
47-
)
48-
49-
// 0: enabled, use onClusterManager time as ISM template last_updated_time
50-
// -1: migration ended successfully
51-
// -2: migration ended unsuccessfully
52-
// >0: use this setting (epoch millis) as ISM template last_updated_time
53-
val TEMPLATE_MIGRATION_CONTROL: Setting<Long> = Setting.longSetting(
54-
"plugins.index_state_management.template_migration.control",
55-
LegacyOpenDistroManagedIndexSettings.TEMPLATE_MIGRATION_CONTROL,
56-
-2L,
57-
Setting.Property.NodeScope,
58-
Setting.Property.Dynamic
59-
)
60-
61-
val METADATA_SERVICE_ENABLED: Setting<Boolean> = Setting.boolSetting(
62-
"plugins.index_state_management.metadata_service.enabled",
63-
LegacyOpenDistroManagedIndexSettings.METADATA_SERVICE_ENABLED,
64-
Setting.Property.NodeScope,
65-
Setting.Property.Dynamic
66-
)
67-
6838
val POLICY_ID: Setting<String> = Setting.simpleString(
6939
"index.plugins.index_state_management.policy_id",
7040
LegacyOpenDistroManagedIndexSettings.POLICY_ID,

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/TransportAddPolicyAction.kt

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ import org.opensearch.common.settings.Settings
3434
import org.opensearch.common.unit.TimeValue
3535
import org.opensearch.commons.ConfigConstants
3636
import org.opensearch.commons.authuser.User
37-
import org.opensearch.core.xcontent.NamedXContentRegistry
3837
import org.opensearch.core.index.Index
38+
import org.opensearch.core.xcontent.NamedXContentRegistry
3939
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
4040
import org.opensearch.indexmanagement.indexstatemanagement.DefaultIndexMetadataService
4141
import org.opensearch.indexmanagement.indexstatemanagement.IndexMetadataProvider
@@ -49,7 +49,6 @@ import org.opensearch.indexmanagement.indexstatemanagement.transport.action.mana
4949
import org.opensearch.indexmanagement.indexstatemanagement.util.DEFAULT_INDEX_TYPE
5050
import org.opensearch.indexmanagement.indexstatemanagement.util.FailedIndex
5151
import org.opensearch.indexmanagement.indexstatemanagement.util.managedIndexConfigIndexRequest
52-
import org.opensearch.indexmanagement.indexstatemanagement.util.removeClusterStateMetadatas
5352
import org.opensearch.indexmanagement.opensearchapi.IndexManagementSecurityContext
5453
import org.opensearch.indexmanagement.opensearchapi.parseFromGetResponse
5554
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
@@ -61,6 +60,7 @@ import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser
6160
import org.opensearch.indexmanagement.util.SecurityUtils.Companion.userHasPermissionForResource
6261
import org.opensearch.indexmanagement.util.SecurityUtils.Companion.validateUserConfiguration
6362
import org.opensearch.core.rest.RestStatus
63+
import org.opensearch.indexmanagement.indexstatemanagement.util.deleteManagedIndexMetadataRequest
6464
import org.opensearch.tasks.Task
6565
import org.opensearch.transport.TransportService
6666
import java.time.Duration
@@ -126,7 +126,7 @@ class TransportAddPolicyAction @Inject constructor(
126126
}
127127

128128
@Suppress("SpreadOperator")
129-
fun getClusterState() {
129+
private fun getClusterState() {
130130
startTime = Instant.now()
131131
CoroutineScope(Dispatchers.IO).launch {
132132
val indexNameToMetadata: MutableMap<String, ISMIndexMetadata> = HashMap()
@@ -193,10 +193,6 @@ class TransportAddPolicyAction @Inject constructor(
193193
clusterStateRequest,
194194
object : ActionListener<ClusterStateResponse> {
195195
override fun onResponse(response: ClusterStateResponse) {
196-
CoroutineScope(Dispatchers.IO).launch {
197-
removeClusterStateMetadatas(client, log, indicesToAdd.map { Index(it.value, it.key) })
198-
}
199-
200196
val defaultIndexMetadataService = indexMetadataProvider.services[DEFAULT_INDEX_TYPE] as DefaultIndexMetadataService
201197
getUuidsForClosedIndices(response.state, defaultIndexMetadataService).forEach {
202198
failedIndices.add(FailedIndex(indicesToAdd[it] as String, it, "This index is closed"))
@@ -346,6 +342,9 @@ class TransportAddPolicyAction @Inject constructor(
346342
}
347343
}
348344
actionListener.onResponse(ISMStatusResponse(indicesToAdd.size, failedIndices))
345+
346+
// best effort to clean up ISM metadata
347+
removeMetadatas(indicesToAdd.map { Index(it.value, it.key) })
349348
}
350349

351350
override fun onFailure(t: Exception) {
@@ -368,6 +367,23 @@ class TransportAddPolicyAction @Inject constructor(
368367
private fun onFailure(t: Exception) {
369368
actionListener.onFailure(ExceptionsHelper.unwrapCause(t) as Exception)
370369
}
370+
371+
fun removeMetadatas(indices: List<Index>) {
372+
val request = indices.map { deleteManagedIndexMetadataRequest(it.uuid) }
373+
val bulkReq = BulkRequest().add(request)
374+
client.bulk(
375+
bulkReq,
376+
object : ActionListener<BulkResponse> {
377+
override fun onResponse(response: BulkResponse) {
378+
log.debug("Successfully cleaned metadata for remove policy indices: {}", indices)
379+
}
380+
381+
override fun onFailure(e: Exception) {
382+
log.error("Failed to clean metadata for remove policy indices.", e)
383+
}
384+
}
385+
)
386+
}
371387
}
372388

373389
companion object {

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexCon
4141
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
4242
import org.opensearch.indexmanagement.indexstatemanagement.model.coordinator.SweptManagedIndexConfig
4343
import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.buildMgetMetadataRequest
44-
import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getManagedIndexMetadata
4544
import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.mgetResponseToMap
4645
import org.opensearch.indexmanagement.indexstatemanagement.resthandler.RestChangePolicyAction
4746
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.ISMStatusResponse
@@ -268,9 +267,6 @@ class TransportChangePolicyAction @Inject constructor(
268267
val includedStates = changePolicy.include.map { it.state }.toSet()
269268

270269
indicesToUpdate.forEach { (indexUuid, indexName) ->
271-
// indexMetaData and clusterStateMetadata will be null for non-default index types
272-
val indexMetaData = indexUuidToIndexMetadata[indexUuid]
273-
val clusterStateMetadata = indexMetaData?.getManagedIndexMetadata()
274270
val mgetFailure = metadataMap[indexUuid]?.second
275271
val managedIndexMetadata: ManagedIndexMetaData? = metadataMap[managedIndexMetadataID(indexUuid)]?.first
276272

@@ -296,25 +292,16 @@ class TransportChangePolicyAction @Inject constructor(
296292
RestChangePolicyAction.INDEX_IN_TRANSITION
297293
)
298294
)
299-
// else if there is no ManagedIndexMetaData yet then the managed index has not initialized and we can change the policy safely
295+
// else if there is no ManagedIndexMetaData yet then the managed index has not initialized, and we can change the policy safely
300296
managedIndexMetadata == null -> {
301-
if (clusterStateMetadata != null) {
302-
failedIndices.add(
303-
FailedIndex(
304-
indexName, indexUuid,
305-
"Cannot change policy until metadata has finished migrating"
306-
)
307-
)
308-
} else {
309-
managedIndicesToUpdate.add(indexName to indexUuid)
310-
}
297+
managedIndicesToUpdate.add(indexName to indexUuid)
311298
}
312299
// else if the includedStates is empty (i.e. not being used) then we will always try to update the managed index
313300
includedStates.isEmpty() -> managedIndicesToUpdate.add(indexName to indexUuid)
314301
// else only update the managed index if its currently in one of the included states
315302
includedStates.contains(managedIndexMetadata.stateMetaData?.name) ->
316303
managedIndicesToUpdate.add(indexName to indexUuid)
317-
// else the managed index did not match any of the included state filters and we will not update it
304+
// else the managed index did not match any of the included state filters, and we will not update it
318305
else -> log.debug("Skipping $indexName as it does not match any of the include state filters")
319306
}
320307
}

0 commit comments

Comments
 (0)