Commit dd2b68d

Initialize the leaderCheckpoint with follower shard's localCheckpoint (#904)

Signed-off-by: Ankit Kala <ankikala@amazon.com>
1 parent bff3f23 · commit dd2b68d

File tree

6 files changed, +139 −13 lines changed

src/main/kotlin/org/opensearch/replication/repository/RemoteClusterRestoreLeaderService.kt
Lines changed: 2 additions & 2 deletions

@@ -114,8 +114,8 @@ class RemoteClusterRestoreLeaderService @Inject constructor(private val indicesS
         var fromSeqNo = RetentionLeaseActions.RETAIN_ALL

         // Adds the retention lease for fromSeqNo for the next stage of the replication.
-        retentionLeaseHelper.addRetentionLease(request.leaderShardId, fromSeqNo,
-                request.followerShardId, RemoteClusterRepository.REMOTE_CLUSTER_REPO_REQ_TIMEOUT_IN_MILLI_SEC)
+        retentionLeaseHelper.addRetentionLease(request.leaderShardId, fromSeqNo, request.followerShardId,
+                RemoteClusterRepository.REMOTE_CLUSTER_REPO_REQ_TIMEOUT_IN_MILLI_SEC)

         /**
          * At this point, it should be safe to release retention lock as the retention lease

src/main/kotlin/org/opensearch/replication/seqno/RemoteClusterRetentionLeaseHelper.kt
Lines changed: 35 additions & 9 deletions

@@ -22,6 +22,7 @@ import org.opensearch.index.seqno.RetentionLeaseActions
 import org.opensearch.index.seqno.RetentionLeaseAlreadyExistsException
 import org.opensearch.index.seqno.RetentionLeaseInvalidRetainingSeqNoException
 import org.opensearch.index.seqno.RetentionLeaseNotFoundException
+import org.opensearch.index.shard.IndexShard
 import org.opensearch.index.shard.ShardId
 import org.opensearch.replication.metadata.store.ReplicationMetadata
 import org.opensearch.replication.repository.RemoteClusterRepository

@@ -175,22 +176,47 @@ class RemoteClusterRetentionLeaseHelper constructor(var followerClusterNameWithU
         }
     }

+    public fun attemptRetentionLeaseRemoval(leaderShardId: ShardId, followerShardId: ShardId, timeout: Long) {
+        val retentionLeaseId = retentionLeaseIdForShard(followerClusterNameWithUUID, followerShardId)
+        val request = RetentionLeaseActions.RemoveRequest(leaderShardId, retentionLeaseId)
+        try {
+            client.execute(RetentionLeaseActions.Remove.INSTANCE, request).actionGet(timeout)
+            log.info("Removed retention lease with id - $retentionLeaseId")
+        } catch(e: RetentionLeaseNotFoundException) {
+            // log error and bail
+            log.error(e.stackTraceToString())
+        } catch (e: Exception) {
+            // We are not bubbling up the exception as the stop action/ task cleanup should succeed
+            // even if we fail to remove the retention lease from leader cluster
+            log.error("Exception in removing retention lease", e)
+        }
+    }
+

     /**
      * Remove these once the callers are moved to above APIs
      */
     public fun addRetentionLease(leaderShardId: ShardId, seqNo: Long,
-                                 followerShardId: ShardId, timeout: Long) {
+                                        followerShardId: ShardId, timeout: Long) {
         val retentionLeaseId = retentionLeaseIdForShard(followerClusterNameWithUUID, followerShardId)
         val request = RetentionLeaseActions.AddRequest(leaderShardId, retentionLeaseId, seqNo, retentionLeaseSource)
-        try {
-            client.execute(RetentionLeaseActions.Add.INSTANCE, request).actionGet(timeout)
-        } catch (e: RetentionLeaseAlreadyExistsException) {
-            log.error(e.stackTraceToString())
-            log.info("Renew retention lease as it already exists $retentionLeaseId with $seqNo")
-            // Only one retention lease should exists for the follower shard
-            // Ideally, this should have got cleaned-up
-            renewRetentionLease(leaderShardId, seqNo, followerShardId, timeout)
+        var canRetry = true
+        while (true) {
+            try {
+                log.info("Adding retention lease $retentionLeaseId")
+                client.execute(RetentionLeaseActions.Add.INSTANCE, request).actionGet(timeout)
+                break
+            } catch (e: RetentionLeaseAlreadyExistsException) {
+                log.info("Found a stale retention lease $retentionLeaseId on leader.")
+                if (canRetry) {
+                    canRetry = false
+                    attemptRetentionLeaseRemoval(leaderShardId, followerShardId, timeout)
+                    log.info("Cleared stale retention lease $retentionLeaseId on leader. Retrying...")
+                } else {
+                    log.error(e.stackTraceToString())
+                    throw e
+                }
+            }
         }
     }
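
The new add path replaces renew-on-conflict with a remove-and-retry-once loop. A standalone sketch of that control flow, with the OpenSearch types stripped out (AlreadyExistsException and the two lambdas are stand-ins for RetentionLeaseAlreadyExistsException and the Add/Remove actions, not plugin APIs):

class AlreadyExistsException(msg: String) : RuntimeException(msg)

fun addWithOneRetry(add: () -> Unit, removeStale: () -> Unit) {
    var canRetry = true
    while (true) {
        try {
            add()
            return  // lease added successfully
        } catch (e: AlreadyExistsException) {
            if (canRetry) {
                // First conflict: treat the existing lease as stale, clear it, retry once
                canRetry = false
                removeStale()
            } else {
                // Removal didn't help; surface the conflict to the caller
                throw e
            }
        }
    }
}

Bounding the loop to a single retry keeps the call from spinning if removal keeps failing, while still recovering from the common case of a lease left behind by an unclean stop.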

src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt
Lines changed: 1 addition & 1 deletion

@@ -290,7 +290,7 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
     private suspend fun pollShardTaskStatus(): IndexReplicationState {
         val failedShardTasks = findAllReplicationFailedShardTasks(followerIndexName, clusterService.state())
         if (failedShardTasks.isNotEmpty()) {
-            log.info("Failed shard tasks - ", failedShardTasks)
+            log.info("Failed shard tasks - $failedShardTasks")
             var msg = ""
             for ((shard, task) in failedShardTasks) {
                 val taskState = task.state
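
For context on the one-line fix above: Log4j's parameterized logging only substitutes arguments into "{}" placeholders, so an argument passed alongside a message with no placeholder is silently dropped. A minimal sketch (logger name and sample data are illustrative):

import org.apache.logging.log4j.LogManager

fun main() {
    val log = LogManager.getLogger("demo")
    val failedShardTasks = mapOf("shard-0" to "failed")

    // Bug: no "{}" in the message, so the argument is dropped and only
    // "Failed shard tasks - " is logged
    log.info("Failed shard tasks - ", failedShardTasks)

    // The commit's fix: a Kotlin string template bakes the value into the message
    log.info("Failed shard tasks - $failedShardTasks")

    // Equivalent parameterized form
    log.info("Failed shard tasks - {}", failedShardTasks)
}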

src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt
Lines changed: 4 additions & 0 deletions

@@ -218,6 +218,10 @@ class ShardReplicationTask(id: Long, type: String, action: String, description:

         val changeTracker = ShardReplicationChangesTracker(indexShard, replicationSettings)
         followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.localCheckpoint
+        // In case the shard task starts on a new node and there are no active writes on the leader shard, the leader
+        // checkpoint never gets initialized and defaults to 0. To get around this, we set the leaderCheckpoint to the
+        // follower shard's localCheckpoint, as the leader shard is guaranteed to be at or ahead of it.
+        followerClusterStats.stats[followerShardId]!!.leaderCheckpoint = indexShard.localCheckpoint
         coroutineScope {
             while (isActive) {
                 rateLimiter.acquire()
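
The reasoning in the new comment can be seen with a small standalone example: if leaderCheckpoint stays at its default of 0 while the follower restores at a non-zero checkpoint, any lag derived from the pair is nonsense. ShardStats below is a hypothetical holder, not the plugin's actual stats class:

data class ShardStats(
    var leaderCheckpoint: Long = 0,    // stays 0 until an op arrives from the leader
    var followerCheckpoint: Long = 0
) {
    fun lag(): Long = leaderCheckpoint - followerCheckpoint
}

fun main() {
    val localCheckpoint = 41L  // follower shard's checkpoint after restore

    // Without the fix: an idle leader never updates leaderCheckpoint
    val before = ShardStats(followerCheckpoint = localCheckpoint)
    println(before.lag())  // prints -41

    // With the fix: seed leaderCheckpoint from the follower's localCheckpoint;
    // safe because the leader is guaranteed to be at or ahead of the follower
    val after = ShardStats(leaderCheckpoint = localCheckpoint, followerCheckpoint = localCheckpoint)
    println(after.lag())  // prints 0
}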

src/test/kotlin/org/opensearch/replication/MultiClusterRestTestCase.kt
Lines changed: 35 additions & 1 deletion

@@ -11,7 +11,6 @@

 package org.opensearch.replication

-import com.nhaarman.mockitokotlin2.stub
 import org.opensearch.replication.MultiClusterAnnotations.ClusterConfiguration
 import org.opensearch.replication.MultiClusterAnnotations.ClusterConfigurations
 import org.opensearch.replication.MultiClusterAnnotations.getAnnotationsFromClass

@@ -21,6 +20,7 @@ import org.apache.http.HttpHost
 import org.apache.http.HttpStatus
 import org.apache.http.client.config.RequestConfig
 import org.apache.http.entity.ContentType
+import org.apache.http.entity.StringEntity
 import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
 import org.apache.http.message.BasicHeader
 import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy

@@ -513,6 +513,28 @@ abstract class MultiClusterRestTestCase : OpenSearchTestCase() {
         return OpenSearchRestTestCase.entityAsList(client.performRequest(Request("GET", endpoint)))
     }

+    protected fun deleteConnection(fromClusterName: String, connectionName: String="source") {
+        val fromCluster = getNamedCluster(fromClusterName)
+        val persistentConnectionRequest = Request("PUT", "_cluster/settings")
+
+        val entityAsString = """
+            {
+              "persistent": {
+                 "cluster": {
+                   "remote": {
+                     "$connectionName": {
+                       "seeds": null
+                     }
+                   }
+                 }
+              }
+            }""".trimMargin()
+
+        persistentConnectionRequest.entity = StringEntity(entityAsString, ContentType.APPLICATION_JSON)
+        val persistentConnectionResponse = fromCluster.lowLevelClient.performRequest(persistentConnectionRequest)
+        assertEquals(HttpStatus.SC_OK.toLong(), persistentConnectionResponse.statusLine.statusCode.toLong())
+    }
+
     protected fun createConnectionBetweenClusters(fromClusterName: String, toClusterName: String, connectionName: String="source") {
         val toCluster = getNamedCluster(toClusterName)
         val fromCluster = getNamedCluster(fromClusterName)

@@ -635,4 +657,16 @@ abstract class MultiClusterRestTestCase : OpenSearchTestCase() {
         updateSettingsRequest.transientSettings(Collections.singletonMap<String, String?>(ReplicationPlugin.REPLICATION_METADATA_SYNC_INTERVAL.key, "5s"))
         followerClient.cluster().putSettings(updateSettingsRequest, RequestOptions.DEFAULT)
     }
+
+    protected fun docCount(cluster: RestHighLevelClient, indexName: String) : Int {
+        val persistentConnectionRequest = Request("GET", "/$indexName/_search?pretty&q=*")
+
+        val persistentConnectionResponse = cluster.lowLevelClient.performRequest(persistentConnectionRequest)
+        val statusResponse: Map<String, Map<String, Map<String, Any>>> = OpenSearchRestTestCase.entityAsMap(persistentConnectionResponse) as Map<String, Map<String, Map<String, String>>>
+        return statusResponse["hits"]?.get("total")?.get("value") as Int
+    }
+
+    protected fun deleteIndex(testCluster: RestHighLevelClient, indexName: String) {
+        testCluster.lowLevelClient.performRequest(Request("DELETE", indexName))
+    }
 }
src/test/kotlin/org/opensearch/replication/integ/rest/ReplicationStopThenRestartIT.kt (new file)
Lines changed: 62 additions & 0 deletions

@@ -0,0 +1,62 @@
+package org.opensearch.replication.integ.rest
+
+import org.opensearch.replication.MultiClusterRestTestCase
+import org.opensearch.replication.MultiClusterAnnotations
+import org.opensearch.replication.StartReplicationRequest
+import org.opensearch.replication.startReplication
+import org.opensearch.replication.stopReplication
+import org.assertj.core.api.Assertions
+import org.opensearch.client.RequestOptions
+import org.opensearch.client.indices.CreateIndexRequest
+import org.junit.Assert
+import java.util.concurrent.TimeUnit
+
+
+@MultiClusterAnnotations.ClusterConfigurations(
+    MultiClusterAnnotations.ClusterConfiguration(clusterName = LEADER),
+    MultiClusterAnnotations.ClusterConfiguration(clusterName = FOLLOWER)
+)
+
+class ReplicationStopThenRestartIT : MultiClusterRestTestCase() {
+    private val leaderIndexName = "leader_index"
+    private val followerIndexName = "follower_index"
+
+    fun `test replication works after unclean stop and start`() {
+        val followerClient = getClientForCluster(FOLLOWER)
+        val leaderClient = getClientForCluster(LEADER)
+        changeTemplate(LEADER)
+        createConnectionBetweenClusters(FOLLOWER, LEADER)
+        val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT)
+        Assertions.assertThat(createIndexResponse.isAcknowledged).isTrue()
+        followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName))
+        insertDocToIndex(LEADER, "1", "dummy data 1", leaderIndexName)
+        insertDocToIndex(LEADER, "2", "dummy data 1", leaderIndexName)
+
+        assertBusy({
+            try {
+                Assert.assertEquals(2, docCount(followerClient, followerIndexName))
+            } catch (ex: Exception) {
+                ex.printStackTrace()
+                Assert.fail("Exception while querying follower cluster. Failing to retry again")
+            }
+        }, 1, TimeUnit.MINUTES)
+
+
+        deleteConnection(FOLLOWER)
+        followerClient.stopReplication(followerIndexName, shouldWait = true)
+        deleteIndex(followerClient, followerIndexName)
+
+        createConnectionBetweenClusters(FOLLOWER, LEADER)
+        insertDocToIndex(LEADER, "3", "dummy data 1", leaderIndexName)
+        insertDocToIndex(LEADER, "4", "dummy data 1", leaderIndexName)
+        followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName))
+
+        assertBusy({
+            try {
+                Assert.assertEquals(4, docCount(followerClient, followerIndexName))
+            } catch (ex: Exception) {
+                Assert.fail("Exception while querying follower cluster. Failing to retry again")
+            }
+        }, 1, TimeUnit.MINUTES)
+    }
+}
