Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.client.node.NodeClient
import org.opensearch.cluster.ClusterState
import org.opensearch.cluster.block.ClusterBlockException
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.inject.Inject
import org.opensearch.common.settings.Settings
Expand All @@ -64,7 +65,6 @@ import org.opensearch.indexmanagement.indexstatemanagement.util.FailedIndex
import org.opensearch.indexmanagement.indexstatemanagement.util.managedIndexConfigIndexRequest
import org.opensearch.indexmanagement.opensearchapi.parseFromGetResponse
import org.opensearch.indexmanagement.settings.IndexManagementSettings
import org.opensearch.indexmanagement.util.IndexManagementException
import org.opensearch.indexmanagement.util.IndexUtils
import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser
import org.opensearch.indexmanagement.util.SecurityUtils.Companion.userHasPermissionForResource
Expand All @@ -73,19 +73,21 @@ import org.opensearch.rest.RestStatus
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
import java.lang.Exception
import java.lang.IllegalArgumentException
import java.time.Duration
import java.time.Instant

private val log = LogManager.getLogger(TransportAddPolicyAction::class.java)

@Suppress("SpreadOperator")
@Suppress("SpreadOperator", "ReturnCount")
class TransportAddPolicyAction @Inject constructor(
val client: NodeClient,
transportService: TransportService,
actionFilters: ActionFilters,
val settings: Settings,
val clusterService: ClusterService,
val xContentRegistry: NamedXContentRegistry
val xContentRegistry: NamedXContentRegistry,
val indexNameExpressionResolver: IndexNameExpressionResolver
) : HandledTransportAction<AddPolicyRequest, ISMStatusResponse>(
AddPolicyAction.NAME, transportService, actionFilters, ::AddPolicyRequest
) {
Expand Down Expand Up @@ -114,47 +116,80 @@ class TransportAddPolicyAction @Inject constructor(
) {
private lateinit var startTime: Instant
private lateinit var policy: Policy
private val resolvedIndices = mutableListOf<String>()
private val indicesToAdd = mutableMapOf<String, String>() // uuid: name
private val failedIndices: MutableList<FailedIndex> = mutableListOf()

fun start() {
if (!validateUserConfiguration(user, filterByEnabled, actionListener)) {
return
}
val requestedIndices = mutableListOf<String>()
request.indices.forEach { index ->
requestedIndices.addAll(
indexNameExpressionResolver.concreteIndexNames(
clusterService.state(),
IndicesOptions.lenientExpand(),
true,
index
)
)
}
if (requestedIndices.isEmpty()) {
// Nothing to do will ignore since found no matching indices
actionListener.onResponse(ISMStatusResponse(0, failedIndices))
return
}
if (user == null) {
resolvedIndices.addAll(requestedIndices)
getPolicy()
} else {
validateAndGetPolicy()
validateAndGetPolicy(0, requestedIndices)
}
}

private fun validateAndGetPolicy() {
val request = ManagedIndexRequest().indices(*request.indices.toTypedArray())
/**
* We filter the requested indices to the indices user has permission to manage and apply policies only on top of those
*/
private fun validateAndGetPolicy(current: Int, indices: List<String>) {
val request = ManagedIndexRequest().indices(indices[current])
client.execute(
ManagedIndexAction.INSTANCE,
request,
object : ActionListener<AcknowledgedResponse> {
override fun onResponse(response: AcknowledgedResponse) {
getPolicy()
resolvedIndices.add(indices[current])
proceed(current, indices)
}

override fun onFailure(e: Exception) {
actionListener.onFailure(
IndexManagementException.wrap(
when (e is OpenSearchSecurityException) {
true -> OpenSearchStatusException(
"User doesn't have required index permissions on one or more requested indices: ${e.localizedMessage}",
RestStatus.FORBIDDEN
)
false -> e
}
)
)
when (e is OpenSearchSecurityException) {
true -> {
proceed(current, indices)
}
false -> {
// failing the request for any other exception
actionListener.onFailure(e)
}
}
}
}
)
}

private fun proceed(current: Int, indices: List<String>) {
if (current < indices.count() - 1) {
validateAndGetPolicy(current + 1, indices)
} else {
// sanity check that there are indices - if none then return
if (resolvedIndices.isEmpty()) {
actionListener.onResponse(ISMStatusResponse(0, failedIndices))
return
}
getPolicy()
}
}

private fun getPolicy() {
val getRequest = GetRequest(INDEX_MANAGEMENT_INDEX, request.policyID)

Expand All @@ -171,7 +206,12 @@ class TransportAddPolicyAction @Inject constructor(
actionListener.onFailure(OpenSearchStatusException("Could not find policy=${request.policyID}", RestStatus.NOT_FOUND))
return
}
this.policy = parseFromGetResponse(response, xContentRegistry, Policy.Companion::parse)
try {
this.policy = parseFromGetResponse(response, xContentRegistry, Policy.Companion::parse)
} catch (e: IllegalArgumentException) {
actionListener.onFailure(OpenSearchStatusException("Could not find policy=${request.policyID}", RestStatus.NOT_FOUND))
return
}
if (!userHasPermissionForResource(user, policy.user, filterByEnabled, "policy", request.policyID, actionListener)) {
return
}
Expand Down Expand Up @@ -205,7 +245,7 @@ class TransportAddPolicyAction @Inject constructor(

val clusterStateRequest = ClusterStateRequest()
.clear()
.indices(*request.indices.toTypedArray())
.indices(*resolvedIndices.toTypedArray())
.metadata(true)
.local(false)
.waitForTimeout(TimeValue.timeValueMillis(ADD_POLICY_TIMEOUT_IN_MILLIS))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ import org.opensearch.rest.RestStatus
import org.opensearch.search.fetch.subphase.FetchSourceContext
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
import java.lang.IllegalArgumentException

private val log = LogManager.getLogger(TransportChangePolicyAction::class.java)

Expand Down Expand Up @@ -173,7 +174,12 @@ class TransportChangePolicyAction @Inject constructor(
actionListener.onFailure(OpenSearchStatusException("Could not find policy=${request.changePolicy.policyID}", RestStatus.NOT_FOUND))
return
}
policy = parseFromGetResponse(response, xContentRegistry, Policy.Companion::parse)
try {
policy = parseFromGetResponse(response, xContentRegistry, Policy.Companion::parse)
} catch (e: IllegalArgumentException) {
actionListener.onFailure(OpenSearchStatusException("Could not find policy=${request.changePolicy.policyID}", RestStatus.NOT_FOUND))
return
}
if (!userHasPermissionForResource(user, policy.user, filterByEnabled, "policy", request.changePolicy.policyID, actionListener)) {
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ import org.opensearch.indexmanagement.util.SecurityUtils.Companion.userHasPermis
import org.opensearch.rest.RestStatus
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
import java.lang.IllegalArgumentException

@Suppress("ReturnCount")
class TransportDeletePolicyAction @Inject constructor(
val client: NodeClient,
transportService: TransportService,
Expand Down Expand Up @@ -84,12 +86,7 @@ class TransportDeletePolicyAction @Inject constructor(

fun start() {
client.threadPool().threadContext.stashContext().use {
if (user == null || !filterByEnabled) {
// Security is disabled or filter by is disabled
delete()
} else {
getPolicy()
}
getPolicy()
}
}

Expand All @@ -104,7 +101,13 @@ class TransportDeletePolicyAction @Inject constructor(
return
}

val policy: Policy = parseFromGetResponse(response, xContentRegistry, Policy.Companion::parse)
val policy: Policy?
try {
policy = parseFromGetResponse(response, xContentRegistry, Policy.Companion::parse)
} catch (e: IllegalArgumentException) {
actionListener.onFailure(OpenSearchStatusException("Policy ${request.policyID} is not found", RestStatus.NOT_FOUND))
return
}
if (!userHasPermissionForResource(user, policy.user, filterByEnabled, "policy", request.policyID, actionListener)) {
return
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ package org.opensearch.indexmanagement.indexstatemanagement.transport.action.exp
import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.OpenSearchSecurityException
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.ActionListener
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
import org.opensearch.action.admin.cluster.state.ClusterStateResponse
Expand Down Expand Up @@ -63,9 +62,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.transport.action.mana
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.managedIndex.ManagedIndexRequest
import org.opensearch.indexmanagement.indexstatemanagement.util.isMetadataMoved
import org.opensearch.indexmanagement.indexstatemanagement.util.managedIndexMetadataID
import org.opensearch.indexmanagement.util.IndexManagementException
import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser
import org.opensearch.rest.RestStatus
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.fetch.subphase.FetchSourceContext.FETCH_SOURCE
import org.opensearch.search.sort.SortBuilders
Expand Down Expand Up @@ -312,47 +309,78 @@ class TransportExplainAction @Inject constructor(
}
managedIndicesMetaDataMap.clear()

if (user == null) {
if (user == null || indexNames.isEmpty()) {
sendResponse()
} else {
validateAndSendResponse(threadContext)
filterAndSendResponse(threadContext)
}
}

private fun validateAndSendResponse(threadContext: ThreadContext.StoredContext) {
private fun filterAndSendResponse(threadContext: ThreadContext.StoredContext) {
threadContext.restore()
val request = ManagedIndexRequest().indices(*indexNames.toTypedArray())
val filteredIndices = mutableListOf<String>()
val filteredMetadata = mutableListOf<ManagedIndexMetaData?>()
val filteredPolicies = mutableListOf<String?>()
val enabledStatus = mutableMapOf<String, Boolean>()
filter(0, filteredIndices, filteredMetadata, filteredPolicies, enabledStatus)
}

private fun filter(
current: Int,
filteredIndices: MutableList<String>,
filteredMetadata: MutableList<ManagedIndexMetaData?>,
filteredPolicies: MutableList<String?>,
enabledStatus: MutableMap<String, Boolean>
) {
val request = ManagedIndexRequest().indices(indexNames[current])
client.execute(
ManagedIndexAction.INSTANCE,
request,
object : ActionListener<AcknowledgedResponse> {
override fun onResponse(response: AcknowledgedResponse) {
sendResponse()
filteredIndices.add(indexNames[current])
filteredMetadata.add(indexMetadatas[current])
filteredPolicies.add(indexPolicyIDs[current])
enabledStatus[indexNames[current]] = enabledState.getOrDefault(indexNames[current], false)
if (current < indexNames.count() - 1) {
// do nothing - skip the index and go to next one
filter(current + 1, filteredIndices, filteredMetadata, filteredPolicies, enabledStatus)
} else {
sendResponse(filteredIndices, filteredMetadata, filteredPolicies, enabledStatus)
}
}

override fun onFailure(e: java.lang.Exception) {
actionListener.onFailure(
IndexManagementException.wrap(
when (e is OpenSearchSecurityException) {
true -> OpenSearchStatusException(
"User doesn't have required index permissions on one or more requested indices: ${e.localizedMessage}",
RestStatus.FORBIDDEN
)
false -> e
override fun onFailure(e: Exception) {
when (e is OpenSearchSecurityException) {
true -> {
if (current < indexNames.count() - 1) {
// do nothing - skip the index and go to next one
filter(current + 1, filteredIndices, filteredMetadata, filteredPolicies, enabledStatus)
} else {
sendResponse(filteredIndices, filteredMetadata, filteredPolicies, enabledStatus)
}
)
)
}
false -> {
actionListener.onFailure(e)
}
}
}
}
)
}

private fun sendResponse() {
private fun sendResponse(
indices: List<String> = indexNames,
metadata: List<ManagedIndexMetaData?> = indexMetadatas,
policies: List<String?> = indexPolicyIDs,
enabledStatus: Map<String, Boolean> = enabledState,
totalIndices: Int = totalManagedIndices
) {
if (explainAll) {
actionListener.onResponse(ExplainAllResponse(indexNames, indexPolicyIDs, indexMetadatas, totalManagedIndices, enabledState))
actionListener.onResponse(ExplainAllResponse(indices, policies, metadata, totalIndices, enabledStatus))
return
}
actionListener.onResponse(ExplainResponse(indexNames, indexPolicyIDs, indexMetadatas))
actionListener.onResponse(ExplainResponse(indices, policies, metadata))
}

private fun getMetadata(response: GetResponse?): ManagedIndexMetaData? {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ import org.opensearch.indexmanagement.util.SecurityUtils.Companion.userHasPermis
import org.opensearch.rest.RestStatus
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
import java.lang.IllegalArgumentException

@Suppress("ReturnCount")
class TransportGetPolicyAction @Inject constructor(
val client: NodeClient,
transportService: TransportService,
Expand Down Expand Up @@ -104,7 +106,13 @@ class TransportGetPolicyAction @Inject constructor(
return
}

val policy: Policy = parseFromGetResponse(response, xContentRegistry, Policy.Companion::parse)
val policy: Policy?
try {
policy = parseFromGetResponse(response, xContentRegistry, Policy.Companion::parse)
} catch (e: IllegalArgumentException) {
actionListener.onFailure(OpenSearchStatusException("Policy not found", RestStatus.NOT_FOUND))
return
}
if (!userHasPermissionForResource(user, policy.user, filterByEnabled, "policy", request.policyID, actionListener)) {
return
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
import java.lang.Exception

@Suppress("ReturnCount")
class TransportDeleteRollupAction @Inject constructor(
transportService: TransportService,
val client: Client,
Expand Down Expand Up @@ -84,12 +85,7 @@ class TransportDeleteRollupAction @Inject constructor(

fun start() {
client.threadPool().threadContext.stashContext().use {
if (!filterByEnabled || user == null) {
// security is disabled or filter by is disabled
delete()
} else {
getRollup()
}
getRollup()
}
}

Expand All @@ -104,8 +100,14 @@ class TransportDeleteRollupAction @Inject constructor(
return
}

val rollup: Rollup = parseRollup(response, xContentRegistry)
if (!userHasPermissionForResource(user, rollup.user, filterByEnabled, "rollup", request.id(), actionListener)) {
val rollup: Rollup?
try {
rollup = parseRollup(response, xContentRegistry)
} catch (e: IllegalArgumentException) {
actionListener.onFailure(OpenSearchStatusException("Rollup ${request.id()} is not found", RestStatus.NOT_FOUND))
return
}
if (!userHasPermissionForResource(user, rollup.user, filterByEnabled, "rollup", rollup.id, actionListener)) {
return
} else {
delete()
Expand Down
Loading