Skip to content

Commit

Permalink
Removing Usages of Action Get Call and using listeners (opensearch-pr…
Browse files Browse the repository at this point in the history
…oject#100)

Signed-off-by: Aditya Jindal <aditjind@amazon.com>
  • Loading branch information
adityaj1107 authored Jul 22, 2021
1 parent d83d1e8 commit c9787cd
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 20 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[![Test and Build Workflow](https://github.com/opensearch-project/index-management/workflows/Test%20and%20Build%20Workflow/badge.svg)](https://github.com/opensearch-project/index-management/actions)
[![codecov](https://codecov.io/gh/opensearch-project/index-management/branch/main/graph/badge.svg)](https://codecov.io/gh/opensearch-project/index-management)
[![Documentation](https://img.shields.io/badge/api-reference-blue.svg)](https://docs-beta.opensearch.org/im-plugin/index/)
[![Documentation](https://img.shields.io/badge/api-reference-blue.svg)](https://opensearch.org/docs/im-plugin/index/)
[![Chat](https://img.shields.io/badge/chat-on%20forums-blue)](https://discuss.opendistrocommunity.dev/c/index-management/)
![PRs welcome!](https://img.shields.io/badge/PRs-welcome!-success)

Expand Down Expand Up @@ -58,7 +58,7 @@ See [developer guide](DEVELOPER_GUIDE.md) and [how to contribute to this project

If you find a bug, or have a feature request, please don't hesitate to open an issue in this repository.

For more information, see [project website](https://opensearch.org/) and [documentation](https://docs-beta.opensearch.org/). If you need help and are unsure where to open an issue, try [forums](https://discuss.opendistrocommunity.dev/).
For more information, see [project website](https://opensearch.org/) and [documentation](https://opensearch.org/docs/). If you need help and are unsure where to open an issue, try [forums](https://discuss.opendistrocommunity.dev/).

## Code of Conduct

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,22 @@ import org.apache.logging.log4j.LogManager
import org.opensearch.action.ActionListener
import org.opensearch.action.DocWriteRequest
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
import org.opensearch.action.admin.cluster.state.ClusterStateResponse
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
import org.opensearch.action.admin.indices.rollover.RolloverRequest
import org.opensearch.action.admin.indices.rollover.RolloverResponse
import org.opensearch.action.bulk.BulkRequest
import org.opensearch.action.bulk.BulkResponse
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.support.IndicesOptions
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.client.Client
import org.opensearch.cluster.LocalNodeMasterListener
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentType
import org.opensearch.index.IndexNotFoundException
import org.opensearch.indexmanagement.IndexManagementIndices
import org.opensearch.indexmanagement.IndexManagementPlugin
import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData
Expand All @@ -61,6 +62,7 @@ import org.opensearch.threadpool.ThreadPool
import java.time.Instant

@OpenForTesting
@Suppress("TooManyFunctions")
class IndexStateManagementHistory(
settings: Settings,
private val client: Client,
Expand Down Expand Up @@ -176,7 +178,6 @@ class IndexStateManagementHistory(

@Suppress("SpreadOperator", "NestedBlockDepth", "ComplexMethod")
private fun deleteOldHistoryIndex() {
val indexToDelete = mutableListOf<String>()

val clusterStateRequest = ClusterStateRequest()
.clear()
Expand All @@ -185,8 +186,28 @@ class IndexStateManagementHistory(
.local(true)
.indicesOptions(IndicesOptions.strictExpand())

val clusterStateResponse = client.admin().cluster().state(clusterStateRequest).actionGet()
client.admin().cluster().state(
clusterStateRequest,
object : ActionListener<ClusterStateResponse> {
override fun onResponse(clusterStateResponse: ClusterStateResponse) {
if (!clusterStateResponse.state.metadata.indices.isEmpty) {
val indicesToDelete = getIndicesToDelete(clusterStateResponse)
logger.info("Deleting old history indices viz $indicesToDelete")
deleteAllOldHistoryIndices(indicesToDelete)
} else {
logger.info("No Old History Indices to delete")
}
}

override fun onFailure(exception: Exception) {
logger.error("Error fetching cluster state ${exception.message}")
}
}
)
}

private fun getIndicesToDelete(clusterStateResponse: ClusterStateResponse): List<String> {
var indicesToDelete = mutableListOf<String>()
for (entry in clusterStateResponse.state.metadata.indices()) {
val indexMetaData = entry.value
val creationTime = indexMetaData.creationDate
Expand All @@ -198,27 +219,51 @@ class IndexStateManagementHistory(
continue
}

indexToDelete.add(indexMetaData.index.name)
indicesToDelete.add(indexMetaData.index.name)
}
}
return indicesToDelete
}

@Suppress("SpreadOperator")
private fun deleteAllOldHistoryIndices(indicesToDelete: List<String>) {
if (indicesToDelete.isNotEmpty()) {
val deleteRequest = DeleteIndexRequest(*indicesToDelete.toTypedArray())
client.admin().indices().delete(
deleteRequest,
object : ActionListener<AcknowledgedResponse> {
override fun onResponse(deleteIndicesResponse: AcknowledgedResponse) {
if (!deleteIndicesResponse.isAcknowledged) {
logger.error("could not delete one or more ISM history index. $indicesToDelete. Retrying one by one.")
deleteOldHistoryIndex(indicesToDelete)
}
}
override fun onFailure(exception: Exception) {
logger.error("Error deleting old history indices ${exception.message}")
deleteOldHistoryIndex(indicesToDelete)
}
}
)
}
}

if (indexToDelete.isNotEmpty()) {
val deleteRequest = DeleteIndexRequest(*indexToDelete.toTypedArray())
val deleteResponse = client.admin().indices().delete(deleteRequest).actionGet()
if (!deleteResponse.isAcknowledged) {
logger.error("could not delete one or more ISM history index. $indexToDelete. Retrying one by one.")
for (index in indexToDelete) {
try {
val singleDeleteRequest = DeleteIndexRequest(*indexToDelete.toTypedArray())
val singleDeleteResponse = client.admin().indices().delete(singleDeleteRequest).actionGet()
@Suppress("SpreadOperator")
private fun deleteOldHistoryIndex(indicesToDelete: List<String>) {
for (index in indicesToDelete) {
val singleDeleteRequest = DeleteIndexRequest(*indicesToDelete.toTypedArray())
client.admin().indices().delete(
singleDeleteRequest,
object : ActionListener<AcknowledgedResponse> {
override fun onResponse(singleDeleteResponse: AcknowledgedResponse) {
if (!singleDeleteResponse.isAcknowledged) {
logger.error("could not delete one or more ISM history index. $index.")
}
} catch (e: IndexNotFoundException) {
logger.debug("$index was already deleted. ${e.message}")
}
override fun onFailure(exception: Exception) {
logger.debug("Exception ${exception.message} while deleting the index $index")
}
}
}
)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class AttemptRolloverStep(

override fun isIdempotent() = false

@Suppress("TooGenericExceptionCaught")
@Suppress("ComplexMethod", "LongMethod", "TooGenericExceptionCaught")
override suspend fun execute(): AttemptRolloverStep {
val skipRollover = clusterService.state().metadata.index(indexName).getRolloverSkip()
if (skipRollover) {
Expand Down Expand Up @@ -278,6 +278,7 @@ class AttemptRolloverStep(
)
}

@Suppress("TooManyFunctions")
companion object {
fun getFailedMessage(index: String) = "Failed to rollover index [index=$index]"
fun getFailedAliasUpdateMessage(index: String, newIndex: String) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class TransportGetTransformAction @Inject constructor(
GetTransformAction.NAME, transportService, actionFilters, ::GetTransformRequest
) {

@Suppress("ReturnCount")
override fun doExecute(task: Task, request: GetTransformRequest, listener: ActionListener<GetTransformResponse>) {
val getRequest = GetRequest(INDEX_MANAGEMENT_INDEX, request.id)
.fetchSourceContext(request.srcContext).preference(request.preference)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ class TransportIndexTransformAction @Inject constructor(
return modified.toList()
}

@Suppress("SpreadOperator")
private fun putTransform() {
val transform = request.transform.copy(schemaVersion = IndexUtils.indexManagementConfigSchemaVersion)
request.index(INDEX_MANAGEMENT_INDEX)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class TransportPreviewTransformAction @Inject constructor(
PreviewTransformAction.NAME, transportService, actionFilters, ::PreviewTransformRequest
) {

@Suppress("SpreadOperator")
override fun doExecute(task: Task, request: PreviewTransformRequest, listener: ActionListener<PreviewTransformResponse>) {
val transform = request.transform

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ data class Transform(
const val TRANSFORM_DOC_ID_FIELD = "$TRANSFORM_TYPE._id"
const val TRANSFORM_DOC_COUNT_FIELD = "$TRANSFORM_TYPE._doc_count"

@Suppress("LongMethod")
@Suppress("ComplexMethod", "LongMethod")
@JvmStatic
@JvmOverloads
fun parse(
Expand Down

0 comments on commit c9787cd

Please sign in to comment.