Skip to content

Commit 4497be9

Browse files
committed
Fix flaky rollup test by stopping jobs before index cleanup
The test was flaky because rollup job coroutines continued running after test cleanup, causing a race condition where the config index was recreated with incorrect dynamic mappings. Root cause: - Tests create enabled rollups -> JobScheduler schedules them - RollupRunner.runJob() launches coroutines via launch {} - Test @after wipes indices - BUT coroutines are still running in background - Coroutines write metadata AFTER indices wiped - Index auto-creates with wrong dynamic mappings (long vs date) - Next test fails with mapping conflict on rollup_metadata.continuous.next_window_end_time Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
1 parent a81866f commit 4497be9

File tree

2 files changed

+27
-13
lines changed

2 files changed

+27
-13
lines changed

src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() {
4949

5050
@After
5151
@Suppress("UNCHECKED_CAST")
52-
fun KillAllCancallableRunningTasks() {
52+
fun killAllCancellableRunningTasks() {
5353
client().makeRequest("POST", "_tasks/_cancel?actions=*")
5454
waitFor {
5555
val response = client().makeRequest("GET", "_tasks")
@@ -68,21 +68,27 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() {
6868
}
6969

7070
@Suppress("UNCHECKED_CAST")
71-
fun waitForCancallableTasksToFinish() {
72-
waitFor {
73-
val response = client().makeRequest("GET", "_tasks")
74-
val nodes = response.asMap()["nodes"] as Map<String, Any?>
75-
var hasCancallableRunningTasks = false
76-
nodes.forEach {
77-
val tasks = (it.value as Map<String, Any?>)["tasks"] as Map<String, Any?>
78-
tasks.forEach { e ->
79-
if ((e.value as Map<String, Any?>)["cancellable"] as Boolean) {
80-
hasCancallableRunningTasks = true
81-
logger.info("cancellable task running: ${e.key}")
71+
protected fun stopAllRollupJobs() {
72+
try {
73+
val response = client().makeRequest("GET", "$ROLLUP_JOBS_BASE_URI?size=1000")
74+
val rollupsList = response.asMap()["rollups"] as? List<Map<String, Any?>> ?: emptyList()
75+
76+
rollupsList.forEach { rollupMap ->
77+
val rollupObj = rollupMap["rollup"] as? Map<String, Any?> ?: return@forEach
78+
val id = rollupMap["_id"] as? String ?: return@forEach
79+
val enabled = rollupObj["enabled"] as? Boolean ?: false
80+
81+
if (enabled) {
82+
try {
83+
client().makeRequest("POST", "$ROLLUP_JOBS_BASE_URI/$id/_stop")
84+
logger.debug("Stopped rollup job during test cleanup: $id")
85+
} catch (e: Exception) {
86+
logger.debug("Failed to stop rollup $id during cleanup: ${e.message}")
8287
}
8388
}
8489
}
85-
assertFalse(hasCancallableRunningTasks)
90+
} catch (e: Exception) {
91+
logger.warn("Error stopping rollup jobs during test cleanup", e)
8692
}
8793
}
8894

src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RollupRestAPITestCase.kt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,14 @@ import org.opensearch.indexmanagement.rollup.RollupRestTestCase
1111
abstract class RollupRestAPITestCase : RollupRestTestCase() {
1212
@After
1313
fun clearIndicesAfterEachTest() {
14+
// Stop all rollup jobs first to prevent new executions and allow running coroutines to complete
15+
stopAllRollupJobs()
16+
17+
// Wait for in-flight job executions to complete their metadata writes
18+
// This prevents race condition where coroutines recreate config index after wipeAllIndices()
19+
Thread.sleep(2000)
20+
21+
1422
// For API tests, flaky could happen if config index not deleted
1523
// metadata creation could cause the mapping to be auto set to
1624
// a wrong type, namely, [rollup_metadata.continuous.next_window_end_time] to long

0 commit comments

Comments
 (0)