Skip to content

Commit 8a4d27a

Browse files
committed
Replica start peer recovery with safe commit (#28181)
Today a replica starts a peer-recovery with the last commit. If the last commit is not a safe commit, a replica will immediately fallback to the file based sync which is more expensive than the sequence based recovery. This commit modifies the peer-recovery in replica to start with a safe commit. Moreover we can keep the existing translog on the target if the recovery is sequence based recovery. Relates #10708
1 parent a48be3f commit 8a4d27a

15 files changed

+188
-118
lines changed

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1533,6 +1533,11 @@ public interface Warmer {
15331533
*/
15341534
public abstract Engine recoverFromTranslog() throws IOException;
15351535

1536+
/**
1537+
* Do not replay translog operations, but make the engine be ready.
1538+
*/
1539+
public abstract void skipTranslogRecovery();
1540+
15361541
/**
15371542
* Returns <code>true</code> iff this engine is currently recovering from translog.
15381543
*/

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,15 @@ public InternalEngine recoverFromTranslog() throws IOException {
406406
return this;
407407
}
408408

409+
@Override
410+
public void skipTranslogRecovery() {
411+
if (openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
412+
throw new IllegalStateException("Can't skip translog recovery with open mode: " + openMode);
413+
}
414+
assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be";
415+
pendingTranslogRecovery.set(false); // we are good - now we can commit
416+
}
417+
409418
private IndexCommit getStartingCommitPoint() throws IOException {
410419
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
411420
final Path translogPath = engineConfig.getTranslogConfig().getTranslogPath();

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1298,6 +1298,7 @@ public void createIndexAndTranslog() throws IOException {
12981298
translogStats.totalOperationsOnStart(0);
12991299
globalCheckpointTracker.updateGlobalCheckpointOnReplica(SequenceNumbers.NO_OPS_PERFORMED, "index created");
13001300
innerOpenEngineAndTranslog(EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, false);
1301+
assertSequenceNumbersInCommit();
13011302
}
13021303

13031304
/** opens the engine on top of the existing lucene engine but creates an empty translog **/
@@ -1310,15 +1311,29 @@ public void openIndexAndCreateTranslog(boolean forceNewHistoryUUID, long globalC
13101311
+ globalCheckpoint + "]";
13111312
globalCheckpointTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "opening index with a new translog");
13121313
innerOpenEngineAndTranslog(EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG, forceNewHistoryUUID);
1314+
assertSequenceNumbersInCommit();
13131315
}
13141316

13151317
/**
13161318
* opens the engine on top of the existing lucene engine and translog.
13171319
* Operations from the translog will be replayed to bring lucene up to date.
13181320
**/
1319-
public void openIndexAndTranslog() throws IOException {
1321+
public void openIndexAndRecoveryFromTranslog() throws IOException {
13201322
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.EXISTING_STORE;
13211323
innerOpenEngineAndTranslog(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, false);
1324+
getEngine().recoverFromTranslog();
1325+
assertSequenceNumbersInCommit();
1326+
}
1327+
1328+
/**
1329+
* Opens the engine on top of the existing lucene engine and translog.
1330+
* The translog is kept but its operations won't be replayed.
1331+
*/
1332+
public void openIndexAndSkipTranslogRecovery() throws IOException {
1333+
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.PEER;
1334+
innerOpenEngineAndTranslog(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, false);
1335+
getEngine().skipTranslogRecovery();
1336+
assertSequenceNumbersInCommit();
13221337
}
13231338

13241339
private void innerOpenEngineAndTranslog(final EngineConfig.OpenMode openMode, final boolean forceNewHistoryUUID) throws IOException {
@@ -1350,15 +1365,13 @@ private void innerOpenEngineAndTranslog(final EngineConfig.OpenMode openMode, fi
13501365
globalCheckpointTracker.updateGlobalCheckpointOnReplica(Translog.readGlobalCheckpoint(translogConfig.getTranslogPath()),
13511366
"read from translog checkpoint");
13521367
}
1353-
Engine newEngine = createNewEngine(config);
1368+
createNewEngine(config);
13541369
verifyNotClosed();
13551370
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
13561371
// We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive,
13571372
// we still give sync'd flush a chance to run:
13581373
active.set(true);
1359-
newEngine.recoverFromTranslog();
13601374
}
1361-
assertSequenceNumbersInCommit();
13621375
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
13631376
}
13641377

server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
401401
logger.debug("failed to list file details", e);
402402
}
403403
if (indexShouldExists) {
404-
indexShard.openIndexAndTranslog();
404+
indexShard.openIndexAndRecoveryFromTranslog();
405405
indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm());
406406
} else {
407407
indexShard.createIndexAndTranslog();

server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
import org.apache.logging.log4j.message.ParameterizedMessage;
2323
import org.apache.logging.log4j.util.Supplier;
24+
import org.apache.lucene.index.DirectoryReader;
25+
import org.apache.lucene.index.IndexCommit;
2426
import org.apache.lucene.store.AlreadyClosedException;
2527
import org.apache.lucene.store.RateLimiter;
2628
import org.elasticsearch.ElasticsearchException;
@@ -39,6 +41,7 @@
3941
import org.elasticsearch.common.util.CancellableThreads;
4042
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
4143
import org.elasticsearch.index.IndexNotFoundException;
44+
import org.elasticsearch.index.engine.CombinedDeletionPolicy;
4245
import org.elasticsearch.index.engine.RecoveryEngineException;
4346
import org.elasticsearch.index.mapper.MapperException;
4447
import org.elasticsearch.index.seqno.SequenceNumbers;
@@ -60,6 +63,7 @@
6063
import org.elasticsearch.transport.TransportService;
6164

6265
import java.io.IOException;
66+
import java.util.List;
6367
import java.util.concurrent.atomic.AtomicLong;
6468
import java.util.concurrent.atomic.AtomicReference;
6569

@@ -108,8 +112,8 @@ public PeerRecoveryTargetService(Settings settings, ThreadPool threadPool, Trans
108112
FileChunkTransportRequestHandler());
109113
transportService.registerRequestHandler(Actions.CLEAN_FILES, RecoveryCleanFilesRequest::new, ThreadPool.Names.GENERIC, new
110114
CleanFilesRequestHandler());
111-
transportService.registerRequestHandler(Actions.PREPARE_TRANSLOG, RecoveryPrepareForTranslogOperationsRequest::new, ThreadPool
112-
.Names.GENERIC, new PrepareForTranslogOperationsRequestHandler());
115+
transportService.registerRequestHandler(Actions.PREPARE_TRANSLOG, RecoveryPrepareForTranslogOperationsRequest::new,
116+
ThreadPool.Names.GENERIC, new PrepareForTranslogOperationsRequestHandler());
113117
transportService.registerRequestHandler(Actions.TRANSLOG_OPS, RecoveryTranslogOperationsRequest::new, ThreadPool.Names.GENERIC,
114118
new TranslogOperationsRequestHandler());
115119
transportService.registerRequestHandler(Actions.FINALIZE, RecoveryFinalizeRecoveryRequest::new, ThreadPool.Names.GENERIC, new
@@ -353,7 +357,9 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove
353357
public static long getStartingSeqNo(final RecoveryTarget recoveryTarget) {
354358
try {
355359
final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation());
356-
final SequenceNumbers.CommitInfo seqNoStats = recoveryTarget.store().loadSeqNoInfo(null);
360+
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(recoveryTarget.store().directory());
361+
final IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, globalCheckpoint);
362+
final SequenceNumbers.CommitInfo seqNoStats = recoveryTarget.store().loadSeqNoInfo(safeCommit);
357363
if (seqNoStats.maxSeqNo <= globalCheckpoint) {
358364
assert seqNoStats.localCheckpoint <= globalCheckpoint;
359365
/*
@@ -387,7 +393,7 @@ class PrepareForTranslogOperationsRequestHandler implements TransportRequestHand
387393
public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception {
388394
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
389395
)) {
390-
recoveryRef.target().prepareForTranslogOperations(request.totalTranslogOps());
396+
recoveryRef.target().prepareForTranslogOperations(request.createNewTranslog(), request.totalTranslogOps());
391397
}
392398
channel.sendResponse(TransportResponse.Empty.INSTANCE);
393399
}

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,16 @@ public class RecoveryPrepareForTranslogOperationsRequest extends TransportReques
3333
private long recoveryId;
3434
private ShardId shardId;
3535
private int totalTranslogOps = RecoveryState.Translog.UNKNOWN;
36+
private boolean createNewTranslog;
3637

3738
public RecoveryPrepareForTranslogOperationsRequest() {
3839
}
3940

40-
RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps) {
41+
RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps, boolean createNewTranslog) {
4142
this.recoveryId = recoveryId;
4243
this.shardId = shardId;
4344
this.totalTranslogOps = totalTranslogOps;
45+
this.createNewTranslog = createNewTranslog;
4446
}
4547

4648
public long recoveryId() {
@@ -55,6 +57,13 @@ public int totalTranslogOps() {
5557
return totalTranslogOps;
5658
}
5759

60+
/**
61+
* Whether or not the recover target should create a new local translog
62+
*/
63+
boolean createNewTranslog() {
64+
return createNewTranslog;
65+
}
66+
5867
@Override
5968
public void readFrom(StreamInput in) throws IOException {
6069
super.readFrom(in);
@@ -64,6 +73,11 @@ public void readFrom(StreamInput in) throws IOException {
6473
if (in.getVersion().before(Version.V_6_0_0_alpha1)) {
6574
in.readLong(); // maxUnsafeAutoIdTimestamp
6675
}
76+
if (in.getVersion().onOrAfter(Version.V_6_2_0)) {
77+
createNewTranslog = in.readBoolean();
78+
} else {
79+
createNewTranslog = true;
80+
}
6781
}
6882

6983
@Override
@@ -75,5 +89,8 @@ public void writeTo(StreamOutput out) throws IOException {
7589
if (out.getVersion().before(Version.V_6_0_0_alpha1)) {
7690
out.writeLong(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP); // maxUnsafeAutoIdTimestamp
7791
}
92+
if (out.getVersion().onOrAfter(Version.V_6_2_0)) {
93+
out.writeBoolean(createNewTranslog);
94+
}
7895
}
7996
}

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -150,9 +150,9 @@ public RecoveryResponse recoverToTarget() throws IOException {
150150

151151
final long startingSeqNo;
152152
final long requiredSeqNoRangeStart;
153-
final boolean isSequenceNumberBasedRecoveryPossible = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO &&
153+
final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO &&
154154
isTargetSameHistory() && isTranslogReadyForSequenceNumberBasedRecovery();
155-
if (isSequenceNumberBasedRecoveryPossible) {
155+
if (isSequenceNumberBasedRecovery) {
156156
logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
157157
startingSeqNo = request.startingSeqNo();
158158
requiredSeqNoRangeStart = startingSeqNo;
@@ -188,7 +188,8 @@ public RecoveryResponse recoverToTarget() throws IOException {
188188
runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()));
189189

190190
try {
191-
prepareTargetForTranslog(translog.estimateTotalOperationsFromMinSeq(startingSeqNo));
191+
// For a sequence based recovery, the target can keep its local translog
192+
prepareTargetForTranslog(isSequenceNumberBasedRecovery == false, translog.estimateTotalOperationsFromMinSeq(startingSeqNo));
192193
} catch (final Exception e) {
193194
throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e);
194195
}
@@ -421,13 +422,13 @@ public void phase1(final IndexCommit snapshot, final Supplier<Integer> translogO
421422
}
422423
}
423424

424-
void prepareTargetForTranslog(final int totalTranslogOps) throws IOException {
425+
void prepareTargetForTranslog(final boolean createNewTranslog, final int totalTranslogOps) throws IOException {
425426
StopWatch stopWatch = new StopWatch().start();
426427
logger.trace("recovery [phase1]: prepare remote engine for translog");
427428
final long startEngineStart = stopWatch.totalTime().millis();
428429
// Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables
429430
// garbage collection (not the JVM's GC!) of tombstone deletes.
430-
cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(totalTranslogOps));
431+
cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(createNewTranslog, totalTranslogOps));
431432
stopWatch.stop();
432433

433434
response.startTime = stopWatch.totalTime().millis() - startEngineStart;

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -362,10 +362,14 @@ private void ensureRefCount() {
362362
/*** Implementation of {@link RecoveryTargetHandler } */
363363

364364
@Override
365-
public void prepareForTranslogOperations(int totalTranslogOps) throws IOException {
365+
public void prepareForTranslogOperations(boolean createNewTranslog, int totalTranslogOps) throws IOException {
366366
state().getTranslog().totalOperations(totalTranslogOps);
367-
// TODO: take the local checkpoint from store as global checkpoint, once we know it's safe
368-
indexShard().openIndexAndCreateTranslog(false, SequenceNumbers.UNASSIGNED_SEQ_NO);
367+
if (createNewTranslog) {
368+
// TODO: Assigns the global checkpoint to the max_seqno of the safe commit if the index version >= 6.2
369+
indexShard().openIndexAndCreateTranslog(false, SequenceNumbers.UNASSIGNED_SEQ_NO);
370+
} else {
371+
indexShard().openIndexAndSkipTranslogRecovery();
372+
}
369373
}
370374

371375
@Override

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,10 @@ public interface RecoveryTargetHandler {
3232

3333
/**
3434
* Prepares the target to receive translog operations, after all file have been copied
35-
*
36-
* @param totalTranslogOps total translog operations expected to be sent
35+
* @param createNewTranslog whether or not to delete the local translog on the target
36+
* @param totalTranslogOps total translog operations expected to be sent
3737
*/
38-
void prepareForTranslogOperations(int totalTranslogOps) throws IOException;
38+
void prepareForTranslogOperations(boolean createNewTranslog, int totalTranslogOps) throws IOException;
3939

4040
/**
4141
* The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, and

server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,9 @@ public RemoteRecoveryTargetHandler(long recoveryId, ShardId shardId, TransportSe
7676
}
7777

7878
@Override
79-
public void prepareForTranslogOperations(int totalTranslogOps) throws IOException {
79+
public void prepareForTranslogOperations(boolean createNewTranslog, int totalTranslogOps) throws IOException {
8080
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG,
81-
new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps),
81+
new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps, createNewTranslog),
8282
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
8383
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
8484
}

0 commit comments

Comments
 (0)