Skip to content

Commit ba0b583

Browse files
Fix LRON concurrent indexing throw ResourceAlreadyExists (#831) (#842)
* fix LRON unwrap exception and some IT Signed-off-by: zhichao-aws <zhichaog@amazon.com> * remove useless log Signed-off-by: zhichao-aws <zhichaog@amazon.com> --------- Signed-off-by: zhichao-aws <zhichaog@amazon.com> Co-authored-by: Hailong Cui <ihailong@amazon.com> (cherry picked from commit f07fa2b) Co-authored-by: zhichao-aws <zhichaog@amazon.com>
1 parent 930157b commit ba0b583

File tree

5 files changed

+53
-23
lines changed

5 files changed

+53
-23
lines changed

src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/ControlCenterIndices.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.opensearch.indexmanagement.controlcenter.notification
77

8+
import org.opensearch.ExceptionsHelper
89
import org.opensearch.ResourceAlreadyExistsException
910
import org.opensearch.action.ActionListener
1011
import org.opensearch.action.admin.indices.create.CreateIndexRequest
@@ -31,7 +32,7 @@ class ControlCenterIndices(
3132
indexRequest,
3233
object : ActionListener<CreateIndexResponse> {
3334
override fun onFailure(e: Exception) {
34-
if (e is ResourceAlreadyExistsException) {
35+
if (ExceptionsHelper.unwrapCause(e) is ResourceAlreadyExistsException) {
3536
/* if two request create the control center index at the same time, may raise this exception */
3637
/* but we don't take it as error */
3738
actionListener.onResponse(

src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/LRONConfigRestTestCase.kt

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,6 @@ import org.opensearch.indexmanagement.IndexManagementPlugin
1717
import org.opensearch.indexmanagement.IndexManagementRestTestCase
1818
import org.opensearch.indexmanagement.controlcenter.notification.initNodeIdsInRestIT
1919
import org.opensearch.indexmanagement.controlcenter.notification.model.LRONConfig
20-
import org.opensearch.indexmanagement.controlcenter.notification.nodeIdsInRestIT
21-
import org.opensearch.indexmanagement.controlcenter.notification.randomLRONConfig
22-
import org.opensearch.indexmanagement.controlcenter.notification.randomTaskId
2320
import org.opensearch.indexmanagement.controlcenter.notification.toJsonString
2421
import org.opensearch.indexmanagement.makeRequest
2522
import org.opensearch.rest.RestStatus
@@ -30,8 +27,6 @@ abstract class LRONConfigRestTestCase : IndexManagementRestTestCase() {
3027
setDebugLogLevel()
3128
/* init cluster node ids in integ test */
3229
initNodeIdsInRestIT(client())
33-
/* index a random doc to create .opensearch-control-center index */
34-
createLRONConfig(randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())))
3530
}
3631

3732
@After
@@ -74,8 +69,13 @@ abstract class LRONConfigRestTestCase : IndexManagementRestTestCase() {
7469

7570
companion object {
7671
@AfterClass
77-
@JvmStatic fun clearIndicesAfterClass() {
78-
wipeAllIndices()
72+
@JvmStatic fun removeControlCenterIndex() {
73+
try {
74+
adminClient().makeRequest("DELETE", IndexManagementPlugin.CONTROL_CENTER_INDEX, emptyMap())
75+
} catch (e: ResponseException) {
76+
/* ignore if the index has not been created */
77+
assertEquals("Unexpected status", RestStatus.NOT_FOUND, RestStatus.fromCode(e.response.statusLine.statusCode))
78+
}
7979
}
8080
}
8181
}

src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestDeleteLRONConfigActionIT.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ class RestDeleteLRONConfigActionIT : LRONConfigRestTestCase() {
3939
}
4040

4141
fun `test delete nonexist LRONConfig response`() {
42+
/* index a random doc to create .opensearch-control-center index */
43+
createLRONConfig(randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())))
4244
val lronConfig = randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random()))
4345
val response = client().makeRequest("DELETE", getResourceURI(lronConfig.taskId, lronConfig.actionName))
4446
assertEquals("delete LRONConfig failed", RestStatus.OK, response.restStatus())

src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestGetLRONConfigActionIT.kt

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ class RestGetLRONConfigActionIT : LRONConfigRestTestCase() {
3939
}
4040

4141
fun `test get nonexist LRONConfig fails`() {
42+
/* index a random doc to create .opensearch-control-center index */
43+
createLRONConfig(randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())))
4244
try {
4345
val lronConfig = randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random()))
4446
client().makeRequest("GET", getResourceURI(lronConfig.taskId, lronConfig.actionName))
@@ -51,7 +53,7 @@ class RestGetLRONConfigActionIT : LRONConfigRestTestCase() {
5153

5254
fun `test get all LRONConfigs`() {
5355
/* LRONConfigRestTestCase index a doc to auto create the index, here we wipe the index before count doc number */
54-
wipeAllIndices()
56+
removeControlCenterIndex()
5557
val lronConfigResponses = randomList(1, 15) {
5658
createLRONConfig(randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random()))).asMap()
5759
}
@@ -90,16 +92,11 @@ class RestGetLRONConfigActionIT : LRONConfigRestTestCase() {
9092
}
9193

9294
fun `test get all LRONConfig if index not exists`() {
93-
try {
94-
wipeAllIndices()
95-
val response = client().makeRequest("GET", IndexManagementPlugin.LRON_BASE_URI)
96-
assertEquals("get LRONConfigs failed", RestStatus.OK, response.restStatus())
97-
val responseBody = response.asMap()
98-
val totalNumber = responseBody["total_number"]
99-
OpenSearchTestCase.assertEquals("wrong LRONConfigs number", 0, totalNumber)
100-
} finally {
101-
/* index a random doc to create .opensearch-control-center index */
102-
createLRONConfig(randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())))
103-
}
95+
removeControlCenterIndex()
96+
val response = client().makeRequest("GET", IndexManagementPlugin.LRON_BASE_URI)
97+
assertEquals("get LRONConfigs failed", RestStatus.OK, response.restStatus())
98+
val responseBody = response.asMap()
99+
val totalNumber = responseBody["total_number"]
100+
OpenSearchTestCase.assertEquals("wrong LRONConfigs number", 0, totalNumber)
104101
}
105102
}

