Skip to content

Commit b3f7373

Browse files
Adds plugin version sweep background job (#434) (#539)
* [207]: Added a 5 min scheduled job for sweeping the ISM plugin version in case of a version discrepancy Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com> * [207]: Created the PluginVersionSweepCoordinator component responsible for scheduling the skip execution task. Annotated tests in order to prevent thread leak errors during integration tests Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com> * [207]: Increased the retry period for the background job that sets the skip flag to 5 mins Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com> * Empty-Commit Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com> Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com> Co-authored-by: Stevan Buzejic <buzejic.stevan@gmail.com> (cherry picked from commit 4d844fa) Co-authored-by: Clay Downs <downsrob@amazon.com>
1 parent 9e09d2e commit b3f7373

File tree

6 files changed

+116
-27
lines changed

6 files changed

+116
-27
lines changed

src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementH
3737
import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexCoordinator
3838
import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexRunner
3939
import org.opensearch.indexmanagement.indexstatemanagement.MetadataService
40+
import org.opensearch.indexmanagement.indexstatemanagement.PluginVersionSweepCoordinator
4041
import org.opensearch.indexmanagement.indexstatemanagement.SkipExecution
4142
import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig
4243
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
@@ -370,7 +371,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
370371
fieldCapsFilter = FieldCapsFilter(clusterService, settings, indexNameExpressionResolver)
371372
this.indexNameExpressionResolver = indexNameExpressionResolver
372373

373-
val skipFlag = SkipExecution(client, clusterService)
374+
val skipFlag = SkipExecution(client)
374375
RollupFieldValueExpressionResolver.registerScriptService(scriptService)
375376
val rollupRunner = RollupRunner
376377
.registerClient(client)
@@ -428,6 +429,8 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
428429

429430
val smRunner = SMRunner.init(client, threadPool, settings, indexManagementIndices, clusterService)
430431

432+
val pluginVersionSweepCoordinator = PluginVersionSweepCoordinator(skipFlag, settings, threadPool, clusterService)
433+
431434
return listOf(
432435
managedIndexRunner,
433436
rollupRunner,
@@ -436,7 +439,8 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
436439
managedIndexCoordinator,
437440
indexStateManagementHistory,
438441
indexMetadataProvider,
439-
smRunner
442+
smRunner,
443+
pluginVersionSweepCoordinator
440444
)
441445
}
442446

@@ -461,6 +465,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
461465
ManagedIndexSettings.JITTER,
462466
ManagedIndexSettings.JOB_INTERVAL,
463467
ManagedIndexSettings.SWEEP_PERIOD,
468+
ManagedIndexSettings.SWEEP_SKIP_PERIOD,
464469
ManagedIndexSettings.COORDINATOR_BACKOFF_COUNT,
465470
ManagedIndexSettings.COORDINATOR_BACKOFF_MILLIS,
466471
ManagedIndexSettings.ALLOW_LIST,
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.indexmanagement.indexstatemanagement
7+
8+
import kotlinx.coroutines.CoroutineName
9+
import kotlinx.coroutines.CoroutineScope
10+
import kotlinx.coroutines.Dispatchers
11+
import kotlinx.coroutines.SupervisorJob
12+
import kotlinx.coroutines.launch
13+
import org.apache.logging.log4j.LogManager
14+
import org.opensearch.cluster.ClusterChangedEvent
15+
import org.opensearch.cluster.ClusterStateListener
16+
import org.opensearch.cluster.service.ClusterService
17+
import org.opensearch.common.component.LifecycleListener
18+
import org.opensearch.common.settings.Settings
19+
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
20+
import org.opensearch.indexmanagement.util.OpenForTesting
21+
import org.opensearch.threadpool.Scheduler
22+
import org.opensearch.threadpool.ThreadPool
23+
24+
/**
 * Schedules a periodic background sweep of the ISM plugin version through [SkipExecution].
 *
 * The coordinator registers itself on the [ClusterService] as both a lifecycle listener and a
 * cluster state listener. A sweep is (re)scheduled when the component starts, when the
 * `SWEEP_SKIP_PERIOD` setting is updated, and when the set of nodes changes or a new cluster forms.
 * Each scheduled run either triggers [SkipExecution.sweepISMPluginVersion] (while the skip flag is
 * set) or cancels the schedule (once the skip flag is cleared).
 *
 * @param skipExecution holder of the skip flag; also performs the actual plugin version sweep
 * @param settings node settings used to read the initial sweep period and the ISM enabled flag
 * @param threadPool thread pool on which the periodic sweep is scheduled
 * @param clusterService cluster service the coordinator registers its listeners on
 */
