Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementH
import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexCoordinator
import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexRunner
import org.opensearch.indexmanagement.indexstatemanagement.MetadataService
import org.opensearch.indexmanagement.indexstatemanagement.PluginVersionSweepCoordinator
import org.opensearch.indexmanagement.indexstatemanagement.SkipExecution
import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
Expand Down Expand Up @@ -370,7 +371,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
fieldCapsFilter = FieldCapsFilter(clusterService, settings, indexNameExpressionResolver)
this.indexNameExpressionResolver = indexNameExpressionResolver

val skipFlag = SkipExecution(client, clusterService)
val skipFlag = SkipExecution(client)
RollupFieldValueExpressionResolver.registerScriptService(scriptService)
val rollupRunner = RollupRunner
.registerClient(client)
Expand Down Expand Up @@ -428,6 +429,8 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin

val smRunner = SMRunner.init(client, threadPool, settings, indexManagementIndices, clusterService)

val pluginVersionSweepCoordinator = PluginVersionSweepCoordinator(skipFlag, settings, threadPool, clusterService)

return listOf(
managedIndexRunner,
rollupRunner,
Expand All @@ -436,7 +439,8 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
managedIndexCoordinator,
indexStateManagementHistory,
indexMetadataProvider,
smRunner
smRunner,
pluginVersionSweepCoordinator
)
}

Expand All @@ -461,6 +465,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
ManagedIndexSettings.JITTER,
ManagedIndexSettings.JOB_INTERVAL,
ManagedIndexSettings.SWEEP_PERIOD,
ManagedIndexSettings.SWEEP_SKIP_PERIOD,
ManagedIndexSettings.COORDINATOR_BACKOFF_COUNT,
ManagedIndexSettings.COORDINATOR_BACKOFF_MILLIS,
ManagedIndexSettings.ALLOW_LIST,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement

import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.cluster.ClusterChangedEvent
import org.opensearch.cluster.ClusterStateListener
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.component.LifecycleListener
import org.opensearch.common.settings.Settings
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
import org.opensearch.indexmanagement.util.OpenForTesting
import org.opensearch.threadpool.Scheduler
import org.opensearch.threadpool.ThreadPool