src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestIndexLRONConfigActionIT.kt

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@
55

66
package org.opensearch.indexmanagement.controlcenter.notification.resthandler
77

8+
import kotlinx.coroutines.asCoroutineDispatcher
9+
import kotlinx.coroutines.async
10+
import kotlinx.coroutines.awaitAll
11+
import kotlinx.coroutines.runBlocking
812
import org.junit.Assert
913
import org.opensearch.client.ResponseException
1014
import org.opensearch.common.xcontent.XContentType
@@ -21,6 +25,7 @@ import org.opensearch.indexmanagement.makeRequest
2125
import org.opensearch.indexmanagement.opensearchapi.convertToMap
2226
import org.opensearch.indexmanagement.util.DRY_RUN
2327
import org.opensearch.rest.RestStatus
28+
import java.util.concurrent.Executors
2429

2530
@Suppress("UNCHECKED_CAST")
2631
class RestIndexLRONConfigActionIT : LRONConfigRestTestCase() {
@@ -155,21 +160,21 @@ class RestIndexLRONConfigActionIT : LRONConfigRestTestCase() {
155160
}
156161

157162
fun `test autocreate index for indexLRONConfig action`() {
158-
wipeAllIndices()
163+
removeControlCenterIndex()
159164
val lronConfig = randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random()))
160165
var response = createLRONConfig(lronConfig)
161166
assertEquals("Create LRONConfig failed", RestStatus.OK, response.restStatus())
162-
wipeAllIndices()
167+
removeControlCenterIndex()
163168
response = client().makeRequest(
164169
"PUT",
165170
getResourceURI(lronConfig.taskId, lronConfig.actionName),
166171
lronConfig.toHttpEntity()
167172
)
168173
assertEquals("Create LRONConfig failed", RestStatus.OK, response.restStatus())
169-
wipeAllIndices()
170174
}
171175

172176
fun `test mappings after LRONConfig creation`() {
177+
removeControlCenterIndex()
173178
val lronConfig = randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random()))
174179
createLRONConfig(lronConfig)
175180

@@ -185,4 +190,29 @@ class RestIndexLRONConfigActionIT : LRONConfigRestTestCase() {
185190

186191
assertEquals("Mappings are different", expectedMap, mappingsMap)
187192
}
193+
194+
fun `test concurrent indexing requests auto create index and not throw exception`() {
195+
removeControlCenterIndex()
196+
val threadSize = 5
197+
val lronConfigs = List(threadSize) { randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())) }
198+
val threadPool = Executors.newFixedThreadPool(threadSize)
199+
try {
200+
runBlocking {
201+
val dispatcher = threadPool.asCoroutineDispatcher()
202+
val responses = lronConfigs.map {
203+
async(dispatcher) {
204+
createLRONConfig(it)
205+
}
206+
}.awaitAll()
207+
responses.forEach { assertEquals("Create LRONConfig failed", RestStatus.OK, it.restStatus()) }
208+
}
209+
} finally {
210+
threadPool.shutdown()
211+
}
212+
val response = client().makeRequest("GET", IndexManagementPlugin.LRON_BASE_URI)
213+
assertEquals("get LRONConfigs failed", RestStatus.OK, response.restStatus())
214+
val responseBody = response.asMap()
215+
val totalNumber = responseBody["total_number"]
216+
assertEquals("wrong LRONConfigs number", threadSize, totalNumber)
217+
}
188218
}

0 commit comments

Comments
 (0)