class PluginVersionSweepCoordinator(
    private val skipExecution: SkipExecution,
    settings: Settings,
    private val threadPool: ThreadPool,
    clusterService: ClusterService,
) : CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("ISMPluginSweepCoordinator")),
    LifecycleListener(),
    ClusterStateListener {
    private val logger = LogManager.getLogger(javaClass)

    // Handle of the currently scheduled sweep. It is written by whichever thread (re)schedules the
    // sweep and read from the coroutine launched on Dispatchers.Default, so it has to be volatile.
    @Volatile
    private var scheduledSkipExecution: Scheduler.Cancellable? = null

    @Volatile
    private var sweepSkipPeriod = ManagedIndexSettings.SWEEP_SKIP_PERIOD.get(settings)

    // NOTE(review): this value is only read at construction time; no settings update consumer is
    // registered for it in this class, so later changes of the setting are not observed here.
    @Volatile
    private var indexStateManagementEnabled = ManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED.get(settings)

    init {
        clusterService.addLifecycleListener(this)
        clusterService.addListener(this)
        clusterService.clusterSettings.addSettingsUpdateConsumer(ManagedIndexSettings.SWEEP_SKIP_PERIOD) {
            sweepSkipPeriod = it
            // Reschedule so the new period takes effect immediately.
            initBackgroundSweepISMPluginVersionExecution()
        }
    }

    /** Starts the periodic sweep once the cluster service has started. */
    override fun afterStart() {
        initBackgroundSweepISMPluginVersionExecution()
    }

    /** Cancels the periodic sweep before the cluster service stops. */
    override fun beforeStop() {
        scheduledSkipExecution?.cancel()
    }

    /**
     * Sweeps the plugin version right away and restarts the periodic sweep whenever the set of
     * nodes changed or a new cluster was formed.
     */
    override fun clusterChanged(event: ClusterChangedEvent) {
        if (event.nodesChanged() || event.isNewCluster) {
            skipExecution.sweepISMPluginVersion()
            initBackgroundSweepISMPluginVersionExecution()
        }
    }

    /**
     * Cancels any previously scheduled sweep and, if ISM is enabled, schedules a new one that runs
     * with a fixed delay of [sweepSkipPeriod] on the management thread pool.
     *
     * The method is synchronized because it is reachable from the lifecycle callback, the cluster
     * state listener and the settings update consumer. Without mutual exclusion two callers could
     * both schedule a task while only the last handle is retained, leaving the other task running
     * with nothing able to cancel it.
     */
    @OpenForTesting
    @Synchronized
    fun initBackgroundSweepISMPluginVersionExecution() {
        // Cancel existing background sweep first so that no stale job survives, whatever we decide below
        scheduledSkipExecution?.cancel()
        // If ISM is disabled there is nothing to schedule
        if (!isIndexStateManagementEnabled()) return
        val scheduledJob = Runnable {
            launch {
                try {
                    if (!skipExecution.flag) {
                        // The skip flag is cleared, so periodic sweeping is no longer needed.
                        logger.info("Canceling sweep ism plugin version job")
                        scheduledSkipExecution?.cancel()
                    } else {
                        skipExecution.sweepISMPluginVersion()
                    }
                } catch (e: Exception) {
                    logger.error("Failed to sweep ism plugin version", e)
                }
            }
        }
        scheduledSkipExecution =
            threadPool.scheduleWithFixedDelay(scheduledJob, sweepSkipPeriod, ThreadPool.Names.MANAGEMENT)
    }

    private fun isIndexStateManagementEnabled(): Boolean = indexStateManagementEnabled
}

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/SkipExecution.kt

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,36 +12,25 @@ import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest
1212
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse
1313
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules
1414
import org.opensearch.client.Client
15-
import org.opensearch.cluster.ClusterChangedEvent
16-
import org.opensearch.cluster.ClusterStateListener
17-
import org.opensearch.cluster.service.ClusterService
1815
import org.opensearch.indexmanagement.util.OpenForTesting
1916

