Skip to content

Commit c71483d

Browse files
Modify autofollow retry scheduler logic check to account for completed runs (#839) (#897) (#960)
(cherry picked from commit 689ae33) Signed-off-by: Sai Kumar <karanas@amazon.com> Co-authored-by: Sai Kumar <karanas@amazon.com>
1 parent f8173f8 commit c71483d

File tree

2 files changed

+38
-24
lines changed

2 files changed

+38
-24
lines changed

src/main/kotlin/org/opensearch/replication/task/autofollow/AutoFollowTask.kt

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -45,6 +45,7 @@ import org.opensearch.tasks.TaskId
4545
import org.opensearch.threadpool.Scheduler
4646
import org.opensearch.threadpool.ThreadPool
4747
import java.util.concurrent.ConcurrentSkipListSet
48+
import java.util.concurrent.TimeUnit
4849

4950
class AutoFollowTask(id: Long, type: String, action: String, description: String, parentTask: TaskId,
5051
headers: Map<String, String>,
@@ -91,7 +92,7 @@ class AutoFollowTask(id: Long, type: String, action: String, description: String
9192

9293
private fun addRetryScheduler() {
9394
log.debug("Adding retry scheduler")
94-
if(retryScheduler != null && !retryScheduler!!.isCancelled) {
95+
if(retryScheduler != null && retryScheduler!!.getDelay(TimeUnit.NANOSECONDS) > 0L) {
9596
return
9697
}
9798
try {

src/test/kotlin/org/opensearch/replication/integ/rest/UpdateAutoFollowPatternIT.kt

Lines changed: 36 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,7 @@ import org.opensearch.replication.task.index.IndexReplicationExecutor
2323
import org.apache.http.HttpStatus
2424
import org.apache.http.entity.ContentType
2525
import org.apache.http.nio.entity.NStringEntity
26+
import org.apache.logging.log4j.LogManager
2627
import org.assertj.core.api.Assertions
2728
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest
2829
import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest
@@ -41,6 +42,7 @@ import org.opensearch.cluster.metadata.IndexMetadata
4142
import org.opensearch.cluster.metadata.MetadataCreateIndexService
4243
import org.opensearch.replication.AutoFollowStats
4344
import org.opensearch.replication.ReplicationPlugin
45+
import org.opensearch.replication.action.changes.TransportGetChangesAction
4446
import org.opensearch.replication.updateReplicationStartBlockSetting
4547
import org.opensearch.replication.updateAutofollowRetrySetting
4648
import org.opensearch.replication.updateAutoFollowConcurrentStartReplicationJobSetting
@@ -63,6 +65,10 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() {
6365
private val longIndexPatternName = "index_".repeat(43)
6466
private val waitForShardTask = TimeValue.timeValueSeconds(10)
6567

68+
companion object {
69+
private val log = LogManager.getLogger(UpdateAutoFollowPatternIT::class.java)
70+
}
71+
6672
fun `test auto follow pattern`() {
6773
val followerClient = getClientForCluster(FOLLOWER)
6874
val leaderClient = getClientForCluster(LEADER)
@@ -316,36 +322,43 @@ class UpdateAutoFollowPatternIT: MultiClusterRestTestCase() {
316322
Assertions.assertThat(getIndexReplicationTasks(FOLLOWER).size).isEqualTo(1)
317323
}
318324

319-
fun `test autofollow task with start replication block`() {
325+
fun `test autofollow task with start replication block and retries`() {
320326
val followerClient = getClientForCluster(FOLLOWER)
321327
val leaderClient = getClientForCluster(LEADER)
322328
createConnectionBetweenClusters(FOLLOWER, LEADER, connectionAlias)
323-
val leaderIndexName = createRandomIndex(leaderClient)
324329
try {
325330
//modify retry duration to account for autofollow trigger in next retry
326331
followerClient.updateAutofollowRetrySetting("1m")
327-
// Add replication start block
328-
followerClient.updateReplicationStartBlockSetting(true)
329-
followerClient.updateAutoFollowPattern(connectionAlias, indexPatternName, indexPattern)
330-
sleep(30000) // Default poll for auto follow in worst case
331-
// verify both index replication tasks and autofollow tasks
332-
// Replication shouldn't have been started - 0 tasks
333-
// Autofollow task should still be up - 1 task
334-
Assertions.assertThat(getIndexReplicationTasks(FOLLOWER).size).isEqualTo(0)
335-
Assertions.assertThat(getAutoFollowTasks(FOLLOWER).size).isEqualTo(1)
332+
for (repeat in 1..2) {
333+
log.info("Current Iteration $repeat")
334+
// Add replication start block
335+
followerClient.updateReplicationStartBlockSetting(true)
336+
createRandomIndex(leaderClient)
337+
followerClient.updateAutoFollowPattern(connectionAlias, indexPatternName, indexPattern)
338+
sleep(95000) // wait for auto follow trigger in the worst case
339+
// verify both index replication tasks and autofollow tasks
340+
// Replication shouldn't have been started - (repeat-1) tasks as for current loop index shouldn't be
341+
// created yet.
342+
// Autofollow task should still be up - 1 task
343+
Assertions.assertThat(getIndexReplicationTasks(FOLLOWER).size).isEqualTo(repeat-1)
344+
Assertions.assertThat(getAutoFollowTasks(FOLLOWER).size).isEqualTo(1)
336345

337-
var stats = followerClient.AutoFollowStats()
338-
var failedIndices = stats["failed_indices"] as List<*>
339-
assert(failedIndices.size == 1)
340-
// Remove replication start block
341-
followerClient.updateReplicationStartBlockSetting(false)
342-
sleep(60000) // wait for auto follow trigger in the worst case
343-
// Index should be replicated and autofollow task should be present
344-
Assertions.assertThat(getIndexReplicationTasks(FOLLOWER).size).isEqualTo(1)
345-
Assertions.assertThat(getAutoFollowTasks(FOLLOWER).size).isEqualTo(1)
346-
stats = followerClient.AutoFollowStats()
347-
failedIndices = stats["failed_indices"] as List<*>
348-
assert(failedIndices.isEmpty())
346+
var stats = followerClient.AutoFollowStats()
347+
var failedIndices = stats["failed_indices"] as List<*>
348+
// Every time failed replication task will be 1 as
349+
// there are already running jobs in the previous iteration
350+
log.info("Current failed indices $failedIndices")
351+
assert(failedIndices.size == 1)
352+
// Remove replication start block
353+
followerClient.updateReplicationStartBlockSetting(false)
354+
sleep(95000) // wait for auto follow trigger in the worst case
355+
// Index should be replicated and autofollow task should be present
356+
Assertions.assertThat(getIndexReplicationTasks(FOLLOWER).size).isEqualTo(repeat)
357+
Assertions.assertThat(getAutoFollowTasks(FOLLOWER).size).isEqualTo(1)
358+
stats = followerClient.AutoFollowStats()
359+
failedIndices = stats["failed_indices"] as List<*>
360+
assert(failedIndices.isEmpty())
361+
}
349362
} finally {
350363
followerClient.deleteAutoFollowPattern(connectionAlias, indexPatternName)
351364
}

0 commit comments

Comments
 (0)