Skip to content

Commit

Permalink
[Remote Store] Fix refresh lag bug on primary term change (opensearch…
Browse files Browse the repository at this point in the history
…-project#10918)

* [Remote Store] Fix refresh lag bug on primary term change

Signed-off-by: Ashish Singh <ssashish@amazon.com>

* Add Integ Tests

Signed-off-by: Ashish Singh <ssashish@amazon.com>

* Incorporate PR review feedback

Signed-off-by: Ashish Singh <ssashish@amazon.com>

* Empty-Commit

Signed-off-by: Ashish Singh <ssashish@amazon.com>

---------

Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 authored Oct 26, 2023
1 parent fb6fe1b commit b17d4a8
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,29 @@ public void testStatsCorrectnessOnFailover() {
logger.info("Test completed");
}

/**
 * Creates a remote store index and checks that the remote store stats returned for each of its
 * shard ids report a segment refresh time lag of zero.
 *
 * @throws InterruptedException if the thread is interrupted while sleeping
 */
public void testZeroLagOnCreateIndex() throws InterruptedException {
    setup();
    final String clusterManager = internalCluster().getClusterManagerName();

    final int shardCount = randomIntBetween(1, 3);
    createIndex(INDEX_NAME, remoteStoreIndexSettings(1, shardCount));
    ensureGreen(INDEX_NAME);

    // Do not query the stats until System.nanoTime() has moved past the value sampled here.
    final long sampledNs = System.nanoTime();
    while (System.nanoTime() == sampledNs) {
        Thread.sleep(10);
    }

    for (int shardId = 0; shardId < shardCount; shardId++) {
        final RemoteStoreStatsResponse statsResponse = client(clusterManager).admin()
            .cluster()
            .prepareRemoteStoreStats(INDEX_NAME, String.valueOf(shardId))
            .get();
        for (RemoteStoreStats shardStats : statsResponse.getRemoteStoreStats()) {
            assertEquals(0, shardStats.getSegmentStats().refreshTimeLagMs);
        }
    }
}

private void indexDocs() {
for (int i = 0; i < randomIntBetween(5, 10); i++) {
if (randomBoolean()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public final class RemoteStoreRefreshListener extends CloseableRetryableRefreshL
private final RemoteSegmentStoreDirectory remoteDirectory;
private final RemoteSegmentTransferTracker segmentTracker;
private final Map<String, String> localSegmentChecksumMap;
private long primaryTerm;
private volatile long primaryTerm;
private volatile Iterator<TimeValue> backoffDelayIterator;
private final SegmentReplicationCheckpointPublisher checkpointPublisher;

Expand Down Expand Up @@ -126,10 +126,9 @@ protected void runAfterRefreshExactlyOnce(boolean didRefresh) {
// We have 2 separate methods to check if sync needs to be done or not. This is required since we use the return boolean
// from isReadyForUpload to schedule refresh retries as the index shard or the primary mode are not in complete
// ready state.
if (shouldSync(didRefresh) && isReadyForUpload()) {
segmentTracker.updateLocalRefreshTimeAndSeqNo();
if (shouldSync(didRefresh, true) && isReadyForUpload()) {
try {
initializeRemoteDirectoryOnTermUpdate();
segmentTracker.updateLocalRefreshTimeAndSeqNo();
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
Collection<String> localSegmentsPostRefresh = segmentInfosGatedCloseable.get().files(true);
updateLocalSizeMapAndTracker(localSegmentsPostRefresh);
Expand All @@ -150,25 +149,34 @@ protected void runAfterRefreshExactlyOnce(boolean didRefresh) {
@Override
protected boolean performAfterRefreshWithPermit(boolean didRefresh) {
boolean successful;
if (shouldSync(didRefresh)) {
if (shouldSync(didRefresh, false)) {
successful = syncSegments();
} else {
successful = true;
}
return successful;
}

private boolean shouldSync(boolean didRefresh) {
return this.primaryTerm != indexShard.getOperationPrimaryTerm()
// If the readers change, didRefresh is always true.
|| didRefresh
/**
 * Decides whether segments need to be synced to the remote store.
 * <p>
 * The checks run in order and the first one that holds wins: the refresh changed the readers,
 * nothing has been uploaded to the remote store yet, or {@code isRefreshAfterCommitSafe()} reports
 * true. Only when none of them holds and {@code skipPrimaryTermCheck} is false does a difference
 * between this listener's primary term and the shard's operation primary term trigger a sync.
 *
 * @param didRefresh           if the readers changed.
 * @param skipPrimaryTermCheck when true, a change in primary term alone is not treated as a reason to sync
 * @return true if sync is needed
 */
private boolean shouldSync(boolean didRefresh, boolean skipPrimaryTermCheck) {
    // If the readers change, didRefresh is always true.
    if (didRefresh) {
        return true;
    }
    // Zero state segments must be uploaded even though the refresh has not changed the reader
    // reference, so that the restore does not break.
    if (remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty()) {
        return true;
    }
    // Per the original note, this check lets a commit be picked up by every shouldSync call,
    // independent of whether the primary term has already been updated.
    if (isRefreshAfterCommitSafe()) {
        return true;
    }
    // No other reason to sync: the caller either opted out of the primary term comparison ...
    if (skipPrimaryTermCheck) {
        return false;
    }
    // ... or a primary term mismatch is the last remaining trigger.
    return this.primaryTerm != indexShard.getOperationPrimaryTerm();
}

private boolean syncSegments() {
Expand All @@ -188,6 +196,7 @@ private boolean syncSegments() {

try {
try {
initializeRemoteDirectoryOnTermUpdate();
// if a new segments_N file is present in local that is not uploaded to remote store yet, it
// is considered as a first refresh post commit. A cleanup of stale commit files is triggered.
// This is done to avoid delete post each refresh.
Expand Down

0 comments on commit b17d4a8

Please sign in to comment.