Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ class ShardReplicationTask(id: Long, type: String, action: String, description:
TaskId(clusterService.nodeName, id), client, indexShard.localCheckpoint, followerClusterStats)

val changeTracker = ShardReplicationChangesTracker(indexShard, replicationSettings)
followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.localCheckpoint
coroutineScope {
while (isActive) {
rateLimiter.acquire()
Expand Down Expand Up @@ -273,7 +274,6 @@ class ShardReplicationTask(id: Long, type: String, action: String, description:
//hence renew retention lease with lastSyncedGlobalCheckpoint + 1 so that any shard that picks up shard replication task has data until then.
try {
retentionLeaseHelper.renewRetentionLease(leaderShardId, indexShard.lastSyncedGlobalCheckpoint + 1, followerShardId)
followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.lastSyncedGlobalCheckpoint
lastLeaseRenewalMillis = System.currentTimeMillis()
} catch (ex: Exception) {
when (ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.opensearch.client.Client
import org.opensearch.common.logging.Loggers
import org.opensearch.index.shard.ShardId
import org.opensearch.index.translog.Translog
import org.opensearch.replication.util.indicesService
import org.opensearch.tasks.TaskId
import java.util.ArrayList
import java.util.concurrent.ConcurrentHashMap
Expand Down Expand Up @@ -55,6 +56,9 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata:
private val log = Loggers.getLogger(javaClass, followerShardId)!!
private val completed = CompletableDeferred<Unit>()

val followerIndexService = indicesService.indexServiceSafe(followerShardId.index)
val indexShard = followerIndexService.getShard(followerShardId.id)

private val sequencer = scope.actor<Unit>(capacity = Channel.UNLIMITED) {
// Exceptions thrown here will mark the channel as failed and the next attempt to send to the channel will
// raise the same exception. See [SendChannel.close] method for details.
Expand Down Expand Up @@ -88,6 +92,7 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata:
val tookInNanos = System.nanoTime() - relativeStartNanos
followerClusterStats.stats[followerShardId]!!.totalWriteTime.addAndGet(TimeUnit.NANOSECONDS.toMillis(tookInNanos))
followerClusterStats.stats[followerShardId]!!.opsWritten.addAndGet(replayRequest.changes.size.toLong())
followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.localCheckpoint
}
highWatermark = next.changes.lastOrNull()?.seqNo() ?: highWatermark
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1091,6 +1091,7 @@ class StartReplicationIT: MultiClusterRestTestCase() {
assertThat(stats.getValue("operations_read").toString()).isEqualTo("50")
assertThat(stats.getValue("failed_read_requests").toString()).isEqualTo("0")
assertThat(stats.getValue("failed_write_requests").toString()).isEqualTo("0")
assertThat(stats.getValue("follower_checkpoint").toString()).isEqualTo((docCount-1).toString())
assertThat(stats.containsKey("index_stats"))
assertThat(stats.size).isEqualTo(16)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,37 @@

package org.opensearch.replication.task.shard

import org.opensearch.replication.action.changes.GetChangesResponse
import org.opensearch.replication.action.replay.ReplayChangesAction
import org.opensearch.replication.action.replay.ReplayChangesRequest
import org.opensearch.replication.action.replay.ReplayChangesResponse
import org.opensearch.replication.metadata.ReplicationOverallState
import org.opensearch.replication.metadata.store.ReplicationContext
import org.opensearch.replication.metadata.store.ReplicationMetadata
import org.opensearch.replication.metadata.store.ReplicationStoreMetadataType
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.ObsoleteCoroutinesApi
import kotlinx.coroutines.test.runBlockingTest
import org.assertj.core.api.Assertions.assertThat
import org.mockito.Mockito
import org.opensearch.action.ActionListener
import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionResponse
import org.opensearch.action.ActionType
import org.opensearch.action.support.replication.ReplicationResponse.ShardInfo
import org.opensearch.common.settings.Settings
import org.opensearch.index.IndexService
import org.opensearch.index.shard.IndexShard
import org.opensearch.index.shard.ShardId
import org.opensearch.index.translog.Translog
import org.opensearch.indices.IndicesService
import org.opensearch.replication.action.changes.GetChangesResponse
import org.opensearch.replication.action.replay.ReplayChangesAction
import org.opensearch.replication.action.replay.ReplayChangesRequest
import org.opensearch.replication.action.replay.ReplayChangesResponse
import org.opensearch.replication.metadata.ReplicationOverallState
import org.opensearch.replication.metadata.store.ReplicationContext
import org.opensearch.replication.metadata.store.ReplicationMetadata
import org.opensearch.replication.metadata.store.ReplicationStoreMetadataType
import org.opensearch.replication.util.indicesService
import org.opensearch.tasks.TaskId.EMPTY_TASK_ID
import org.opensearch.test.OpenSearchTestCase
import org.opensearch.test.OpenSearchTestCase.randomList
import org.opensearch.test.client.NoOpClient
import java.util.Locale


@ObsoleteCoroutinesApi
class TranslogSequencerTests : OpenSearchTestCase() {

Expand Down Expand Up @@ -83,6 +88,11 @@ class TranslogSequencerTests : OpenSearchTestCase() {
val stats = FollowerClusterStats()
stats.stats[followerShardId] = FollowerShardMetric()
val startSeqNo = randomNonNegativeLong()
indicesService = Mockito.mock(IndicesService::class.java)
val followerIndexService = Mockito.mock(IndexService::class.java)
val indexShard = Mockito.mock(IndexShard::class.java)
Mockito.`when`(indicesService.indexServiceSafe(followerShardId.index)).thenReturn(followerIndexService)
Mockito.`when`(followerIndexService.getShard(followerShardId.id)).thenReturn(indexShard)
val sequencer = TranslogSequencer(this, replicationMetadata, followerShardId, leaderAlias, leaderIndex, EMPTY_TASK_ID,
client, startSeqNo, stats)

Expand Down