Skip to content

Commit 0f36d51

Browse files
committed
Correctly update the followerCheckpoint in the stats API
Summary: We need to update followerCheckpoint after writing to the follower index. Currently, we do not wait for the writes to complete, so followerCheckpoint is updated with soon-to-be-stale values. Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>
1 parent ec23d1f commit 0f36d51

File tree

4 files changed

+26
-10
lines changed

4 files changed

+26
-10
lines changed

src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ class ShardReplicationTask(id: Long, type: String, action: String, description:
217217
TaskId(clusterService.nodeName, id), client, indexShard.localCheckpoint, followerClusterStats)
218218

219219
val changeTracker = ShardReplicationChangesTracker(indexShard, replicationSettings)
220+
followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.localCheckpoint
220221
coroutineScope {
221222
while (isActive) {
222223
rateLimiter.acquire()
@@ -273,7 +274,6 @@ class ShardReplicationTask(id: Long, type: String, action: String, description:
273274
//hence renew retention lease with lastSyncedGlobalCheckpoint + 1 so that any shard that picks up shard replication task has data until then.
274275
try {
275276
retentionLeaseHelper.renewRetentionLease(leaderShardId, indexShard.lastSyncedGlobalCheckpoint + 1, followerShardId)
276-
followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.lastSyncedGlobalCheckpoint
277277
lastLeaseRenewalMillis = System.currentTimeMillis()
278278
} catch (ex: Exception) {
279279
when (ex) {

src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import org.opensearch.client.Client
2828
import org.opensearch.common.logging.Loggers
2929
import org.opensearch.index.shard.ShardId
3030
import org.opensearch.index.translog.Translog
31+
import org.opensearch.replication.util.indicesService
3132
import org.opensearch.tasks.TaskId
3233
import java.util.ArrayList
3334
import java.util.concurrent.ConcurrentHashMap
@@ -55,6 +56,9 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata:
5556
private val log = Loggers.getLogger(javaClass, followerShardId)!!
5657
private val completed = CompletableDeferred<Unit>()
5758

59+
val followerIndexService = indicesService.indexServiceSafe(followerShardId.index)
60+
val indexShard = followerIndexService.getShard(followerShardId.id)
61+
5862
private val sequencer = scope.actor<Unit>(capacity = Channel.UNLIMITED) {
5963
// Exceptions thrown here will mark the channel as failed and the next attempt to send to the channel will
6064
// raise the same exception. See [SendChannel.close] method for details.
@@ -88,6 +92,7 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata:
8892
val tookInNanos = System.nanoTime() - relativeStartNanos
8993
followerClusterStats.stats[followerShardId]!!.totalWriteTime.addAndGet(TimeUnit.NANOSECONDS.toMillis(tookInNanos))
9094
followerClusterStats.stats[followerShardId]!!.opsWritten.addAndGet(replayRequest.changes.size.toLong())
95+
followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.localCheckpoint
9196
}
9297
highWatermark = next.changes.lastOrNull()?.seqNo() ?: highWatermark
9398
}

src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1091,6 +1091,7 @@ class StartReplicationIT: MultiClusterRestTestCase() {
10911091
assertThat(stats.getValue("operations_read").toString()).isEqualTo("50")
10921092
assertThat(stats.getValue("failed_read_requests").toString()).isEqualTo("0")
10931093
assertThat(stats.getValue("failed_write_requests").toString()).isEqualTo("0")
1094+
assertThat(stats.getValue("follower_checkpoint").toString()).isEqualTo((docCount-1).toString())
10941095
assertThat(stats.containsKey("index_stats"))
10951096
assertThat(stats.size).isEqualTo(16)
10961097

src/test/kotlin/org/opensearch/replication/task/shard/TranslogSequencerTests.kt

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,32 +11,37 @@
1111

1212
package org.opensearch.replication.task.shard
1313

14-
import org.opensearch.replication.action.changes.GetChangesResponse
15-
import org.opensearch.replication.action.replay.ReplayChangesAction
16-
import org.opensearch.replication.action.replay.ReplayChangesRequest
17-
import org.opensearch.replication.action.replay.ReplayChangesResponse
18-
import org.opensearch.replication.metadata.ReplicationOverallState
19-
import org.opensearch.replication.metadata.store.ReplicationContext
20-
import org.opensearch.replication.metadata.store.ReplicationMetadata
21-
import org.opensearch.replication.metadata.store.ReplicationStoreMetadataType
2214
import kotlinx.coroutines.ExperimentalCoroutinesApi
2315
import kotlinx.coroutines.ObsoleteCoroutinesApi
2416
import kotlinx.coroutines.test.runBlockingTest
2517
import org.assertj.core.api.Assertions.assertThat
18+
import org.mockito.Mockito
2619
import org.opensearch.action.ActionListener
2720
import org.opensearch.action.ActionRequest
2821
import org.opensearch.action.ActionResponse
2922
import org.opensearch.action.ActionType
3023
import org.opensearch.action.support.replication.ReplicationResponse.ShardInfo
3124
import org.opensearch.common.settings.Settings
25+
import org.opensearch.index.IndexService
26+
import org.opensearch.index.shard.IndexShard
3227
import org.opensearch.index.shard.ShardId
3328
import org.opensearch.index.translog.Translog
29+
import org.opensearch.indices.IndicesService
30+
import org.opensearch.replication.action.changes.GetChangesResponse
31+
import org.opensearch.replication.action.replay.ReplayChangesAction
32+
import org.opensearch.replication.action.replay.ReplayChangesRequest
33+
import org.opensearch.replication.action.replay.ReplayChangesResponse
34+
import org.opensearch.replication.metadata.ReplicationOverallState
35+
import org.opensearch.replication.metadata.store.ReplicationContext
36+
import org.opensearch.replication.metadata.store.ReplicationMetadata
37+
import org.opensearch.replication.metadata.store.ReplicationStoreMetadataType
38+
import org.opensearch.replication.util.indicesService
3439
import org.opensearch.tasks.TaskId.EMPTY_TASK_ID
3540
import org.opensearch.test.OpenSearchTestCase
36-
import org.opensearch.test.OpenSearchTestCase.randomList
3741
import org.opensearch.test.client.NoOpClient
3842
import java.util.Locale
3943

44+
4045
@ObsoleteCoroutinesApi
4146
class TranslogSequencerTests : OpenSearchTestCase() {
4247

@@ -83,6 +88,11 @@ class TranslogSequencerTests : OpenSearchTestCase() {
8388
val stats = FollowerClusterStats()
8489
stats.stats[followerShardId] = FollowerShardMetric()
8590
val startSeqNo = randomNonNegativeLong()
91+
indicesService = Mockito.mock(IndicesService::class.java)
92+
val followerIndexService = Mockito.mock(IndexService::class.java)
93+
val indexShard = Mockito.mock(IndexShard::class.java)
94+
Mockito.`when`(indicesService.indexServiceSafe(followerShardId.index)).thenReturn(followerIndexService)
95+
Mockito.`when`(followerIndexService.getShard(followerShardId.id)).thenReturn(indexShard)
8696
val sequencer = TranslogSequencer(this, replicationMetadata, followerShardId, leaderAlias, leaderIndex, EMPTY_TASK_ID,
8797
client, startSeqNo, stats)
8898

0 commit comments

Comments
 (0)