2017
// TODO this can be moved to job scheduler, so that all extending plugins
2118
// can avoid running jobs in an upgrading cluster
2219
@OpenForTesting
2320
class SkipExecution(
24-
private val client: Client,
25-
private val clusterService: ClusterService
26-
) : ClusterStateListener {
21+
private val client: Client
22+
) {
2723
private val logger = LogManager.getLogger(javaClass)
2824

29-
@Volatile final var flag: Boolean = false
25+
@Volatile
26+
final var flag: Boolean = false
3027
private set
28+
3129
// To track if there are any legacy IM plugin nodes part of the cluster
32-
@Volatile final var hasLegacyPlugin: Boolean = false
30+
@Volatile
31+
final var hasLegacyPlugin: Boolean = false
3332
private set
3433

35-
init {
36-
clusterService.addListener(this)
37-
}
38-
39-
override fun clusterChanged(event: ClusterChangedEvent) {
40-
if (event.nodesChanged() || event.isNewCluster) {
41-
sweepISMPluginVersion()
42-
}
43-
}
44-
4534
fun sweepISMPluginVersion() {
4635
// if old version ISM plugin exists (2 versions ISM in one cluster), set skip flag to true
4736
val request = NodesInfoRequest().clear().addMetric("plugins")

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/ManagedIndexSettings.kt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,13 @@ class ManagedIndexSettings {
101101
Setting.Property.Dynamic
102102
)
103103

104+
/**
 * Delay between two runs of the background ISM plugin version sweep.
 *
 * Node-scoped and dynamically updatable; defaults to 5 minutes.
 */
val SWEEP_SKIP_PERIOD: Setting<TimeValue> =
    Setting.timeSetting(
        "plugins.index_state_management.coordinator.sweep_skip_period",
        TimeValue.timeValueMinutes(5),
        Setting.Property.NodeScope,
        Setting.Property.Dynamic
    )
110+
104111
val COORDINATOR_BACKOFF_MILLIS: Setting<TimeValue> = Setting.positiveTimeSetting(
105112
"plugins.index_state_management.coordinator.backoff_millis",
106113
LegacyOpenDistroManagedIndexSettings.COORDINATOR_BACKOFF_MILLIS,

src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/MetadataServiceTests.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ class MetadataServiceTests : OpenSearchTestCase() {
5454
)
5555
)
5656
)
57-
val skipFlag = SkipExecution(client, clusterService)
57+
val skipFlag = SkipExecution(client)
5858
val metadataService = MetadataService(client, clusterService, skipFlag, imIndices)
5959
metadataService.moveMetadata()
6060

@@ -75,7 +75,7 @@ class MetadataServiceTests : OpenSearchTestCase() {
7575
)
7676
)
7777

78-
val skipFlag = SkipExecution(client, clusterService)
78+
val skipFlag = SkipExecution(client)
7979
val metadataService = MetadataService(client, clusterService, skipFlag, imIndices)
8080
metadataService.moveMetadata()
8181
assertEquals(metadataService.runTimeCounter, 2)

src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/SkipExecutionTests.kt

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,27 +11,24 @@ import org.opensearch.action.admin.cluster.node.info.NodesInfoAction
1111
import org.opensearch.client.Client
1212
import org.opensearch.cluster.ClusterChangedEvent
1313
import org.opensearch.cluster.OpenSearchAllocationTestCase
14-
import org.opensearch.cluster.service.ClusterService
1514
import org.opensearch.indexmanagement.indexstatemanagement.SkipExecution
1615

1716
class SkipExecutionTests : OpenSearchAllocationTestCase() {
1817

1918
private lateinit var client: Client
20-
private lateinit var clusterService: ClusterService
2119
private lateinit var skip: SkipExecution
2220

2321
@Before
2422
@Throws(Exception::class)
2523
fun setup() {
2624
client = Mockito.mock(Client::class.java)
27-
clusterService = Mockito.mock(ClusterService::class.java)
28-
skip = SkipExecution(client, clusterService)
25+
skip = SkipExecution(client)
2926
}
3027

3128
fun `test cluster change event`() {
3229
val event = Mockito.mock(ClusterChangedEvent::class.java)
3330
Mockito.`when`(event.nodesChanged()).thenReturn(true)
34-
skip.clusterChanged(event)
31+
skip.sweepISMPluginVersion()
3532
Mockito.verify(client).execute(Mockito.eq(NodesInfoAction.INSTANCE), Mockito.any(), Mockito.any())
3633
}
3734
}

0 commit comments

Comments
 (0)