Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
fieldCapsFilter = FieldCapsFilter(clusterService, settings, indexNameExpressionResolver)
this.indexNameExpressionResolver = indexNameExpressionResolver

val skipFlag = SkipExecution(client)
val skipFlag = SkipExecution()
RollupFieldValueExpressionResolver.registerServices(scriptService, clusterService)
val rollupRunner =
RollupRunner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class PluginVersionSweepCoordinator(
private val skipExecution: SkipExecution,
settings: Settings,
private val threadPool: ThreadPool,
clusterService: ClusterService,
var clusterService: ClusterService,
) : CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("ISMPluginSweepCoordinator")),
LifecycleListener(),
ClusterStateListener {
Expand Down Expand Up @@ -58,7 +58,7 @@ class PluginVersionSweepCoordinator(

override fun clusterChanged(event: ClusterChangedEvent) {
if (event.nodesChanged() || event.isNewCluster) {
skipExecution.sweepISMPluginVersion()
skipExecution.sweepISMPluginVersion(clusterService)
initBackgroundSweepISMPluginVersionExecution()
}
}
Expand All @@ -77,7 +77,7 @@ class PluginVersionSweepCoordinator(
logger.info("Canceling sweep ism plugin version job")
scheduledSkipExecution?.cancel()
} else {
skipExecution.sweepISMPluginVersion()
skipExecution.sweepISMPluginVersion(clusterService)
}
} catch (e: Exception) {
logger.error("Failed to sweep ism plugin version", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,14 @@
package org.opensearch.indexmanagement.indexstatemanagement

import org.apache.logging.log4j.LogManager
import org.opensearch.action.admin.cluster.node.info.NodesInfoAction
import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules
import org.opensearch.client.Client
import org.opensearch.core.action.ActionListener
import org.opensearch.Version
import org.opensearch.cluster.service.ClusterService
import org.opensearch.indexmanagement.util.OpenForTesting

// TODO this can be moved to job scheduler, so that all extended plugin
// can avoid running jobs in an upgrading cluster
@OpenForTesting
class SkipExecution(
private val client: Client,
) {
class SkipExecution {
private val logger = LogManager.getLogger(javaClass)

@Volatile
Expand All @@ -31,53 +25,27 @@ class SkipExecution(
final var hasLegacyPlugin: Boolean = false
private set

fun sweepISMPluginVersion() {
// if old version ISM plugin exists (2 versions ISM in one cluster), set skip flag to true
val request = NodesInfoRequest().clear().addMetric("plugins")
client.execute(
NodesInfoAction.INSTANCE, request,
object : ActionListener<NodesInfoResponse> {
override fun onResponse(response: NodesInfoResponse) {
val versionSet = mutableSetOf<String>()
val legacyVersionSet = mutableSetOf<String>()

response.nodes.map { it.getInfo(PluginsAndModules::class.java).pluginInfos }
.forEach {
it.forEach { nodePlugin ->
if (nodePlugin.name == "opensearch-index-management" ||
nodePlugin.name == "opensearch_index_management"
) {
versionSet.add(nodePlugin.version)
}

if (nodePlugin.name == "opendistro-index-management" ||
nodePlugin.name == "opendistro_index_management"
) {
legacyVersionSet.add(nodePlugin.version)
}
}
}

if ((versionSet.size + legacyVersionSet.size) > 1) {
flag = true
logger.info("There are multiple versions of Index Management plugins in the cluster: [$versionSet, $legacyVersionSet]")
} else {
flag = false
}

if (versionSet.isNotEmpty() && legacyVersionSet.isNotEmpty()) {
hasLegacyPlugin = true
logger.info("Found legacy plugin versions [$legacyVersionSet] and opensearch plugins versions [$versionSet] in the cluster")
} else {
hasLegacyPlugin = false
}
}

override fun onFailure(e: Exception) {
logger.error("Failed sweeping nodes for ISM plugin versions: $e")
flag = false
}
},
)
fun sweepISMPluginVersion(clusterService: ClusterService) {
try {
// if old version ISM plugin exists (2 versions ISM in one cluster), set skip flag to true
val currentMinVersion = clusterService.state().nodes.minNodeVersion
val currentMaxVersion = clusterService.state().nodes.maxNodeVersion

if (currentMinVersion != null && !currentMinVersion.equals(currentMaxVersion)) {
flag = true
logger.info("There are multiple versions of Index Management plugins in the cluster: [$currentMaxVersion, $currentMinVersion]")
} else {
flag = false
}

if (currentMinVersion.major > Version.CURRENT.major && currentMinVersion != currentMaxVersion) {
hasLegacyPlugin = true
logger.info("Found legacy plugin versions [$currentMinVersion] and opensearch plugins versions [$currentMaxVersion] in the cluster")
} else {
hasLegacyPlugin = false
}
} catch (e: Exception) {
logger.error("Unable to fetch node versions from cluster service", e)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,88 @@

package org.opensearch.indexmanagement.indexstatemanagement.coordinator

import com.nhaarman.mockitokotlin2.mock
import com.nhaarman.mockitokotlin2.whenever
import org.junit.Before
import org.mockito.Mockito
import org.opensearch.action.admin.cluster.node.info.NodesInfoAction
import org.opensearch.client.Client
import org.opensearch.cluster.ClusterChangedEvent
import org.opensearch.cluster.OpenSearchAllocationTestCase
import org.opensearch.Version
import org.opensearch.cluster.ClusterState
import org.opensearch.cluster.node.DiscoveryNode
import org.opensearch.cluster.node.DiscoveryNodes
import org.opensearch.cluster.service.ClusterService
import org.opensearch.core.common.transport.TransportAddress
import org.opensearch.indexmanagement.indexstatemanagement.SkipExecution
import org.opensearch.test.OpenSearchTestCase

class SkipExecutionTests : OpenSearchAllocationTestCase() {
private lateinit var client: Client
class SkipExecutionTests : OpenSearchTestCase() {
private var clusterService: ClusterService = mock()
private lateinit var clusterState: ClusterState
private lateinit var skip: SkipExecution

@Before
@Throws(Exception::class)
fun setup() {
client = Mockito.mock(Client::class.java)
skip = SkipExecution(client)
skip = SkipExecution()
}

fun `test cluster change event`() {
val event = Mockito.mock(ClusterChangedEvent::class.java)
Mockito.`when`(event.nodesChanged()).thenReturn(true)
skip.sweepISMPluginVersion()
Mockito.verify(client).execute(Mockito.eq(NodesInfoAction.INSTANCE), Mockito.any(), Mockito.any())
fun `test sweepISMPluginVersion should set flag to false and hasLegacyPlugin to false when all nodes have the same version`() {
val version = Version.CURRENT
val node1 = DiscoveryNode("node1", TransportAddress(TransportAddress.META_ADDRESS, 9300), version)
val node2 = DiscoveryNode("node2", TransportAddress(TransportAddress.META_ADDRESS, 9301), version)
val discoveryNodes = DiscoveryNodes.builder().add(node1).add(node2).build()
clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).nodes(discoveryNodes).build()
whenever(clusterService.state()).thenReturn(clusterState)

skip.sweepISMPluginVersion(clusterService)

assertFalse(skip.flag)
assertFalse(skip.hasLegacyPlugin)
}

fun `test sweepISMPluginVersion should set flag to true and hasLegacyPlugin to false when all nodes have the different versions`() {
val version1 = Version.CURRENT
val version2 = Version.V_2_0_0
val node1 = DiscoveryNode("node1", TransportAddress(TransportAddress.META_ADDRESS, 9300), version1)
val node2 = DiscoveryNode("node2", TransportAddress(TransportAddress.META_ADDRESS, 9301), version2)
val node3 = DiscoveryNode("node3", TransportAddress(TransportAddress.META_ADDRESS, 9302), version2)
val discoveryNodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build()
clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).nodes(discoveryNodes).build()
whenever(clusterService.state()).thenReturn(clusterState)

skip.sweepISMPluginVersion(clusterService)

assertTrue(skip.flag)
assertFalse(skip.hasLegacyPlugin)
}

fun `test sweepISMPluginVersion should set flag to true and hasLegacyPlugin to true when there are different versions including current version`() {
val minVersion = Version.fromString("7.10.0")
val maxVersion = Version.CURRENT
val node1 = DiscoveryNode("node1", TransportAddress(TransportAddress.META_ADDRESS, 9300), minVersion)
val node2 = DiscoveryNode("node2", TransportAddress(TransportAddress.META_ADDRESS, 9301), maxVersion)
val node3 = DiscoveryNode("node3", TransportAddress(TransportAddress.META_ADDRESS, 9302), maxVersion)
val discoveryNodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build()
clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).nodes(discoveryNodes).build()
whenever(clusterService.state()).thenReturn(clusterState)

skip.sweepISMPluginVersion(clusterService)

assertTrue(skip.flag)
assertTrue(skip.hasLegacyPlugin)
}

fun `test sweepISMPluginVersion should set flag to true and hasLegacyPlugin to true with different versions`() {
val minVersion = Version.fromString("7.10.0")
val maxVersion = Version.V_2_0_0
val node1 = DiscoveryNode("node1", TransportAddress(TransportAddress.META_ADDRESS, 9300), minVersion)
val node2 = DiscoveryNode("node2", TransportAddress(TransportAddress.META_ADDRESS, 9301), maxVersion)
val node3 = DiscoveryNode("node3", TransportAddress(TransportAddress.META_ADDRESS, 9302), maxVersion)

val discoveryNodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build()
clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).nodes(discoveryNodes).build()
whenever(clusterService.state()).thenReturn(clusterState)

skip.sweepISMPluginVersion(clusterService)

assertTrue(skip.flag)
assertTrue(skip.hasLegacyPlugin)
}
}