Skip to content

Commit 11a1f73

Browse files
authored
Security improvements (opensearch-project#126)
Signed-off-by: Ravi Thaluru <ravi1092@gmail.com>
1 parent 88cea4a commit 11a1f73

File tree

15 files changed

+307
-144
lines changed

15 files changed

+307
-144
lines changed

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/TransportAddPolicyAction.kt

Lines changed: 60 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import org.opensearch.action.support.master.AcknowledgedResponse
4747
import org.opensearch.client.node.NodeClient
4848
import org.opensearch.cluster.ClusterState
4949
import org.opensearch.cluster.block.ClusterBlockException
50+
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
5051
import org.opensearch.cluster.service.ClusterService
5152
import org.opensearch.common.inject.Inject
5253
import org.opensearch.common.settings.Settings
@@ -64,7 +65,6 @@ import org.opensearch.indexmanagement.indexstatemanagement.util.FailedIndex
6465
import org.opensearch.indexmanagement.indexstatemanagement.util.managedIndexConfigIndexRequest
6566
import org.opensearch.indexmanagement.opensearchapi.parseFromGetResponse
6667
import org.opensearch.indexmanagement.settings.IndexManagementSettings
67-
import org.opensearch.indexmanagement.util.IndexManagementException
6868
import org.opensearch.indexmanagement.util.IndexUtils
6969
import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser
7070
import org.opensearch.indexmanagement.util.SecurityUtils.Companion.userHasPermissionForResource
@@ -73,19 +73,21 @@ import org.opensearch.rest.RestStatus
7373
import org.opensearch.tasks.Task
7474
import org.opensearch.transport.TransportService
7575
import java.lang.Exception
76+
import java.lang.IllegalArgumentException
7677
import java.time.Duration
7778
import java.time.Instant
7879

7980
private val log = LogManager.getLogger(TransportAddPolicyAction::class.java)
8081

81-
@Suppress("SpreadOperator")
82+
@Suppress("SpreadOperator", "ReturnCount")
8283
class TransportAddPolicyAction @Inject constructor(
8384
val client: NodeClient,
8485
transportService: TransportService,
8586
actionFilters: ActionFilters,
8687
val settings: Settings,
8788
val clusterService: ClusterService,
88-
val xContentRegistry: NamedXContentRegistry
89+
val xContentRegistry: NamedXContentRegistry,
90+
val indexNameExpressionResolver: IndexNameExpressionResolver
8991
) : HandledTransportAction<AddPolicyRequest, ISMStatusResponse>(
9092
AddPolicyAction.NAME, transportService, actionFilters, ::AddPolicyRequest
9193
) {
@@ -114,47 +116,80 @@ class TransportAddPolicyAction @Inject constructor(
114116
) {
115117
private lateinit var startTime: Instant
116118
private lateinit var policy: Policy
119+
private val resolvedIndices = mutableListOf<String>()
117120
private val indicesToAdd = mutableMapOf<String, String>() // uuid: name
118121
private val failedIndices: MutableList<FailedIndex> = mutableListOf()
119122

120123
fun start() {
121124
if (!validateUserConfiguration(user, filterByEnabled, actionListener)) {
122125
return
123126
}
127+
val requestedIndices = mutableListOf<String>()
128+
request.indices.forEach { index ->
129+
requestedIndices.addAll(
130+
indexNameExpressionResolver.concreteIndexNames(
131+
clusterService.state(),
132+
IndicesOptions.lenientExpand(),
133+
true,
134+
index
135+
)
136+
)
137+
}
138+
if (requestedIndices.isEmpty()) {
139+
// Nothing to do will ignore since found no matching indices
140+
actionListener.onResponse(ISMStatusResponse(0, failedIndices))
141+
return
142+
}
124143
if (user == null) {
144+
resolvedIndices.addAll(requestedIndices)
125145
getPolicy()
126146
} else {
127-
validateAndGetPolicy()
147+
validateAndGetPolicy(0, requestedIndices)
128148
}
129149
}
130150

131-
private fun validateAndGetPolicy() {
132-
val request = ManagedIndexRequest().indices(*request.indices.toTypedArray())
151+
/**
152+
* We filter the requested indices to the indices user has permission to manage and apply policies only on top of those
153+
*/
154+
private fun validateAndGetPolicy(current: Int, indices: List<String>) {
155+
val request = ManagedIndexRequest().indices(indices[current])
133156
client.execute(
134157
ManagedIndexAction.INSTANCE,
135158
request,
136159
object : ActionListener<AcknowledgedResponse> {
137160
override fun onResponse(response: AcknowledgedResponse) {
138-
getPolicy()
161+
resolvedIndices.add(indices[current])
162+
proceed(current, indices)
139163
}
140164

141165
override fun onFailure(e: Exception) {
142-
actionListener.onFailure(
143-
IndexManagementException.wrap(
144-
when (e is OpenSearchSecurityException) {
145-
true -> OpenSearchStatusException(
146-
"User doesn't have required index permissions on one or more requested indices: ${e.localizedMessage}",
147-
RestStatus.FORBIDDEN
148-
)
149-
false -> e
150-
}
151-
)
152-
)
166+
when (e is OpenSearchSecurityException) {
167+
true -> {
168+
proceed(current, indices)
169+
}
170+
false -> {
171+
// failing the request for any other exception
172+
actionListener.onFailure(e)
173+
}
174+
}
153175
}
154176
}
155177
)
156178
}
157179

180+
private fun proceed(current: Int, indices: List<String>) {
181+
if (current < indices.count() - 1) {
182+
validateAndGetPolicy(current + 1, indices)
183+
} else {
184+
// sanity check that there are indices - if none then return
185+
if (resolvedIndices.isEmpty()) {
186+
actionListener.onResponse(ISMStatusResponse(0, failedIndices))
187+
return
188+
}
189+
getPolicy()
190+
}
191+
}
192+
158193
private fun getPolicy() {
159194
val getRequest = GetRequest(INDEX_MANAGEMENT_INDEX, request.policyID)
160195

@@ -171,7 +206,12 @@ class TransportAddPolicyAction @Inject constructor(
171206
actionListener.onFailure(OpenSearchStatusException("Could not find policy=${request.policyID}", RestStatus.NOT_FOUND))
172207
return
173208
}
174-
this.policy = parseFromGetResponse(response, xContentRegistry, Policy.Companion::parse)
209+
try {
210+
this.policy = parseFromGetResponse(response, xContentRegistry, Policy.Companion::parse)
211+
} catch (e: IllegalArgumentException) {
212+
actionListener.onFailure(OpenSearchStatusException("Could not find policy=${request.policyID}", RestStatus.NOT_FOUND))
213+
return
214+
}
175215
if (!userHasPermissionForResource(user, policy.user, filterByEnabled, "policy", request.policyID, actionListener)) {
176216
return
177217
}
@@ -205,7 +245,7 @@ class TransportAddPolicyAction @Inject constructor(
205245

206246
val clusterStateRequest = ClusterStateRequest()
207247
.clear()
208-
.indices(*request.indices.toTypedArray())
248+
.indices(*resolvedIndices.toTypedArray())
209249
.metadata(true)
210250
.local(false)
211251
.waitForTimeout(TimeValue.timeValueMillis(ADD_POLICY_TIMEOUT_IN_MILLIS))

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ import org.opensearch.rest.RestStatus
8080
import org.opensearch.search.fetch.subphase.FetchSourceContext
8181
import org.opensearch.tasks.Task
8282
import org.opensearch.transport.TransportService
83+
import java.lang.IllegalArgumentException
8384

8485
private val log = LogManager.getLogger(TransportChangePolicyAction::class.java)
8586

@@ -173,7 +174,12 @@ class TransportChangePolicyAction @Inject constructor(
173174
actionListener.onFailure(OpenSearchStatusException("Could not find policy=${request.changePolicy.policyID}", RestStatus.NOT_FOUND))
174175
return
175176
}
176-
policy = parseFromGetResponse(response, xContentRegistry, Policy.Companion::parse)
177+
try {
178+
policy = parseFromGetResponse(response, xContentRegistry, Policy.Companion::parse)
179+
} catch (e: IllegalArgumentException) {
180+
actionListener.onFailure(OpenSearchStatusException("Could not find policy=${request.changePolicy.policyID}", RestStatus.NOT_FOUND))
181+
return
182+
}
177183
if (!userHasPermissionForResource(user, policy.user, filterByEnabled, "policy", request.changePolicy.policyID, actionListener)) {
178184
return
179185
}

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/deletepolicy/TransportDeletePolicyAction.kt

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,9 @@ import org.opensearch.indexmanagement.util.SecurityUtils.Companion.userHasPermis
5151
import org.opensearch.rest.RestStatus
5252
import org.opensearch.tasks.Task
5353
import org.opensearch.transport.TransportService
54+
import java.lang.IllegalArgumentException
5455

56+
@Suppress("ReturnCount")
5557
class TransportDeletePolicyAction @Inject constructor(
5658
val client: NodeClient,
5759
transportService: TransportService,
@@ -84,12 +86,7 @@ class TransportDeletePolicyAction @Inject constructor(
8486

8587
fun start() {
8688
client.threadPool().threadContext.stashContext().use {
87-
if (user == null || !filterByEnabled) {
88-
// Security is disabled or filter by is disabled
89-
delete()
90-
} else {
91-
getPolicy()
92-
}
89+
getPolicy()
9390
}
9491
}
9592

@@ -104,7 +101,13 @@ class TransportDeletePolicyAction @Inject constructor(
104101
return
105102
}
106103

107-
val policy: Policy = parseFromGetResponse(response, xContentRegistry, Policy.Companion::parse)
104+
val policy: Policy?
105+
try {
106+
policy = parseFromGetResponse(response, xContentRegistry, Policy.Companion::parse)
107+
} catch (e: IllegalArgumentException) {
108+
actionListener.onFailure(OpenSearchStatusException("Policy ${request.policyID} is not found", RestStatus.NOT_FOUND))
109+
return
110+
}
108111
if (!userHasPermissionForResource(user, policy.user, filterByEnabled, "policy", request.policyID, actionListener)) {
109112
return
110113
} else {

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt

Lines changed: 50 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ package org.opensearch.indexmanagement.indexstatemanagement.transport.action.exp
2929
import org.apache.logging.log4j.LogManager
3030
import org.opensearch.ExceptionsHelper
3131
import org.opensearch.OpenSearchSecurityException
32-
import org.opensearch.OpenSearchStatusException
3332
import org.opensearch.action.ActionListener
3433
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
3534
import org.opensearch.action.admin.cluster.state.ClusterStateResponse
@@ -63,9 +62,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.transport.action.mana
6362
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.managedIndex.ManagedIndexRequest
6463
import org.opensearch.indexmanagement.indexstatemanagement.util.isMetadataMoved
6564
import org.opensearch.indexmanagement.indexstatemanagement.util.managedIndexMetadataID
66-
import org.opensearch.indexmanagement.util.IndexManagementException
6765
import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser
68-
import org.opensearch.rest.RestStatus
6966
import org.opensearch.search.builder.SearchSourceBuilder
7067
import org.opensearch.search.fetch.subphase.FetchSourceContext.FETCH_SOURCE
7168
import org.opensearch.search.sort.SortBuilders
@@ -312,47 +309,78 @@ class TransportExplainAction @Inject constructor(
312309
}
313310
managedIndicesMetaDataMap.clear()
314311

315-
if (user == null) {
312+
if (user == null || indexNames.isEmpty()) {
316313
sendResponse()
317314
} else {
318-
validateAndSendResponse(threadContext)
315+
filterAndSendResponse(threadContext)
319316
}
320317
}
321318

322-
private fun validateAndSendResponse(threadContext: ThreadContext.StoredContext) {
319+
private fun filterAndSendResponse(threadContext: ThreadContext.StoredContext) {
323320
threadContext.restore()
324-
val request = ManagedIndexRequest().indices(*indexNames.toTypedArray())
321+
val filteredIndices = mutableListOf<String>()
322+
val filteredMetadata = mutableListOf<ManagedIndexMetaData?>()
323+
val filteredPolicies = mutableListOf<String?>()
324+
val enabledStatus = mutableMapOf<String, Boolean>()
325+
filter(0, filteredIndices, filteredMetadata, filteredPolicies, enabledStatus)
326+
}
327+
328+
private fun filter(
329+
current: Int,
330+
filteredIndices: MutableList<String>,
331+
filteredMetadata: MutableList<ManagedIndexMetaData?>,
332+
filteredPolicies: MutableList<String?>,
333+
enabledStatus: MutableMap<String, Boolean>
334+
) {
335+
val request = ManagedIndexRequest().indices(indexNames[current])
325336
client.execute(
326337
ManagedIndexAction.INSTANCE,
327338
request,
328339
object : ActionListener<AcknowledgedResponse> {
329340
override fun onResponse(response: AcknowledgedResponse) {
330-
sendResponse()
341+
filteredIndices.add(indexNames[current])
342+
filteredMetadata.add(indexMetadatas[current])
343+
filteredPolicies.add(indexPolicyIDs[current])
344+
enabledStatus[indexNames[current]] = enabledState.getOrDefault(indexNames[current], false)
345+
if (current < indexNames.count() - 1) {
346+
// do nothing - skip the index and go to next one
347+
filter(current + 1, filteredIndices, filteredMetadata, filteredPolicies, enabledStatus)
348+
} else {
349+
sendResponse(filteredIndices, filteredMetadata, filteredPolicies, enabledStatus)
350+
}
331351
}
332352

333-
override fun onFailure(e: java.lang.Exception) {
334-
actionListener.onFailure(
335-
IndexManagementException.wrap(
336-
when (e is OpenSearchSecurityException) {
337-
true -> OpenSearchStatusException(
338-
"User doesn't have required index permissions on one or more requested indices: ${e.localizedMessage}",
339-
RestStatus.FORBIDDEN
340-
)
341-
false -> e
353+
override fun onFailure(e: Exception) {
354+
when (e is OpenSearchSecurityException) {
355+
true -> {
356+
if (current < indexNames.count() - 1) {
357+
// do nothing - skip the index and go to next one
358+
filter(current + 1, filteredIndices, filteredMetadata, filteredPolicies, enabledStatus)
359+
} else {
360+
sendResponse(filteredIndices, filteredMetadata, filteredPolicies, enabledStatus)
342361
}
343-
)
344-
)
362+
}
363+
false -> {
364+
actionListener.onFailure(e)
365+
}
366+
}
345367
}
346368
}
347369
)
348370
}
349371

350-
private fun sendResponse() {
372+
private fun sendResponse(
373+
indices: List<String> = indexNames,
374+
metadata: List<ManagedIndexMetaData?> = indexMetadatas,
375+
policies: List<String?> = indexPolicyIDs,
376+
enabledStatus: Map<String, Boolean> = enabledState,
377+
totalIndices: Int = totalManagedIndices
378+
) {
351379
if (explainAll) {
352-
actionListener.onResponse(ExplainAllResponse(indexNames, indexPolicyIDs, indexMetadatas, totalManagedIndices, enabledState))
380+
actionListener.onResponse(ExplainAllResponse(indices, policies, metadata, totalIndices, enabledStatus))
353381
return
354382
}
355-
actionListener.onResponse(ExplainResponse(indexNames, indexPolicyIDs, indexMetadatas))
383+
actionListener.onResponse(ExplainResponse(indices, policies, metadata))
356384
}
357385

358386
private fun getMetadata(response: GetResponse?): ManagedIndexMetaData? {

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/TransportGetPolicyAction.kt

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@ import org.opensearch.indexmanagement.util.SecurityUtils.Companion.userHasPermis
4848
import org.opensearch.rest.RestStatus
4949
import org.opensearch.tasks.Task
5050
import org.opensearch.transport.TransportService
51+
import java.lang.IllegalArgumentException
5152

53+
@Suppress("ReturnCount")
5254
class TransportGetPolicyAction @Inject constructor(
5355
val client: NodeClient,
5456
transportService: TransportService,
@@ -104,7 +106,13 @@ class TransportGetPolicyAction @Inject constructor(
104106
return
105107
}
106108

107-
val policy: Policy = parseFromGetResponse(response, xContentRegistry, Policy.Companion::parse)
109+
val policy: Policy?
110+
try {
111+
policy = parseFromGetResponse(response, xContentRegistry, Policy.Companion::parse)
112+
} catch (e: IllegalArgumentException) {
113+
actionListener.onFailure(OpenSearchStatusException("Policy not found", RestStatus.NOT_FOUND))
114+
return
115+
}
108116
if (!userHasPermissionForResource(user, policy.user, filterByEnabled, "policy", request.policyID, actionListener)) {
109117
return
110118
} else {

src/main/kotlin/org/opensearch/indexmanagement/rollup/action/delete/TransportDeleteRollupAction.kt

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ import org.opensearch.tasks.Task
5252
import org.opensearch.transport.TransportService
5353
import java.lang.Exception
5454

55+
@Suppress("ReturnCount")
5556
class TransportDeleteRollupAction @Inject constructor(
5657
transportService: TransportService,
5758
val client: Client,
@@ -84,12 +85,7 @@ class TransportDeleteRollupAction @Inject constructor(
8485

8586
fun start() {
8687
client.threadPool().threadContext.stashContext().use {
87-
if (!filterByEnabled || user == null) {
88-
// security is disabled or filter by is disabled
89-
delete()
90-
} else {
91-
getRollup()
92-
}
88+
getRollup()
9389
}
9490
}
9591

@@ -104,8 +100,14 @@ class TransportDeleteRollupAction @Inject constructor(
104100
return
105101
}
106102

107-
val rollup: Rollup = parseRollup(response, xContentRegistry)
108-
if (!userHasPermissionForResource(user, rollup.user, filterByEnabled, "rollup", request.id(), actionListener)) {
103+
val rollup: Rollup?
104+
try {
105+
rollup = parseRollup(response, xContentRegistry)
106+
} catch (e: IllegalArgumentException) {
107+
actionListener.onFailure(OpenSearchStatusException("Rollup ${request.id()} is not found", RestStatus.NOT_FOUND))
108+
return
109+
}
110+
if (!userHasPermissionForResource(user, rollup.user, filterByEnabled, "rollup", rollup.id, actionListener)) {
109111
return
110112
} else {
111113
delete()

0 commit comments

Comments
 (0)