Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[![Test and Build Workflow](https://github.com/opensearch-project/index-management/workflows/Test%20and%20Build%20Workflow/badge.svg)](https://github.com/opensearch-project/index-management/actions)
[![codecov](https://codecov.io/gh/opensearch-project/index-management/branch/main/graph/badge.svg)](https://codecov.io/gh/opensearch-project/index-management)
[![Documentation](https://img.shields.io/badge/api-reference-blue.svg)](https://docs-beta.opensearch.org/im-plugin/index/)
[![Documentation](https://img.shields.io/badge/api-reference-blue.svg)](https://opensearch.org/docs/im-plugin/index/)
[![Chat](https://img.shields.io/badge/chat-on%20forums-blue)](https://discuss.opendistrocommunity.dev/c/index-management/)
![PRs welcome!](https://img.shields.io/badge/PRs-welcome!-success)

Expand Down Expand Up @@ -58,7 +58,7 @@ See [developer guide](DEVELOPER_GUIDE.md) and [how to contribute to this project

If you find a bug, or have a feature request, please don't hesitate to open an issue in this repository.

For more information, see [project website](https://opensearch.org/) and [documentation](https://docs-beta.opensearch.org/). If you need help and are unsure where to open an issue, try [forums](https://discuss.opendistrocommunity.dev/).
For more information, see [project website](https://opensearch.org/) and [documentation](https://opensearch.org/docs/). If you need help and are unsure where to open an issue, try [forums](https://discuss.opendistrocommunity.dev/).

## Code of Conduct

Expand Down
77 changes: 77 additions & 0 deletions integtest.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#!/bin/bash

set -e

function usage() {
echo ""
echo "This script is used to run integration tests for plugin installed on a remote OpenSearch/Dashboards cluster."
echo "--------------------------------------------------------------------------"
echo "Usage: $0 [args]"
echo ""
echo "Required arguments:"
echo "None"
echo ""
echo "Optional arguments:"
echo -e "-b BIND_ADDRESS\t, defaults to localhost | 127.0.0.1, can be changed to any IP or domain name for the cluster location."
echo -e "-p BIND_PORT\t, defaults to 9200 or 5601 depends on OpenSearch or Dashboards, can be changed to any port for the cluster location."
echo -e "-s SECURITY_ENABLED\t(true | false), defaults to true. Specify the OpenSearch/Dashboards have security enabled or not."
echo -e "-c CREDENTIAL\t(usename:password), no defaults, effective when SECURITY_ENABLED=true."
echo -e "-h\tPrint this message."
echo "--------------------------------------------------------------------------"
}

while getopts ":hb:p:s:c:" arg; do
case $arg in
h)
usage
exit 1
;;
b)
BIND_ADDRESS=$OPTARG
;;
p)
BIND_PORT=$OPTARG
;;
s)
SECURITY_ENABLED=$OPTARG
;;
c)
CREDENTIAL=$OPTARG
;;
:)
echo "-${OPTARG} requires an argument"
usage
exit 1
;;
?)
echo "Invalid option: -${OPTARG}"
exit 1
;;
esac
done


if [ -z "$BIND_ADDRESS" ]
then
BIND_ADDRESS="localhost"
fi

if [ -z "$BIND_PORT" ]
then
BIND_PORT="9200"
fi

if [ -z "$SECURITY_ENABLED" ]
then
SECURITY_ENABLED="true"
fi

if [ -z "$CREDENTIAL" ]
then
CREDENTIAL="admin:admin"
USERNAME=`echo $CREDENTIAL | awk -F ':' '{print $1}'`
PASSWORD=`echo $CREDENTIAL | awk -F ':' '{print $2}'`
fi

./gradlew integTest -Dtests.rest.cluster="$BIND_ADDRESS:$BIND_PORT" -Dtests.cluster="$BIND_ADDRESS:$BIND_PORT" -Dtests.clustername="opensearch-integrationtest" -Dhttps=$SECURITY_ENABLED -Duser=$USERNAME -Dpassword=$PASSWORD --console=plain

Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
ManagedIndexSettings.ROLLOVER_SKIP,
ManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED,
ManagedIndexSettings.METADATA_SERVICE_ENABLED,
ManagedIndexSettings.AUTO_MANAGE,
ManagedIndexSettings.JOB_INTERVAL,
ManagedIndexSettings.SWEEP_PERIOD,
ManagedIndexSettings.COORDINATOR_BACKOFF_COUNT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ package org.opensearch.indexmanagement.indexstatemanagement
import org.apache.logging.log4j.LogManager
import org.apache.lucene.util.automaton.Operations
import org.opensearch.OpenSearchException
import org.opensearch.cluster.ClusterState
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.metadata.IndexAbstraction
import org.opensearch.common.Strings
import org.opensearch.common.ValidationException
import org.opensearch.common.regex.Regex
Expand All @@ -40,26 +39,24 @@ import org.opensearch.indexmanagement.util.IndexManagementException
private val log = LogManager.getLogger("ISMTemplateService")

/**
* find the matching policy based on ISM template field for the given index
* find the matching policy for the given index
*
* filter out hidden index
* filter out older index than template lastUpdateTime
* return early if it's hidden index
* filter out templates that were last updated after the index creation time
*
* @param ismTemplates current ISM templates saved in metadata
* @param indexMetadata cluster state index metadata
* @return policyID
*/
@Suppress("ReturnCount")
fun Map<String, ISMTemplate>.findMatchingPolicy(clusterState: ClusterState, indexName: String): String? {
if (this.isEmpty()) return null

val indexMetadata = clusterState.metadata.index(indexName)
val indexAbstraction = clusterState.metadata.indicesLookup[indexName]
@Suppress("ReturnCount", "NestedBlockDepth")
fun Map<String, List<ISMTemplate>>.findMatchingPolicy(
indexName: String,
indexCreationDate: Long,
isHiddenIndex: Boolean,
indexAbstraction: IndexAbstraction?
): String? {
val isDataStreamIndex = indexAbstraction?.parentDataStream != null

// Don't include hidden index unless it belongs to a data stream.
val isHidden = IndexMetadata.INDEX_HIDDEN_SETTING.get(indexMetadata.settings)
if (!isDataStreamIndex && isHidden) return null
if (this.isEmpty()) return null
// don't include hidden index
if (!isDataStreamIndex && isHiddenIndex) return null

// If the index belongs to a data stream, then find the matching policy using the data stream name.
val lookupName = when {
Expand All @@ -72,14 +69,19 @@ fun Map<String, ISMTemplate>.findMatchingPolicy(clusterState: ClusterState, inde
val patternMatchPredicate = { pattern: String -> Regex.simpleMatch(pattern, lookupName) }
var matchedPolicy: String? = null
var highestPriority: Int = -1
this.filter { (_, template) ->
template.lastUpdatedTime.toEpochMilli() < indexMetadata.creationDate
}.forEach { (policyID, template) ->
val matched = template.indexPatterns.stream().anyMatch(patternMatchPredicate)
if (matched && highestPriority < template.priority) {
highestPriority = template.priority
matchedPolicy = policyID
}

this.forEach { (policyID, templateList) ->
templateList.filter { it.lastUpdatedTime.toEpochMilli() < indexCreationDate }
.forEach {
if (it.indexPatterns.stream().anyMatch(patternMatchPredicate)) {
if (highestPriority < it.priority) {
highestPriority = it.priority
matchedPolicy = policyID
} else if (highestPriority == it.priority) {
log.warn("Warning: index $lookupName matches [$matchedPolicy, $policyID]")
}
}
}
}

return matchedPolicy
Expand Down Expand Up @@ -120,30 +122,61 @@ fun validateFormat(indexPatterns: List<String>): OpenSearchException? {
return null
}

fun List<ISMTemplate>.findSelfConflictingTemplates(): Pair<List<String>, List<String>>? {
val priorityToTemplates = mutableMapOf<Int, List<ISMTemplate>>()
this.forEach {
val templateList = priorityToTemplates[it.priority]
if (templateList != null) {
priorityToTemplates[it.priority] = templateList.plus(it)
} else {
priorityToTemplates[it.priority] = mutableListOf(it)
}
}
priorityToTemplates.forEach { (_, templateList) ->
// same priority
val indexPatternsList = templateList.map { it.indexPatterns }
if (indexPatternsList.size > 1) {
indexPatternsList.forEachIndexed { ind, indexPatterns ->
val comparePatterns = indexPatternsList.subList(ind + 1, indexPatternsList.size).flatten()
if (overlapping(indexPatterns, comparePatterns)) {
return indexPatterns to comparePatterns
}
}
}
}

return null
}

@Suppress("SpreadOperator")
fun overlapping(p1: List<String>, p2: List<String>): Boolean {
if (p1.isEmpty() || p2.isEmpty()) return false
val a1 = Regex.simpleMatchToAutomaton(*p1.toTypedArray())
val a2 = Regex.simpleMatchToAutomaton(*p2.toTypedArray())
return !Operations.isEmpty(Operations.intersection(a1, a2))
}

/**
* find policy templates whose index patterns overlap with given template
*
* @return map of overlapping template name to its index patterns
*/
@Suppress("SpreadOperator")
fun Map<String, ISMTemplate>.findConflictingPolicyTemplates(
fun Map<String, List<ISMTemplate>>.findConflictingPolicyTemplates(
candidate: String,
indexPatterns: List<String>,
priority: Int
): Map<String, List<String>> {
val automaton1 = Regex.simpleMatchToAutomaton(*indexPatterns.toTypedArray())
val overlappingTemplates = mutableMapOf<String, List<String>>()

// focus on template with same priority
this.filter { it.value.priority == priority }
.forEach { (policyID, template) ->
val automaton2 = Regex.simpleMatchToAutomaton(*template.indexPatterns.toTypedArray())
if (!Operations.isEmpty(Operations.intersection(automaton1, automaton2))) {
log.info("Existing ism_template for $policyID overlaps candidate $candidate")
overlappingTemplates[policyID] = template.indexPatterns
this.forEach { (policyID, templateList) ->
templateList.filter { it.priority == priority }
.map { it.indexPatterns }
.forEach {
if (overlapping(indexPatterns, it)) {
overlappingTemplates[policyID] = it
}
}
}
}
overlappingTemplates.remove(candidate)

return overlappingTemplates
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,22 @@ import org.apache.logging.log4j.LogManager
import org.opensearch.action.ActionListener
import org.opensearch.action.DocWriteRequest
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
import org.opensearch.action.admin.cluster.state.ClusterStateResponse
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
import org.opensearch.action.admin.indices.rollover.RolloverRequest
import org.opensearch.action.admin.indices.rollover.RolloverResponse
import org.opensearch.action.bulk.BulkRequest
import org.opensearch.action.bulk.BulkResponse
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.support.IndicesOptions
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.client.Client
import org.opensearch.cluster.LocalNodeMasterListener
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentType
import org.opensearch.index.IndexNotFoundException
import org.opensearch.indexmanagement.IndexManagementIndices
import org.opensearch.indexmanagement.IndexManagementPlugin
import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData
Expand All @@ -61,6 +62,7 @@ import org.opensearch.threadpool.ThreadPool
import java.time.Instant

@OpenForTesting
@Suppress("TooManyFunctions")
class IndexStateManagementHistory(
settings: Settings,
private val client: Client,
Expand Down Expand Up @@ -176,7 +178,6 @@ class IndexStateManagementHistory(

@Suppress("SpreadOperator", "NestedBlockDepth", "ComplexMethod")
private fun deleteOldHistoryIndex() {
val indexToDelete = mutableListOf<String>()

val clusterStateRequest = ClusterStateRequest()
.clear()
Expand All @@ -185,8 +186,28 @@ class IndexStateManagementHistory(
.local(true)
.indicesOptions(IndicesOptions.strictExpand())

val clusterStateResponse = client.admin().cluster().state(clusterStateRequest).actionGet()
client.admin().cluster().state(
clusterStateRequest,
object : ActionListener<ClusterStateResponse> {
override fun onResponse(clusterStateResponse: ClusterStateResponse) {
if (!clusterStateResponse.state.metadata.indices.isEmpty) {
val indicesToDelete = getIndicesToDelete(clusterStateResponse)
logger.info("Deleting old history indices viz $indicesToDelete")
deleteAllOldHistoryIndices(indicesToDelete)
} else {
logger.info("No Old History Indices to delete")
}
}

override fun onFailure(exception: Exception) {
logger.error("Error fetching cluster state ${exception.message}")
}
}
)
}

private fun getIndicesToDelete(clusterStateResponse: ClusterStateResponse): List<String> {
var indicesToDelete = mutableListOf<String>()
for (entry in clusterStateResponse.state.metadata.indices()) {
val indexMetaData = entry.value
val creationTime = indexMetaData.creationDate
Expand All @@ -198,27 +219,51 @@ class IndexStateManagementHistory(
continue
}

indexToDelete.add(indexMetaData.index.name)
indicesToDelete.add(indexMetaData.index.name)
}
}
return indicesToDelete
}

@Suppress("SpreadOperator")
private fun deleteAllOldHistoryIndices(indicesToDelete: List<String>) {
if (indicesToDelete.isNotEmpty()) {
val deleteRequest = DeleteIndexRequest(*indicesToDelete.toTypedArray())
client.admin().indices().delete(
deleteRequest,
object : ActionListener<AcknowledgedResponse> {
override fun onResponse(deleteIndicesResponse: AcknowledgedResponse) {
if (!deleteIndicesResponse.isAcknowledged) {
logger.error("could not delete one or more ISM history index. $indicesToDelete. Retrying one by one.")
deleteOldHistoryIndex(indicesToDelete)
}
}
override fun onFailure(exception: Exception) {
logger.error("Error deleting old history indices ${exception.message}")
deleteOldHistoryIndex(indicesToDelete)
}
}
)
}
}

if (indexToDelete.isNotEmpty()) {
val deleteRequest = DeleteIndexRequest(*indexToDelete.toTypedArray())
val deleteResponse = client.admin().indices().delete(deleteRequest).actionGet()
if (!deleteResponse.isAcknowledged) {
logger.error("could not delete one or more ISM history index. $indexToDelete. Retrying one by one.")
for (index in indexToDelete) {
try {
val singleDeleteRequest = DeleteIndexRequest(*indexToDelete.toTypedArray())
val singleDeleteResponse = client.admin().indices().delete(singleDeleteRequest).actionGet()
@Suppress("SpreadOperator")
private fun deleteOldHistoryIndex(indicesToDelete: List<String>) {
for (index in indicesToDelete) {
val singleDeleteRequest = DeleteIndexRequest(*indicesToDelete.toTypedArray())
client.admin().indices().delete(
singleDeleteRequest,
object : ActionListener<AcknowledgedResponse> {
override fun onResponse(singleDeleteResponse: AcknowledgedResponse) {
if (!singleDeleteResponse.isAcknowledged) {
logger.error("could not delete one or more ISM history index. $index.")
}
} catch (e: IndexNotFoundException) {
logger.debug("$index was already deleted. ${e.message}")
}
override fun onFailure(exception: Exception) {
logger.debug("Exception ${exception.message} while deleting the index $index")
}
}
}
)
}
}

Expand Down
Loading