@@ -114,8 +114,8 @@ class RemoteClusterRestoreLeaderService @Inject constructor(private val indicesS
var fromSeqNo = RetentionLeaseActions.RETAIN_ALL

// Adds the retention lease for fromSeqNo for the next stage of the replication.
-        retentionLeaseHelper.addRetentionLease(request.leaderShardId, fromSeqNo,
-                request.followerShardId, RemoteClusterRepository.REMOTE_CLUSTER_REPO_REQ_TIMEOUT_IN_MILLI_SEC)
+        retentionLeaseHelper.addRetentionLease(request.leaderShardId, fromSeqNo, request.followerShardId,
+            RemoteClusterRepository.REMOTE_CLUSTER_REPO_REQ_TIMEOUT_IN_MILLI_SEC)

/**
* At this point, it should be safe to release retention lock as the retention lease
@@ -22,6 +22,7 @@ import org.opensearch.index.seqno.RetentionLeaseActions
import org.opensearch.index.seqno.RetentionLeaseAlreadyExistsException
import org.opensearch.index.seqno.RetentionLeaseInvalidRetainingSeqNoException
import org.opensearch.index.seqno.RetentionLeaseNotFoundException
+import org.opensearch.index.shard.IndexShard
import org.opensearch.index.shard.ShardId
import org.opensearch.replication.metadata.store.ReplicationMetadata
import org.opensearch.replication.repository.RemoteClusterRepository
@@ -175,22 +176,47 @@ class RemoteClusterRetentionLeaseHelper constructor(var followerClusterNameWithU
}
}

+    public fun attemptRetentionLeaseRemoval(leaderShardId: ShardId, followerShardId: ShardId, timeout: Long) {
+        val retentionLeaseId = retentionLeaseIdForShard(followerClusterNameWithUUID, followerShardId)
+        val request = RetentionLeaseActions.RemoveRequest(leaderShardId, retentionLeaseId)
+        try {
+            client.execute(RetentionLeaseActions.Remove.INSTANCE, request).actionGet(timeout)
+            log.info("Removed retention lease with id - $retentionLeaseId")
+        } catch (e: RetentionLeaseNotFoundException) {
+            // Nothing to remove; log the error and bail.
+            log.error(e.stackTraceToString())
+        } catch (e: Exception) {
+            // Do not bubble up the exception: the stop action/task cleanup should succeed
+            // even if we fail to remove the retention lease from the leader cluster.
+            log.error("Exception in removing retention lease", e)
+        }
+    }


/**
* Remove these once the callers are moved to above APIs
*/
public fun addRetentionLease(leaderShardId: ShardId, seqNo: Long,
-        followerShardId: ShardId, timeout: Long) {
+            followerShardId: ShardId, timeout: Long) {
val retentionLeaseId = retentionLeaseIdForShard(followerClusterNameWithUUID, followerShardId)
val request = RetentionLeaseActions.AddRequest(leaderShardId, retentionLeaseId, seqNo, retentionLeaseSource)
-        try {
-            client.execute(RetentionLeaseActions.Add.INSTANCE, request).actionGet(timeout)
-        } catch (e: RetentionLeaseAlreadyExistsException) {
-            log.error(e.stackTraceToString())
-            log.info("Renew retention lease as it already exists $retentionLeaseId with $seqNo")
-            // Only one retention lease should exists for the follower shard
-            // Ideally, this should have got cleaned-up
-            renewRetentionLease(leaderShardId, seqNo, followerShardId, timeout)
+        var canRetry = true
+        while (true) {
+            try {
+                log.info("Adding retention lease $retentionLeaseId")
+                client.execute(RetentionLeaseActions.Add.INSTANCE, request).actionGet(timeout)
+                break
+            } catch (e: RetentionLeaseAlreadyExistsException) {
+                log.info("Found a stale retention lease $retentionLeaseId on leader.")
+                if (canRetry) {
+                    canRetry = false
+                    attemptRetentionLeaseRemoval(leaderShardId, followerShardId, timeout)
+                    log.info("Cleared stale retention lease $retentionLeaseId on leader. Retrying...")
+                } else {
+                    log.error(e.stackTraceToString())
+                    throw e
+                }
+            }
+        }
}
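
Note: the canRetry loop above makes at most two Add attempts. A minimal straight-line sketch of the same control flow, assuming the same client, request, and helper shown in this diff (logging elided):

    try {
        client.execute(RetentionLeaseActions.Add.INSTANCE, request).actionGet(timeout)
    } catch (e: RetentionLeaseAlreadyExistsException) {
        // A stale lease left behind by an earlier, uncleanly stopped replication run.
        attemptRetentionLeaseRemoval(leaderShardId, followerShardId, timeout)
        // Second and final attempt; if the lease still exists, the exception propagates.
        client.execute(RetentionLeaseActions.Add.INSTANCE, request).actionGet(timeout)
    }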

@@ -290,7 +290,7 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
private suspend fun pollShardTaskStatus(): IndexReplicationState {
val failedShardTasks = findAllReplicationFailedShardTasks(followerIndexName, clusterService.state())
if (failedShardTasks.isNotEmpty()) {
log.info("Failed shard tasks - ", failedShardTasks)
log.info("Failed shard tasks - $failedShardTasks")
var msg = ""
for ((shard, task) in failedShardTasks) {
val taskState = task.state
@@ -217,6 +217,11 @@ class ShardReplicationTask(id: Long, type: String, action: String, description:
TaskId(clusterService.nodeName, id), client, indexShard.localCheckpoint, followerClusterStats)

val changeTracker = ShardReplicationChangesTracker(indexShard, replicationSettings)
+        followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.localCheckpoint
+        // In case the shard task starts on a new node and there are no active writes on the leader shard, the leader
+        // checkpoint never gets initialized and defaults to 0. To get around this, seed leaderCheckpoint with the
+        // follower shard's localCheckpoint, as the leader shard is guaranteed to be at or ahead of it.
+        followerClusterStats.stats[followerShardId]!!.leaderCheckpoint = indexShard.localCheckpoint
coroutineScope {
while (isActive) {
rateLimiter.acquire()
@@ -273,7 +278,6 @@
//hence renew retention lease with lastSyncedGlobalCheckpoint + 1 so that any shard that picks up shard replication task has data until then.
try {
retentionLeaseHelper.renewRetentionLease(leaderShardId, indexShard.lastSyncedGlobalCheckpoint + 1, followerShardId)
-                followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.lastSyncedGlobalCheckpoint
lastLeaseRenewalMillis = System.currentTimeMillis()
} catch (ex: Exception) {
when (ex) {
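
Note: the leaderCheckpoint seeding above matters if anything compares the two checkpoints in FollowerShardMetric. A sketch of the failure mode it avoids, assuming a lag-style stat derived from them (the computation shown is hypothetical, not from this PR):

    // With leaderCheckpoint left at its default of 0 and a follower already at,
    // say, checkpoint 41_000, any stat computed as (leader - follower) would be
    // a large negative number until the first new write arrives on the leader.
    val metric = followerClusterStats.stats[followerShardId]!!
    val lag = metric.leaderCheckpoint - metric.followerCheckpoint  // sensible only after seeding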
@@ -28,6 +28,7 @@ import org.opensearch.client.Client
import org.opensearch.common.logging.Loggers
import org.opensearch.index.shard.ShardId
import org.opensearch.index.translog.Translog
+import org.opensearch.replication.util.indicesService
import org.opensearch.tasks.TaskId
import java.util.ArrayList
import java.util.concurrent.ConcurrentHashMap
@@ -55,6 +56,9 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata:
private val log = Loggers.getLogger(javaClass, followerShardId)!!
private val completed = CompletableDeferred<Unit>()

+    val followerIndexService = indicesService.indexServiceSafe(followerShardId.index)
+    val indexShard = followerIndexService.getShard(followerShardId.id)

private val sequencer = scope.actor<Unit>(capacity = Channel.UNLIMITED) {
// Exceptions thrown here will mark the channel as failed and the next attempt to send to the channel will
// raise the same exception. See [SendChannel.close] method for details.
@@ -88,6 +92,7 @@
val tookInNanos = System.nanoTime() - relativeStartNanos
followerClusterStats.stats[followerShardId]!!.totalWriteTime.addAndGet(TimeUnit.NANOSECONDS.toMillis(tookInNanos))
followerClusterStats.stats[followerShardId]!!.opsWritten.addAndGet(replayRequest.changes.size.toLong())
+                followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.localCheckpoint
}
highWatermark = next.changes.lastOrNull()?.seqNo() ?: highWatermark
}
@@ -11,7 +11,6 @@

package org.opensearch.replication

-import com.nhaarman.mockitokotlin2.stub
import org.opensearch.replication.MultiClusterAnnotations.ClusterConfiguration
import org.opensearch.replication.MultiClusterAnnotations.ClusterConfigurations
import org.opensearch.replication.MultiClusterAnnotations.getAnnotationsFromClass
@@ -21,6 +20,7 @@ import org.apache.http.HttpHost
import org.apache.http.HttpStatus
import org.apache.http.client.config.RequestConfig
import org.apache.http.entity.ContentType
+import org.apache.http.entity.StringEntity
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
import org.apache.http.message.BasicHeader
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy
@@ -513,6 +513,28 @@ abstract class MultiClusterRestTestCase : OpenSearchTestCase() {
return OpenSearchRestTestCase.entityAsList(client.performRequest(Request("GET", endpoint)))
}

+    protected fun deleteConnection(fromClusterName: String, connectionName: String = "source") {
+        val fromCluster = getNamedCluster(fromClusterName)
+        val persistentConnectionRequest = Request("PUT", "_cluster/settings")
+
+        val entityAsString = """
+                        {
+                          "persistent": {
+                            "cluster": {
+                              "remote": {
+                                "$connectionName": {
+                                  "seeds": null
+                                }
+                              }
+                            }
+                          }
+                        }""".trimMargin()
+
+        persistentConnectionRequest.entity = StringEntity(entityAsString, ContentType.APPLICATION_JSON)
+        val persistentConnectionResponse = fromCluster.lowLevelClient.performRequest(persistentConnectionRequest)
+        assertEquals(HttpStatus.SC_OK.toLong(), persistentConnectionResponse.statusLine.statusCode.toLong())
+    }

protected fun createConnectionBetweenClusters(fromClusterName: String, toClusterName: String, connectionName: String="source") {
val toCluster = getNamedCluster(toClusterName)
val fromCluster = getNamedCluster(fromClusterName)
@@ -635,4 +657,16 @@ abstract class MultiClusterRestTestCase : OpenSearchTestCase() {
updateSettingsRequest.transientSettings(Collections.singletonMap<String, String?>(ReplicationPlugin.REPLICATION_METADATA_SYNC_INTERVAL.key, "5s"))
followerClient.cluster().putSettings(updateSettingsRequest, RequestOptions.DEFAULT)
}

+    protected fun docCount(cluster: RestHighLevelClient, indexName: String): Int {
+        val persistentConnectionRequest = Request("GET", "/$indexName/_search?pretty&q=*")
+
+        val persistentConnectionResponse = cluster.lowLevelClient.performRequest(persistentConnectionRequest)
+        val statusResponse: Map<String, Map<String, Map<String, Any>>> = OpenSearchRestTestCase.entityAsMap(persistentConnectionResponse) as Map<String, Map<String, Map<String, Any>>>
+        return statusResponse["hits"]?.get("total")?.get("value") as Int
+    }
+
+    protected fun deleteIndex(testCluster: RestHighLevelClient, indexName: String) {
+        testCluster.lowLevelClient.performRequest(Request("DELETE", indexName))
+    }
}
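
Note: a usage sketch of the three new helpers, assuming the same fixtures (FOLLOWER, getClientForCluster, the stopReplication extension) used elsewhere in this class; it mirrors the new integration test below:

    val followerClient = getClientForCluster(FOLLOWER)
    deleteConnection(FOLLOWER)                               // sever the remote link first (unclean stop)
    followerClient.stopReplication("follower_index", shouldWait = true)
    deleteIndex(followerClient, "follower_index")            // drop the local copy
    val count = docCount(followerClient, "some_index")       // reads hits.total.value from a match-all search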
@@ -0,0 +1,62 @@
+package org.opensearch.replication.integ.rest
+
+import org.opensearch.replication.MultiClusterRestTestCase
+import org.opensearch.replication.MultiClusterAnnotations
+import org.opensearch.replication.StartReplicationRequest
+import org.opensearch.replication.startReplication
+import org.opensearch.replication.stopReplication
+import org.assertj.core.api.Assertions
+import org.opensearch.client.RequestOptions
+import org.opensearch.client.indices.CreateIndexRequest
+import org.junit.Assert
+import java.util.concurrent.TimeUnit
+
+
+@MultiClusterAnnotations.ClusterConfigurations(
+    MultiClusterAnnotations.ClusterConfiguration(clusterName = LEADER),
+    MultiClusterAnnotations.ClusterConfiguration(clusterName = FOLLOWER)
+)
+class ReplicationStopThenRestartIT : MultiClusterRestTestCase() {
+    private val leaderIndexName = "leader_index"
+    private val followerIndexName = "follower_index"
+
+    fun `test replication works after unclean stop and start`() {
+        val followerClient = getClientForCluster(FOLLOWER)
+        val leaderClient = getClientForCluster(LEADER)
+        changeTemplate(LEADER)
+        createConnectionBetweenClusters(FOLLOWER, LEADER)
+        val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT)
+        Assertions.assertThat(createIndexResponse.isAcknowledged).isTrue()
+        followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName))
+        insertDocToIndex(LEADER, "1", "dummy data 1", leaderIndexName)
+        insertDocToIndex(LEADER, "2", "dummy data 1", leaderIndexName)
+
+        assertBusy({
+            try {
+                Assert.assertEquals(2, docCount(followerClient, followerIndexName))
+            } catch (ex: Exception) {
+                ex.printStackTrace()
+                Assert.fail("Exception while querying follower cluster. Failing to retry again")
+            }
+        }, 1, TimeUnit.MINUTES)
+
+
+        deleteConnection(FOLLOWER)
+        followerClient.stopReplication(followerIndexName, shouldWait = true)
+        deleteIndex(followerClient, followerIndexName)
+
+        createConnectionBetweenClusters(FOLLOWER, LEADER)
+        insertDocToIndex(LEADER, "3", "dummy data 1", leaderIndexName)
+        insertDocToIndex(LEADER, "4", "dummy data 1", leaderIndexName)
+        followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName))
+
+        assertBusy({
+            try {
+                Assert.assertEquals(4, docCount(followerClient, followerIndexName))
+            } catch (ex: Exception) {
+                Assert.fail("Exception while querying follower cluster. Failing to retry again")
+            }
+        }, 1, TimeUnit.MINUTES)
+    }
+}
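
Note: deleting the connection before stopReplication leaves the follower unable to remove its retention lease on the leader, so the second startReplication exercises the stale-lease removal-and-retry path added to RemoteClusterRetentionLeaseHelper above. To pin that down explicitly, one could inspect shard-level index stats on the leader, which expose active leases (a sketch; this check is hypothetical and not part of the PR):

    val statsResponse = leaderClient.lowLevelClient.performRequest(
        Request("GET", "/$leaderIndexName/_stats?level=shards"))  // shard stats include a "retention_leases" section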
@@ -11,32 +11,37 @@

package org.opensearch.replication.task.shard

-import org.opensearch.replication.action.changes.GetChangesResponse
-import org.opensearch.replication.action.replay.ReplayChangesAction
-import org.opensearch.replication.action.replay.ReplayChangesRequest
-import org.opensearch.replication.action.replay.ReplayChangesResponse
-import org.opensearch.replication.metadata.ReplicationOverallState
-import org.opensearch.replication.metadata.store.ReplicationContext
-import org.opensearch.replication.metadata.store.ReplicationMetadata
-import org.opensearch.replication.metadata.store.ReplicationStoreMetadataType
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.ObsoleteCoroutinesApi
import kotlinx.coroutines.test.runBlockingTest
import org.assertj.core.api.Assertions.assertThat
+import org.mockito.Mockito
import org.opensearch.action.ActionListener
import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionResponse
import org.opensearch.action.ActionType
import org.opensearch.action.support.replication.ReplicationResponse.ShardInfo
import org.opensearch.common.settings.Settings
+import org.opensearch.index.IndexService
+import org.opensearch.index.shard.IndexShard
import org.opensearch.index.shard.ShardId
import org.opensearch.index.translog.Translog
+import org.opensearch.indices.IndicesService
+import org.opensearch.replication.action.changes.GetChangesResponse
+import org.opensearch.replication.action.replay.ReplayChangesAction
+import org.opensearch.replication.action.replay.ReplayChangesRequest
+import org.opensearch.replication.action.replay.ReplayChangesResponse
+import org.opensearch.replication.metadata.ReplicationOverallState
+import org.opensearch.replication.metadata.store.ReplicationContext
+import org.opensearch.replication.metadata.store.ReplicationMetadata
+import org.opensearch.replication.metadata.store.ReplicationStoreMetadataType
+import org.opensearch.replication.util.indicesService
import org.opensearch.tasks.TaskId.EMPTY_TASK_ID
import org.opensearch.test.OpenSearchTestCase
import org.opensearch.test.OpenSearchTestCase.randomList
import org.opensearch.test.client.NoOpClient
import java.util.Locale


@ObsoleteCoroutinesApi
class TranslogSequencerTests : OpenSearchTestCase() {

@@ -83,6 +88,11 @@ class TranslogSequencerTests : OpenSearchTestCase() {
val stats = FollowerClusterStats()
stats.stats[followerShardId] = FollowerShardMetric()
val startSeqNo = randomNonNegativeLong()
+        indicesService = Mockito.mock(IndicesService::class.java)
+        val followerIndexService = Mockito.mock(IndexService::class.java)
+        val indexShard = Mockito.mock(IndexShard::class.java)
+        Mockito.`when`(indicesService.indexServiceSafe(followerShardId.index)).thenReturn(followerIndexService)
+        Mockito.`when`(followerIndexService.getShard(followerShardId.id)).thenReturn(indexShard)
val sequencer = TranslogSequencer(this, replicationMetadata, followerShardId, leaderAlias, leaderIndex, EMPTY_TASK_ID,
client, startSeqNo, stats)
