@@ -10,6 +10,7 @@ import org.apache.http.entity.StringEntity
1010import org.junit.AfterClass
1111import org.junit.Before
1212import org.junit.rules.DisableOnDebug
13+ import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksAction
1314import org.opensearch.client.Request
1415import org.opensearch.client.Response
1516import org.opensearch.client.RestClient
@@ -25,11 +26,15 @@ import org.opensearch.common.xcontent.NamedXContentRegistry
2526import org.opensearch.common.xcontent.XContentType
2627import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_HIDDEN
2728import org.opensearch.rest.RestStatus
29+ import java.io.IOException
2830import java.nio.file.Files
31+ import java.util.*
2932import javax.management.MBeanServerInvocationHandler
3033import javax.management.ObjectName
3134import javax.management.remote.JMXConnectorFactory
3235import javax.management.remote.JMXServiceURL
36+ import kotlin.collections.ArrayList
37+ import kotlin.collections.HashSet
3338
3439abstract class IndexManagementRestTestCase : ODFERestTestCase () {
3540
@@ -63,7 +68,6 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() {
6368
6469 protected val isDebuggingTest = DisableOnDebug (null ).isDebugging
6570 protected val isDebuggingRemoteCluster = System .getProperty(" cluster.debug" , " false" )!! .toBoolean()
66- protected val isMultiNode = System .getProperty(" cluster.number_of_nodes" , " 1" ).toInt() > 1
6771
6872 protected val isLocalTest = clusterName() == " integTest"
6973 private fun clusterName (): String {
@@ -160,21 +164,15 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() {
160164 }
161165
162166 override fun preserveIndicesUponCompletion (): Boolean = true
163-
164167 companion object {
165-
168+ @JvmStatic
169+ protected val isMultiNode = System .getProperty(" cluster.number_of_nodes" , " 1" ).toInt() > 1
166170 protected val defaultKeepIndexSet = setOf (" .opendistro_security" )
167171 /* *
168- * This clean up function can be use in @After or @AfterClass in the base test file
169- * of your feature test suite
172+ * We override preserveIndicesUponCompletion to true and use this function to clean up indices
173+ * Meant to be used in @After or @AfterClass of your feature test suite
170174 */
171175 fun wipeAllIndices (client : RestClient = adminClient(), keepIndex : kotlin.collections.Set <String > = defaultKeepIndexSet) {
172- waitFor {
173- waitForRunningTasks(client)
174- waitForPendingTasks(client)
175- waitForThreadPools(client)
176- }
177- // Delete all data stream indices
178176 try {
179177 client.performRequest(Request (" DELETE" , " _data_stream/*" ))
180178 } catch (e: ResponseException ) {
@@ -186,9 +184,7 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() {
186184 }
187185 }
188186
189- // Delete all indices
190187 val response = client.performRequest(Request (" GET" , " /_cat/indices?format=json&expand_wildcards=all" ))
191-
192188 val xContentType = XContentType .fromMediaType(response.entity.contentType.value)
193189 xContentType.xContent().createParser(
194190 NamedXContentRegistry .EMPTY , DeprecationHandler .THROW_UNSUPPORTED_OPERATION ,
@@ -208,6 +204,72 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() {
208204 }
209205 }
210206 }
207+
208+ waitFor {
209+ if (! isMultiNode) {
210+ waitForRunningTasks(client)
211+ waitForPendingTasks(client)
212+ waitForThreadPools(client)
213+ } else {
214+ // Multi node test is not suitable to waitFor
215+ // We have seen long-running write task that fails the waitFor
216+ // probably because of cluster manager - data node task not in sync
217+ // So instead we just sleep 1s after wiping indices
218+ Thread .sleep(1_000 )
219+ }
220+ }
221+ }
222+
223+ @JvmStatic
224+ @Throws(IOException ::class )
225+ protected fun waitForRunningTasks (client : RestClient ) {
226+ val runningTasks: MutableSet <String > = runningTasks(client.performRequest(Request (" GET" , " /_tasks?detailed" )))
227+ if (runningTasks.isEmpty()) {
228+ return
229+ }
230+ val stillRunning = ArrayList <String >(runningTasks)
231+ fail(" ${Date ()} : There are still tasks running after this test that might break subsequent tests: \n ${stillRunning.joinToString(" \n " )} ." )
232+ }
233+
234+ @Suppress(" UNCHECKED_CAST" )
235+ @Throws(IOException ::class )
236+ private fun runningTasks (response : Response ): MutableSet <String > {
237+ val runningTasks: MutableSet <String > = HashSet ()
238+ val nodes = entityAsMap(response)[" nodes" ] as Map <String , Any >?
239+ for ((_, value) in nodes!! ) {
240+ val nodeInfo = value as Map <String , Any >
241+ val nodeTasks = nodeInfo[" tasks" ] as Map <String , Any >?
242+ for ((_, value1) in nodeTasks!! ) {
243+ val task = value1 as Map <String , Any >
244+ // Ignore the task list API - it doesn't count against us
245+ if (task[" action" ] == ListTasksAction .NAME || task[" action" ] == ListTasksAction .NAME + " [n]" ) continue
246+ // runningTasks.add(task["action"].toString() + " | " + task["description"].toString())
247+ runningTasks.add(task.toString())
248+ }
249+ }
250+ return runningTasks
251+ }
252+
253+ @JvmStatic
254+ protected fun waitForThreadPools (client : RestClient ) {
255+ val response = client.performRequest(Request (" GET" , " /_cat/thread_pool?format=json" ))
256+
257+ val xContentType = XContentType .fromMediaType(response.entity.contentType.value)
258+ xContentType.xContent().createParser(
259+ NamedXContentRegistry .EMPTY , DeprecationHandler .THROW_UNSUPPORTED_OPERATION ,
260+ response.entity.content
261+ ).use { parser ->
262+ for (index in parser.list()) {
263+ val jsonObject: Map <* , * > = index as java.util.HashMap <* , * >
264+ val active = (jsonObject[" active" ] as String ).toInt()
265+ val queue = (jsonObject[" queue" ] as String ).toInt()
266+ val name = jsonObject[" name" ]
267+ val trueActive = if (name == " management" ) active - 1 else active
268+ if (trueActive > 0 || queue > 0 ) {
269+ fail(" Still active threadpools in cluster: $jsonObject " )
270+ }
271+ }
272+ }
211273 }
212274
213275 internal interface IProxy {
0 commit comments