Skip to content

Commit b1b0082

Browse files
skipping execution based on cluster service (#1219)
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com> (cherry picked from commit d6da55c)
1 parent abdf77b commit b1b0082

File tree

4 files changed

+103
-76
lines changed

4 files changed

+103
-76
lines changed

src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
404404
fieldCapsFilter = FieldCapsFilter(clusterService, settings, indexNameExpressionResolver)
405405
this.indexNameExpressionResolver = indexNameExpressionResolver
406406

407-
val skipFlag = SkipExecution(client)
407+
val skipFlag = SkipExecution()
408408
RollupFieldValueExpressionResolver.registerServices(scriptService, clusterService)
409409
val rollupRunner = RollupRunner
410410
.registerClient(client)

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/PluginVersionSweepCoordinator.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class PluginVersionSweepCoordinator(
2525
private val skipExecution: SkipExecution,
2626
settings: Settings,
2727
private val threadPool: ThreadPool,
28-
clusterService: ClusterService,
28+
var clusterService: ClusterService,
2929
) : CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("ISMPluginSweepCoordinator")),
3030
LifecycleListener(),
3131
ClusterStateListener {
@@ -58,7 +58,7 @@ class PluginVersionSweepCoordinator(
5858

5959
override fun clusterChanged(event: ClusterChangedEvent) {
6060
if (event.nodesChanged() || event.isNewCluster) {
61-
skipExecution.sweepISMPluginVersion()
61+
skipExecution.sweepISMPluginVersion(clusterService)
6262
initBackgroundSweepISMPluginVersionExecution()
6363
}
6464
}
@@ -76,7 +76,7 @@ class PluginVersionSweepCoordinator(
7676
logger.info("Canceling sweep ism plugin version job")
7777
scheduledSkipExecution?.cancel()
7878
} else {
79-
skipExecution.sweepISMPluginVersion()
79+
skipExecution.sweepISMPluginVersion(clusterService)
8080
}
8181
} catch (e: Exception) {
8282
logger.error("Failed to sweep ism plugin version", e)

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/SkipExecution.kt

Lines changed: 25 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,14 @@
66
package org.opensearch.indexmanagement.indexstatemanagement
77

88
import org.apache.logging.log4j.LogManager
9-
import org.opensearch.action.admin.cluster.node.info.NodesInfoAction
10-
import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest
11-
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse
12-
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules
13-
import org.opensearch.client.Client
14-
import org.opensearch.core.action.ActionListener
9+
import org.opensearch.Version
10+
import org.opensearch.cluster.service.ClusterService
1511
import org.opensearch.indexmanagement.util.OpenForTesting
1612

1713
// TODO this can be moved to job scheduler, so that all extended plugin
1814
// can avoid running jobs in an upgrading cluster
1915
@OpenForTesting
20-
class SkipExecution(
21-
private val client: Client,
22-
) {
16+
class SkipExecution {
2317
private val logger = LogManager.getLogger(javaClass)
2418

2519
@Volatile
@@ -31,53 +25,27 @@ class SkipExecution(
3125
final var hasLegacyPlugin: Boolean = false
3226
private set
3327

34-
fun sweepISMPluginVersion() {
35-
// if old version ISM plugin exists (2 versions ISM in one cluster), set skip flag to true
36-
val request = NodesInfoRequest().clear().addMetric("plugins")
37-
client.execute(
38-
NodesInfoAction.INSTANCE, request,
39-
object : ActionListener<NodesInfoResponse> {
40-
override fun onResponse(response: NodesInfoResponse) {
41-
val versionSet = mutableSetOf<String>()
42-
val legacyVersionSet = mutableSetOf<String>()
43-
44-
response.nodes.map { it.getInfo(PluginsAndModules::class.java).pluginInfos }
45-
.forEach {
46-
it.forEach { nodePlugin ->
47-
if (nodePlugin.name == "opensearch-index-management" ||
48-
nodePlugin.name == "opensearch_index_management"
49-
) {
50-
versionSet.add(nodePlugin.version)
51-
}
52-
53-
if (nodePlugin.name == "opendistro-index-management" ||
54-
nodePlugin.name == "opendistro_index_management"
55-
) {
56-
legacyVersionSet.add(nodePlugin.version)
57-
}
58-
}
59-
}
60-
61-
if ((versionSet.size + legacyVersionSet.size) > 1) {
62-
flag = true
63-
logger.info("There are multiple versions of Index Management plugins in the cluster: [$versionSet, $legacyVersionSet]")
64-
} else {
65-
flag = false
66-
}
67-
68-
if (versionSet.isNotEmpty() && legacyVersionSet.isNotEmpty()) {
69-
hasLegacyPlugin = true
70-
logger.info("Found legacy plugin versions [$legacyVersionSet] and opensearch plugins versions [$versionSet] in the cluster")
71-
} else {
72-
hasLegacyPlugin = false
73-
}
74-
}
75-
76-
override fun onFailure(e: Exception) {
77-
logger.error("Failed sweeping nodes for ISM plugin versions: $e")
78-
flag = false
79-
}
80-
},
81-
)
28+
fun sweepISMPluginVersion(clusterService: ClusterService) {
29+
try {
30+
// if old version ISM plugin exists (2 versions ISM in one cluster), set skip flag to true
31+
val currentMinVersion = clusterService.state().nodes.minNodeVersion
32+
val currentMaxVersion = clusterService.state().nodes.maxNodeVersion
33+
34+
if (currentMinVersion != null && !currentMinVersion.equals(currentMaxVersion)) {
35+
flag = true
36+
logger.info("There are multiple versions of Index Management plugins in the cluster: [$currentMaxVersion, $currentMinVersion]")
37+
} else {
38+
flag = false
39+
}
40+
41+
if (currentMinVersion.major > Version.CURRENT.major && currentMinVersion != currentMaxVersion) {
42+
hasLegacyPlugin = true
43+
logger.info("Found legacy plugin versions [$currentMinVersion] and opensearch plugins versions [$currentMaxVersion] in the cluster")
44+
} else {
45+
hasLegacyPlugin = false
46+
}
47+
} catch (e: Exception) {
48+
logger.error("Unable to fetch node versions from cluster service", e)
49+
}
8250
}
8351
}

src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/SkipExecutionTests.kt

Lines changed: 74 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,30 +5,89 @@
55

66
package org.opensearch.indexmanagement.indexstatemanagement.coordinator
77

8+
import com.nhaarman.mockitokotlin2.mock
9+
import com.nhaarman.mockitokotlin2.whenever
810
import org.junit.Before
9-
import org.mockito.Mockito
10-
import org.opensearch.action.admin.cluster.node.info.NodesInfoAction
11-
import org.opensearch.client.Client
12-
import org.opensearch.cluster.ClusterChangedEvent
13-
import org.opensearch.cluster.OpenSearchAllocationTestCase
11+
import org.opensearch.Version
12+
import org.opensearch.cluster.ClusterState
13+
import org.opensearch.cluster.node.DiscoveryNode
14+
import org.opensearch.cluster.node.DiscoveryNodes
15+
import org.opensearch.cluster.service.ClusterService
16+
import org.opensearch.core.common.transport.TransportAddress
1417
import org.opensearch.indexmanagement.indexstatemanagement.SkipExecution
18+
import org.opensearch.test.OpenSearchTestCase
1519

16-
class SkipExecutionTests : OpenSearchAllocationTestCase() {
20+
class SkipExecutionTests : OpenSearchTestCase() {
21+
private var clusterService: ClusterService = mock()
1722

18-
private lateinit var client: Client
23+
private lateinit var clusterState: ClusterState
1924
private lateinit var skip: SkipExecution
2025

2126
@Before
22-
@Throws(Exception::class)
2327
fun setup() {
24-
client = Mockito.mock(Client::class.java)
25-
skip = SkipExecution(client)
28+
skip = SkipExecution()
2629
}
2730

28-
fun `test cluster change event`() {
29-
val event = Mockito.mock(ClusterChangedEvent::class.java)
30-
Mockito.`when`(event.nodesChanged()).thenReturn(true)
31-
skip.sweepISMPluginVersion()
32-
Mockito.verify(client).execute(Mockito.eq(NodesInfoAction.INSTANCE), Mockito.any(), Mockito.any())
31+
fun `test sweepISMPluginVersion should set flag to false and hasLegacyPlugin to false when all nodes have the same version`() {
32+
val version = Version.CURRENT
33+
val node1 = DiscoveryNode("node1", TransportAddress(TransportAddress.META_ADDRESS, 9300), version)
34+
val node2 = DiscoveryNode("node2", TransportAddress(TransportAddress.META_ADDRESS, 9301), version)
35+
val discoveryNodes = DiscoveryNodes.builder().add(node1).add(node2).build()
36+
clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).nodes(discoveryNodes).build()
37+
whenever(clusterService.state()).thenReturn(clusterState)
38+
39+
skip.sweepISMPluginVersion(clusterService)
40+
41+
assertFalse(skip.flag)
42+
assertFalse(skip.hasLegacyPlugin)
43+
}
44+
45+
fun `test sweepISMPluginVersion should set flag to true and hasLegacyPlugin to false when all nodes have the different versions`() {
46+
val version1 = Version.CURRENT
47+
val version2 = Version.V_2_0_0
48+
val node1 = DiscoveryNode("node1", TransportAddress(TransportAddress.META_ADDRESS, 9300), version1)
49+
val node2 = DiscoveryNode("node2", TransportAddress(TransportAddress.META_ADDRESS, 9301), version2)
50+
val node3 = DiscoveryNode("node3", TransportAddress(TransportAddress.META_ADDRESS, 9302), version2)
51+
val discoveryNodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build()
52+
clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).nodes(discoveryNodes).build()
53+
whenever(clusterService.state()).thenReturn(clusterState)
54+
55+
skip.sweepISMPluginVersion(clusterService)
56+
57+
assertTrue(skip.flag)
58+
assertFalse(skip.hasLegacyPlugin)
59+
}
60+
61+
fun `test sweepISMPluginVersion should set flag to true and hasLegacyPlugin to true when there are different versions including current version`() {
62+
val minVersion = Version.fromString("7.10.0")
63+
val maxVersion = Version.CURRENT
64+
val node1 = DiscoveryNode("node1", TransportAddress(TransportAddress.META_ADDRESS, 9300), minVersion)
65+
val node2 = DiscoveryNode("node2", TransportAddress(TransportAddress.META_ADDRESS, 9301), maxVersion)
66+
val node3 = DiscoveryNode("node3", TransportAddress(TransportAddress.META_ADDRESS, 9302), maxVersion)
67+
val discoveryNodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build()
68+
clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).nodes(discoveryNodes).build()
69+
whenever(clusterService.state()).thenReturn(clusterState)
70+
71+
skip.sweepISMPluginVersion(clusterService)
72+
73+
assertTrue(skip.flag)
74+
assertTrue(skip.hasLegacyPlugin)
75+
}
76+
77+
fun `test sweepISMPluginVersion should set flag to true and hasLegacyPlugin to true with different versions`() {
78+
val minVersion = Version.fromString("7.10.0")
79+
val maxVersion = Version.V_2_0_0
80+
val node1 = DiscoveryNode("node1", TransportAddress(TransportAddress.META_ADDRESS, 9300), minVersion)
81+
val node2 = DiscoveryNode("node2", TransportAddress(TransportAddress.META_ADDRESS, 9301), maxVersion)
82+
val node3 = DiscoveryNode("node3", TransportAddress(TransportAddress.META_ADDRESS, 9302), maxVersion)
83+
84+
val discoveryNodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build()
85+
clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).nodes(discoveryNodes).build()
86+
whenever(clusterService.state()).thenReturn(clusterState)
87+
88+
skip.sweepISMPluginVersion(clusterService)
89+
90+
assertTrue(skip.flag)
91+
assertTrue(skip.hasLegacyPlugin)
3392
}
3493
}

0 commit comments

Comments
 (0)