Commit b0587bc

Reset replica engine to global checkpoint on promotion (#33473)
When a replica starts following a newly promoted primary, it may have some operations which do not exist on the new primary. We therefore need to throw away those operations to align the replica with the new primary. This is done by first resetting the engine to the safe commit, then replaying the local translog up to the global checkpoint. Relates #32867
1 parent fa301ad commit b0587bc
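
In short: on a primary-term bump, the replica now compares its maximum sequence number against the new primary's global checkpoint, and only when it holds extra operations does it rebuild its engine from the safe commit and replay the local translog up to that checkpoint. A condensed sketch of the decision path, distilled from the IndexShard diff below (permit handling and error paths elided):

    // Sketch only: condensed from the acquireReplicaOperationPermit change in this commit.
    bumpPrimaryTerm(opPrimaryTerm, () -> {
        updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition");
        final long currentGlobalCheckpoint = getGlobalCheckpoint();
        final long maxSeqNo = seqNoStats().getMaxSeqNo();
        if (currentGlobalCheckpoint < maxSeqNo) {
            // this replica has operations above the global checkpoint that the new primary
            // may not have: reopen the engine from the safe commit and replay the local
            // translog only up to the global checkpoint
            resetEngineToGlobalCheckpoint();
        } else {
            // nothing to discard; just start writing into a fresh translog generation
            getEngine().rollTranslogGeneration();
        }
    });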

16 files changed, +274 −121 lines changed
server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 6 additions & 7 deletions
@@ -678,12 +678,6 @@ public final CommitStats commitStats() {
      */
     public abstract void waitForOpsToComplete(long seqNo) throws InterruptedException;
 
-    /**
-     * Reset the local checkpoint in the tracker to the given local checkpoint
-     * @param localCheckpoint the new checkpoint to be set
-     */
-    public abstract void resetLocalCheckpoint(long localCheckpoint);
-
     /**
      * @return a {@link SeqNoStats} object, using local state and the supplied global checkpoint
      */
@@ -1165,11 +1159,16 @@ public enum Origin {
         PRIMARY,
         REPLICA,
         PEER_RECOVERY,
-        LOCAL_TRANSLOG_RECOVERY;
+        LOCAL_TRANSLOG_RECOVERY,
+        LOCAL_RESET;
 
         public boolean isRecovery() {
             return this == PEER_RECOVERY || this == LOCAL_TRANSLOG_RECOVERY;
         }
+
+        boolean isFromTranslog() {
+            return this == LOCAL_TRANSLOG_RECOVERY || this == LOCAL_RESET;
+        }
     }
 
     public Origin origin() {

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 4 additions & 8 deletions
@@ -755,6 +755,7 @@ private boolean canOptimizeAddDocument(Index index) {
                     : "version: " + index.version() + " type: " + index.versionType();
                 return true;
             case LOCAL_TRANSLOG_RECOVERY:
+            case LOCAL_RESET:
                 assert index.isRetry();
                 return true; // allow to optimize in order to update the max safe time stamp
             default:
@@ -881,7 +882,7 @@ public IndexResult index(Index index) throws IOException {
                     indexResult = new IndexResult(
                         plan.versionForIndexing, getPrimaryTerm(), plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
                 }
-                if (index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
+                if (index.origin().isFromTranslog() == false) {
                     final Translog.Location location;
                     if (indexResult.getResultType() == Result.Type.SUCCESS) {
                         location = translog.add(new Translog.Index(index, indexResult));
@@ -1234,7 +1235,7 @@ public DeleteResult delete(Delete delete) throws IOException {
                     deleteResult = new DeleteResult(
                         plan.versionOfDeletion, getPrimaryTerm(), plan.seqNoOfDeletion, plan.currentlyDeleted == false);
                 }
-                if (delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
+                if (delete.origin().isFromTranslog() == false) {
                     final Translog.Location location;
                     if (deleteResult.getResultType() == Result.Type.SUCCESS) {
                         location = translog.add(new Translog.Delete(delete, deleteResult));
@@ -1485,7 +1486,7 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
                 }
             }
             final NoOpResult noOpResult = failure != null ? new NoOpResult(getPrimaryTerm(), noOp.seqNo(), failure) : new NoOpResult(getPrimaryTerm(), noOp.seqNo());
-            if (noOp.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
+            if (noOp.origin().isFromTranslog() == false) {
                 final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
                 noOpResult.setTranslogLocation(location);
             }
@@ -2404,11 +2405,6 @@ public void waitForOpsToComplete(long seqNo) throws InterruptedException {
         localCheckpointTracker.waitForOpsToComplete(seqNo);
     }
 
-    @Override
-    public void resetLocalCheckpoint(long localCheckpoint) {
-        localCheckpointTracker.resetCheckpoint(localCheckpoint);
-    }
-
    @Override
    public SeqNoStats getSeqNoStats(long globalCheckpoint) {
        return localCheckpointTracker.getStats(globalCheckpoint);
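
The same isFromTranslog() guard now covers all three write paths (index, delete, no-op): operations replayed from a translog, whether during normal recovery or the new LOCAL_RESET replay, are already stored in the local translog and must not be appended again. Roughly, with translogEntryFor as a hypothetical stand-in for the concrete new Translog.Index/Delete/NoOp(...) calls above:

    // Sketch only; translogEntryFor is a hypothetical helper, not part of this commit.
    if (operation.origin().isFromTranslog() == false) {
        // PRIMARY, REPLICA and PEER_RECOVERY operations still get a translog entry;
        // LOCAL_TRANSLOG_RECOVERY and LOCAL_RESET replays are skipped.
        result.setTranslogLocation(translog.add(translogEntryFor(operation, result)));
    }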

server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

Lines changed: 0 additions & 4 deletions
@@ -257,10 +257,6 @@ public long getLocalCheckpoint() {
     public void waitForOpsToComplete(long seqNo) {
     }
 
-    @Override
-    public void resetLocalCheckpoint(long newCheckpoint) {
-    }
-
     @Override
     public SeqNoStats getSeqNoStats(long globalCheckpoint) {
         return new SeqNoStats(seqNoStats.getMaxSeqNo(), seqNoStats.getLocalCheckpoint(), globalCheckpoint);

server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java

Lines changed: 1 addition & 0 deletions
@@ -109,6 +109,7 @@ public synchronized void markSeqNoAsCompleted(final long seqNo) {
      * @param checkpoint the local checkpoint to reset this tracker to
      */
     public synchronized void resetCheckpoint(final long checkpoint) {
+        // TODO: remove this method as after we restore the local history on promotion.
         assert checkpoint != SequenceNumbers.UNASSIGNED_SEQ_NO;
         assert checkpoint <= this.checkpoint;
         processedSeqNo.clear();

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 56 additions & 28 deletions
@@ -164,7 +164,6 @@
 import java.util.stream.StreamSupport;
 
 import static org.elasticsearch.index.mapper.SourceToParse.source;
-import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
 import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
 
 public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService.Shard {
@@ -1307,16 +1306,18 @@ public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine
         return result;
     }
 
-    // package-private for testing
-    int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOException {
-        recoveryState.getTranslog().totalOperations(snapshot.totalOperations());
-        recoveryState.getTranslog().totalOperationsOnStart(snapshot.totalOperations());
+    /**
+     * Replays translog operations from the provided translog {@code snapshot} to the current engine using the given {@code origin}.
+     * The callback {@code onOperationRecovered} is notified after each translog operation is replayed successfully.
+     */
+    int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engine.Operation.Origin origin,
+                            Runnable onOperationRecovered) throws IOException {
         int opsRecovered = 0;
         Translog.Operation operation;
         while ((operation = snapshot.next()) != null) {
             try {
                 logger.trace("[translog] recover op {}", operation);
-                Engine.Result result = applyTranslogOperation(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY);
+                Engine.Result result = applyTranslogOperation(operation, origin);
                 switch (result.getResultType()) {
                     case FAILURE:
                         throw result.getFailure();
@@ -1329,7 +1330,7 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOExce
                 }
 
                 opsRecovered++;
-                recoveryState.getTranslog().incrementRecoveredOperations();
+                onOperationRecovered.run();
             } catch (Exception e) {
                 if (ExceptionsHelper.status(e) == RestStatus.BAD_REQUEST) {
                     // mainly for MapperParsingException and Failure to detect xcontent
@@ -1347,8 +1348,15 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOExce
     * Operations from the translog will be replayed to bring lucene up to date.
     **/
    public void openEngineAndRecoverFromTranslog() throws IOException {
+        final RecoveryState.Translog translogRecoveryStats = recoveryState.getTranslog();
+        final Engine.TranslogRecoveryRunner translogRecoveryRunner = (engine, snapshot) -> {
+            translogRecoveryStats.totalOperations(snapshot.totalOperations());
+            translogRecoveryStats.totalOperationsOnStart(snapshot.totalOperations());
+            return runTranslogRecovery(engine, snapshot, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY,
+                translogRecoveryStats::incrementRecoveredOperations);
+        };
         innerOpenEngineAndTranslog();
-        getEngine().recoverFromTranslog(this::runTranslogRecovery, Long.MAX_VALUE);
+        getEngine().recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE);
     }
 
     /**
@@ -1386,11 +1394,7 @@ private void innerOpenEngineAndTranslog() throws IOException {
         final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
         final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
         replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint");
-
-        assertMaxUnsafeAutoIdInCommit();
-
-        final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUUID);
-        store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, config.getIndexSettings().getIndexVersionCreated());
+        trimUnsafeCommits();
 
         createNewEngine(config);
         verifyNotClosed();
@@ -1401,6 +1405,15 @@
         assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
     }
 
+    private void trimUnsafeCommits() throws IOException {
+        assert currentEngineReference.get() == null : "engine is running";
+        final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
+        final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
+        final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUUID);
+        assertMaxUnsafeAutoIdInCommit();
+        store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, indexSettings.getIndexVersionCreated());
+    }
+
     private boolean assertSequenceNumbersInCommit() throws IOException {
         final Map<String, String> userData = SegmentInfos.readLatestCommit(store.directory()).getUserData();
         assert userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) : "commit point doesn't contains a local checkpoint";
@@ -1501,7 +1514,7 @@ private void ensureWriteAllowed(Engine.Operation.Origin origin) throws IllegalIn
         if (origin == Engine.Operation.Origin.PRIMARY) {
             assert assertPrimaryMode();
         } else {
-            assert origin == Engine.Operation.Origin.REPLICA;
+            assert origin == Engine.Operation.Origin.REPLICA || origin == Engine.Operation.Origin.LOCAL_RESET;
             assert assertReplicationTarget();
         }
         if (writeAllowedStates.contains(state) == false) {
@@ -2207,9 +2220,7 @@ public void onFailedEngine(String reason, @Nullable Exception failure) {
 
     private Engine createNewEngine(EngineConfig config) {
         synchronized (mutex) {
-            if (state == IndexShardState.CLOSED) {
-                throw new AlreadyClosedException(shardId + " can't create engine - shard is closed");
-            }
+            verifyNotClosed();
             assert this.currentEngineReference.get() == null;
             Engine engine = newEngine(config);
             onNewEngine(engine); // call this before we pass the memory barrier otherwise actions that happen
@@ -2355,19 +2366,14 @@ public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long g
                 bumpPrimaryTerm(opPrimaryTerm, () -> {
                     updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition");
                     final long currentGlobalCheckpoint = getGlobalCheckpoint();
-                    final long localCheckpoint;
-                    if (currentGlobalCheckpoint == UNASSIGNED_SEQ_NO) {
-                        localCheckpoint = NO_OPS_PERFORMED;
+                    final long maxSeqNo = seqNoStats().getMaxSeqNo();
+                    logger.info("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]",
+                        opPrimaryTerm, currentGlobalCheckpoint, maxSeqNo);
+                    if (currentGlobalCheckpoint < maxSeqNo) {
+                        resetEngineToGlobalCheckpoint();
                     } else {
-                        localCheckpoint = currentGlobalCheckpoint;
+                        getEngine().rollTranslogGeneration();
                     }
-                    logger.trace(
-                        "detected new primary with primary term [{}], resetting local checkpoint from [{}] to [{}]",
-                        opPrimaryTerm,
-                        getLocalCheckpoint(),
-                        localCheckpoint);
-                    getEngine().resetLocalCheckpoint(localCheckpoint);
-                    getEngine().rollTranslogGeneration();
                 });
             }
         }
@@ -2663,4 +2669,26 @@ public ParsedDocument newNoopTombstoneDoc(String reason) {
             }
         };
     }
+
+    /**
+     * Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint.
+     */
+    void resetEngineToGlobalCheckpoint() throws IOException {
+        assert getActiveOperationsCount() == 0 : "Ongoing writes [" + getActiveOperations() + "]";
+        sync(); // persist the global checkpoint to disk
+        final long globalCheckpoint = getGlobalCheckpoint();
+        final Engine newEngine;
+        synchronized (mutex) {
+            verifyNotClosed();
+            IOUtils.close(currentEngineReference.getAndSet(null));
+            trimUnsafeCommits();
+            newEngine = createNewEngine(newEngineConfig());
+            active.set(true);
+        }
+        final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery(
+            engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {
+                // TODO: add a dedicate recovery stats for the reset translog
+            });
+        newEngine.recoverFromTranslog(translogRunner, globalCheckpoint);
+    }
 }
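
For contrast, the two recovery paths now share runTranslogRecovery and differ only in the operation origin and the progress callback they pass; roughly (condensed from the IndexShard diff above):

    // Sketch: how the two call sites in this commit parameterize runTranslogRecovery.

    // Normal startup recovery: LOCAL_TRANSLOG_RECOVERY origin, counted in the recovery stats.
    final Engine.TranslogRecoveryRunner startupRunner = (engine, snapshot) -> {
        translogRecoveryStats.totalOperations(snapshot.totalOperations());
        translogRecoveryStats.totalOperationsOnStart(snapshot.totalOperations());
        return runTranslogRecovery(engine, snapshot, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY,
            translogRecoveryStats::incrementRecoveredOperations);
    };

    // Reset on promotion: same replay loop, LOCAL_RESET origin, no dedicated stats yet (see the TODO),
    // and the replay stops at the global checkpoint instead of Long.MAX_VALUE.
    final Engine.TranslogRecoveryRunner resetRunner = (engine, snapshot) ->
        runTranslogRecovery(engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {});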

server/src/test/java/org/elasticsearch/discovery/AbstractDisruptionTestCase.java

Lines changed: 1 addition & 0 deletions
@@ -111,6 +111,7 @@ protected void beforeIndexDeletion() throws Exception {
         super.beforeIndexDeletion();
         internalCluster().assertConsistentHistoryBetweenTranslogAndLuceneIndex();
         assertSeqNos();
+        assertSameDocIdsOnShards();
     }
 }
 
server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 1 addition & 1 deletion
@@ -4105,7 +4105,7 @@ public void markSeqNoAsCompleted(long seqNo) {
         final long currentLocalCheckpoint = actualEngine.getLocalCheckpoint();
         final long resetLocalCheckpoint =
             randomIntBetween(Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), Math.toIntExact(currentLocalCheckpoint));
-        actualEngine.resetLocalCheckpoint(resetLocalCheckpoint);
+        actualEngine.getLocalCheckpointTracker().resetCheckpoint(resetLocalCheckpoint);
         completedSeqNos.clear();
         actualEngine.restoreLocalCheckpointFromTranslog();
         final Set<Long> intersection = new HashSet<>(expectedCompletedSeqNos);

server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java

Lines changed: 2 additions & 2 deletions
@@ -28,7 +28,7 @@
 import org.elasticsearch.index.store.Store;
 
 import java.io.IOException;
-import java.util.Set;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
@@ -44,7 +44,7 @@ public void testReadOnlyEngine() throws Exception {
         EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
         int numDocs = scaledRandomIntBetween(10, 1000);
         final SeqNoStats lastSeqNoStats;
-        final Set<String> lastDocIds;
+        final List<DocIdSeqNoAndTerm> lastDocIds;
         try (InternalEngine engine = createEngine(config)) {
             Engine.Get get = null;
             for (int i = 0; i < numDocs; i++) {

server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java

Lines changed: 2 additions & 6 deletions
@@ -522,18 +522,14 @@ public void testSeqNoCollision() throws Exception {
             shards.promoteReplicaToPrimary(replica2).get();
             logger.info("--> Recover replica3 from replica2");
             recoverReplica(replica3, replica2, true);
-            try (Translog.Snapshot snapshot = getTranslog(replica3).newSnapshot()) {
+            try (Translog.Snapshot snapshot = replica3.getHistoryOperations("test", 0)) {
                 assertThat(snapshot.totalOperations(), equalTo(initDocs + 1));
                 final List<Translog.Operation> expectedOps = new ArrayList<>(initOperations);
                 expectedOps.add(op2);
                 assertThat(snapshot, containsOperationsInAnyOrder(expectedOps));
                 assertThat("Peer-recovery should not send overridden operations", snapshot.skippedOperations(), equalTo(0));
             }
-            // TODO: We should assert the content of shards in the ReplicationGroup.
-            // Without rollback replicas(current implementation), we don't have the same content across shards:
-            // - replica1 has {doc1}
-            // - replica2 has {doc1, doc2}
-            // - replica3 can have either {doc2} only if operation-based recovery or {doc1, doc2} if file-based recovery
+            shards.assertAllEqual(initDocs + 1);
         }
     }