class PluginVersionSweepCoordinator(
private val skipExecution: SkipExecution,
settings: Settings,
private val threadPool: ThreadPool,
clusterService: ClusterService,
) : CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("ISMPluginSweepCoordinator")),
LifecycleListener(),
ClusterStateListener {
private val logger = LogManager.getLogger(javaClass)

private var scheduledSkipExecution: Scheduler.Cancellable? = null

@Volatile
private var sweepSkipPeriod = ManagedIndexSettings.SWEEP_SKIP_PERIOD.get(settings)

@Volatile
private var indexStateManagementEnabled = ManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED.get(settings)

init {
clusterService.addLifecycleListener(this)
clusterService.addListener(this)
clusterService.clusterSettings.addSettingsUpdateConsumer(ManagedIndexSettings.SWEEP_SKIP_PERIOD) {
sweepSkipPeriod = it
initBackgroundSweepISMPluginVersionExecution()
}
}

override fun afterStart() {
initBackgroundSweepISMPluginVersionExecution()
}

override fun beforeStop() {
scheduledSkipExecution?.cancel()
}

override fun clusterChanged(event: ClusterChangedEvent) {
if (event.nodesChanged() || event.isNewCluster) {
skipExecution.sweepISMPluginVersion()
initBackgroundSweepISMPluginVersionExecution()
}
}

@OpenForTesting
fun initBackgroundSweepISMPluginVersionExecution() {
// If ISM is disabled return early
if (!isIndexStateManagementEnabled()) return
// Cancel existing background sweep
scheduledSkipExecution?.cancel()
val scheduledJob = Runnable {
launch {
try {
if (!skipExecution.flag) {
logger.info("Canceling sweep ism plugin version job")
scheduledSkipExecution?.cancel()
} else {
skipExecution.sweepISMPluginVersion()
}
} catch (e: Exception) {
logger.error("Failed to sweep ism plugin version", e)
}
}
}
scheduledSkipExecution =
threadPool.scheduleWithFixedDelay(scheduledJob, sweepSkipPeriod, ThreadPool.Names.MANAGEMENT)
}

private fun isIndexStateManagementEnabled(): Boolean = indexStateManagementEnabled == true
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,36 +12,25 @@ import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules
import org.opensearch.client.Client
import org.opensearch.cluster.ClusterChangedEvent
import org.opensearch.cluster.ClusterStateListener
import org.opensearch.cluster.service.ClusterService
import org.opensearch.indexmanagement.util.OpenForTesting

// TODO this can be moved to job scheduler, so that all extended plugin
// can avoid running jobs in an upgrading cluster
@OpenForTesting
class SkipExecution(
private val client: Client,
private val clusterService: ClusterService
) : ClusterStateListener {
private val client: Client
) {
private val logger = LogManager.getLogger(javaClass)

@Volatile final var flag: Boolean = false
@Volatile
final var flag: Boolean = false
private set

// To track if there are any legacy IM plugin nodes part of the cluster
@Volatile final var hasLegacyPlugin: Boolean = false
@Volatile
final var hasLegacyPlugin: Boolean = false
private set

init {
clusterService.addListener(this)
}

override fun clusterChanged(event: ClusterChangedEvent) {
if (event.nodesChanged() || event.isNewCluster) {
sweepISMPluginVersion()
}
}

fun sweepISMPluginVersion() {
// if old version ISM plugin exists (2 versions ISM in one cluster), set skip flag to true
val request = NodesInfoRequest().clear().addMetric("plugins")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ class ManagedIndexSettings {
Setting.Property.Dynamic
)

val SWEEP_SKIP_PERIOD: Setting<TimeValue> = Setting.timeSetting(
"plugins.index_state_management.coordinator.sweep_skip_period",
TimeValue.timeValueMinutes(5),
Setting.Property.NodeScope,
Setting.Property.Dynamic
)

val COORDINATOR_BACKOFF_MILLIS: Setting<TimeValue> = Setting.positiveTimeSetting(
"plugins.index_state_management.coordinator.backoff_millis",
LegacyOpenDistroManagedIndexSettings.COORDINATOR_BACKOFF_MILLIS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class MetadataServiceTests : OpenSearchTestCase() {
)
)
)
val skipFlag = SkipExecution(client, clusterService)
val skipFlag = SkipExecution(client)
val metadataService = MetadataService(client, clusterService, skipFlag, imIndices)
metadataService.moveMetadata()

Expand All @@ -75,7 +75,7 @@ class MetadataServiceTests : OpenSearchTestCase() {
)
)

val skipFlag = SkipExecution(client, clusterService)
val skipFlag = SkipExecution(client)
val metadataService = MetadataService(client, clusterService, skipFlag, imIndices)
metadataService.moveMetadata()
assertEquals(metadataService.runTimeCounter, 2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,24 @@ import org.opensearch.action.admin.cluster.node.info.NodesInfoAction
import org.opensearch.client.Client
import org.opensearch.cluster.ClusterChangedEvent
import org.opensearch.cluster.OpenSearchAllocationTestCase
import org.opensearch.cluster.service.ClusterService
import org.opensearch.indexmanagement.indexstatemanagement.SkipExecution

class SkipExecutionTests : OpenSearchAllocationTestCase() {

private lateinit var client: Client
private lateinit var clusterService: ClusterService
private lateinit var skip: SkipExecution

@Before
@Throws(Exception::class)
fun setup() {
client = Mockito.mock(Client::class.java)
clusterService = Mockito.mock(ClusterService::class.java)
skip = SkipExecution(client, clusterService)
skip = SkipExecution(client)
}

fun `test cluster change event`() {
val event = Mockito.mock(ClusterChangedEvent::class.java)
Mockito.`when`(event.nodesChanged()).thenReturn(true)
skip.clusterChanged(event)
skip.sweepISMPluginVersion()
Mockito.verify(client).execute(Mockito.eq(NodesInfoAction.INSTANCE), Mockito.any(), Mockito.any())
}
}