Skip to content

Commit 3bfc53d

Browse files
committed
Correctly updating the followerCheckpoint in stats api (#438)
Summary: We need to update followerCheckpoint after writing to the follower index. Currently, we do not wait for the writes to complete, so the checkpoint is updated with soon-to-be-stale values. Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>
1 parent 4ecabd4 commit 3bfc53d

File tree

4 files changed

+17
-3
lines changed

4 files changed

+17
-3
lines changed

src/main/kotlin/com/amazon/elasticsearch/replication/task/shard/ShardReplicationTask.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,7 @@ class ShardReplicationTask(id: Long, type: String, action: String, description:
222222
TaskId(clusterService.nodeName, id), client, indexShard.localCheckpoint, followerClusterStats)
223223

224224
val changeTracker = ShardReplicationChangesTracker(indexShard, replicationSettings)
225+
followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.localCheckpoint
225226
coroutineScope {
226227
while (isActive) {
227228
rateLimiter.acquire()
@@ -278,7 +279,6 @@ class ShardReplicationTask(id: Long, type: String, action: String, description:
278279
//hence renew retention lease with lastSyncedGlobalCheckpoint + 1 so that any shard that picks up shard replication task has data until then.
279280
try {
280281
retentionLeaseHelper.renewRetentionLease(leaderShardId, indexShard.lastSyncedGlobalCheckpoint + 1, followerShardId)
281-
followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.lastSyncedGlobalCheckpoint
282282
lastLeaseRenewalMillis = System.currentTimeMillis()
283283
} catch (ex: Exception) {
284284
when (ex) {

src/main/kotlin/com/amazon/elasticsearch/replication/task/shard/TranslogSequencer.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import org.elasticsearch.client.Client
3232
import org.elasticsearch.common.logging.Loggers
3333
import org.elasticsearch.index.shard.ShardId
3434
import org.elasticsearch.index.translog.Translog
35+
import com.amazon.elasticsearch.replication.util.indicesService
3536
import org.elasticsearch.tasks.TaskId
3637
import java.util.ArrayList
3738
import java.util.concurrent.ConcurrentHashMap
@@ -59,6 +60,9 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata:
5960
private val log = Loggers.getLogger(javaClass, followerShardId)!!
6061
private val completed = CompletableDeferred<Unit>()
6162

63+
val followerIndexService = indicesService.indexServiceSafe(followerShardId.index)
64+
val indexShard = followerIndexService.getShard(followerShardId.id)
65+
6266
private val sequencer = scope.actor<Unit>(capacity = Channel.UNLIMITED) {
6367
// Exceptions thrown here will mark the channel as failed and the next attempt to send to the channel will
6468
// raise the same exception. See [SendChannel.close] method for details.
@@ -92,6 +96,7 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata:
9296
val tookInNanos = System.nanoTime() - relativeStartNanos
9397
followerClusterStats.stats[followerShardId]!!.totalWriteTime.addAndGet(TimeUnit.NANOSECONDS.toMillis(tookInNanos))
9498
followerClusterStats.stats[followerShardId]!!.opsWritten.addAndGet(replayRequest.changes.size.toLong())
99+
followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.localCheckpoint
95100
}
96101
highWatermark = next.changes.lastOrNull()?.seqNo() ?: highWatermark
97102
}

src/test/kotlin/com/amazon/elasticsearch/replication/integ/rest/StartReplicationIT.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1055,8 +1055,6 @@ class StartReplicationIT: MultiClusterRestTestCase() {
10551055

10561056
followerClient.pauseReplication(followerIndex2)
10571057
followerClient.stopReplication(followerIndex3)
1058-
1059-
10601058
val stats = followerClient.followerStats()
10611059
assertThat(stats.getValue("num_syncing_indices").toString()).isEqualTo("1")
10621060
assertThat(stats.getValue("num_paused_indices").toString()).isEqualTo("1")
@@ -1066,6 +1064,7 @@ class StartReplicationIT: MultiClusterRestTestCase() {
10661064
assertThat(stats.getValue("operations_read").toString()).isEqualTo("50")
10671065
assertThat(stats.getValue("failed_read_requests").toString()).isEqualTo("0")
10681066
assertThat(stats.getValue("failed_write_requests").toString()).isEqualTo("0")
1067+
assertThat(stats.getValue("follower_checkpoint").toString()).isEqualTo((docCount-1).toString())
10691068
assertThat(stats.containsKey("index_stats"))
10701069
assertThat(stats.size).isEqualTo(16)
10711070
} finally {

src/test/kotlin/com/amazon/elasticsearch/replication/task/shard/TranslogSequencerTests.kt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,21 +26,26 @@ import com.amazon.elasticsearch.replication.metadata.store.ReplicationStoreMetad
2626
import kotlinx.coroutines.ExperimentalCoroutinesApi
2727
import kotlinx.coroutines.ObsoleteCoroutinesApi
2828
import kotlinx.coroutines.test.runBlockingTest
29+
import org.mockito.Mockito
2930
import org.assertj.core.api.Assertions.assertThat
3031
import org.elasticsearch.action.ActionListener
3132
import org.elasticsearch.action.ActionRequest
3233
import org.elasticsearch.action.ActionResponse
3334
import org.elasticsearch.action.ActionType
3435
import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo
3536
import org.elasticsearch.common.settings.Settings
37+
import org.elasticsearch.index.IndexService
38+
import org.elasticsearch.index.shard.IndexShard
3639
import org.elasticsearch.index.shard.ShardId
3740
import org.elasticsearch.index.translog.Translog
41+
import org.elasticsearch.indices.IndicesService
3842
import org.elasticsearch.tasks.TaskId.EMPTY_TASK_ID
3943
import org.elasticsearch.test.ESTestCase
4044
import org.elasticsearch.test.ESTestCase.randomList
4145
import org.elasticsearch.test.client.NoOpClient
4246
import java.util.Locale
4347

48+
4449
@ObsoleteCoroutinesApi
4550
class TranslogSequencerTests : ESTestCase() {
4651

@@ -87,6 +92,11 @@ class TranslogSequencerTests : ESTestCase() {
8792
val stats = FollowerClusterStats()
8893
stats.stats[followerShardId] = FollowerShardMetric()
8994
val startSeqNo = randomNonNegativeLong()
95+
indicesService = Mockito.mock(IndicesService::class.java)
96+
val followerIndexService = Mockito.mock(IndexService::class.java)
97+
val indexShard = Mockito.mock(IndexShard::class.java)
98+
Mockito.`when`(indicesService.indexServiceSafe(followerShardId.index)).thenReturn(followerIndexService)
99+
Mockito.`when`(followerIndexService.getShard(followerShardId.id)).thenReturn(indexShard)
90100
val sequencer = TranslogSequencer(this, replicationMetadata, followerShardId, leaderAlias, leaderIndex, EMPTY_TASK_ID,
91101
client, startSeqNo, stats)
92102

0 commit comments

Comments
 (0)