Skip to content

Commit e9999df

Browse files
authored
Init global checkpoint after copy commit in peer recovery (#40823)
Today a new replica of a closed index does not have a safe commit invariant when its engine is opened because we won't initialize the global checkpoint on a recovering replica until the finalize step. With this change, we can achieve that property by creating a new translog with the global checkpoint from the primary at the end of phase 1.
1 parent 79c7a57 commit e9999df

File tree

13 files changed

+83
-49
lines changed

13 files changed

+83
-49
lines changed

server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,15 +103,16 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
103103
this.translogStats = translogStats == null ? new TranslogStats(0, 0, 0, 0, 0) : translogStats;
104104
if (seqNoStats == null) {
105105
seqNoStats = buildSeqNoStats(lastCommittedSegmentInfos);
106-
// During a peer-recovery the global checkpoint is not known and up to date when the engine
107-
// is created, so we only check the max seq no / global checkpoint coherency when the global
106+
// Before 8.0 the global checkpoint is not known and up to date when the engine is created after
107+
// peer recovery, so we only check the max seq no / global checkpoint coherency when the global
108108
// checkpoint is different from the unassigned sequence number value.
109109
// In addition to that we only execute the check if the index the engine belongs to has been
110110
// created after the refactoring of the Close Index API and its TransportVerifyShardBeforeCloseAction
111111
// that guarantee that all operations have been flushed to Lucene.
112112
final long globalCheckpoint = engineConfig.getGlobalCheckpointSupplier().getAsLong();
113-
if (globalCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO
114-
&& engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_7_0)) {
113+
final Version indexVersionCreated = engineConfig.getIndexSettings().getIndexVersionCreated();
114+
if (indexVersionCreated.onOrAfter(Version.V_7_1_0) ||
115+
(globalCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO && indexVersionCreated.onOrAfter(Version.V_6_7_0))) {
115116
if (seqNoStats.getMaxSeqNo() != globalCheckpoint) {
116117
assertMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats.getMaxSeqNo(), globalCheckpoint);
117118
throw new IllegalStateException("Maximum sequence number [" + seqNoStats.getMaxSeqNo()

server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,8 @@ public PeerRecoveryTargetService(ThreadPool threadPool, TransportService transpo
115115
FilesInfoRequestHandler());
116116
transportService.registerRequestHandler(Actions.FILE_CHUNK, RecoveryFileChunkRequest::new, ThreadPool.Names.GENERIC, new
117117
FileChunkTransportRequestHandler());
118-
transportService.registerRequestHandler(Actions.CLEAN_FILES, RecoveryCleanFilesRequest::new, ThreadPool.Names.GENERIC, new
119-
CleanFilesRequestHandler());
118+
transportService.registerRequestHandler(Actions.CLEAN_FILES, ThreadPool.Names.GENERIC,
119+
RecoveryCleanFilesRequest::new, new CleanFilesRequestHandler());
120120
transportService.registerRequestHandler(Actions.PREPARE_TRANSLOG, ThreadPool.Names.GENERIC,
121121
RecoveryPrepareForTranslogOperationsRequest::new, new PrepareForTranslogOperationsRequestHandler());
122122
transportService.registerRequestHandler(Actions.TRANSLOG_OPS, RecoveryTranslogOperationsRequest::new, ThreadPool.Names.GENERIC,
@@ -540,7 +540,7 @@ class CleanFilesRequestHandler implements TransportRequestHandler<RecoveryCleanF
540540
public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel, Task task) throws Exception {
541541
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
542542
)) {
543-
recoveryRef.target().cleanFiles(request.totalTranslogOps(), request.sourceMetaSnapshot());
543+
recoveryRef.target().cleanFiles(request.totalTranslogOps(), request.getGlobalCheckpoint(), request.sourceMetaSnapshot());
544544
channel.sendResponse(TransportResponse.Empty.INSTANCE);
545545
}
546546
}

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryCleanFilesRequest.java

Lines changed: 32 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919

2020
package org.elasticsearch.indices.recovery;
2121

22+
import org.elasticsearch.Version;
2223
import org.elasticsearch.common.io.stream.StreamInput;
2324
import org.elasticsearch.common.io.stream.StreamOutput;
25+
import org.elasticsearch.index.seqno.SequenceNumbers;
2426
import org.elasticsearch.index.shard.ShardId;
2527
import org.elasticsearch.index.store.Store;
2628
import org.elasticsearch.transport.TransportRequest;
@@ -29,37 +31,32 @@
2931

3032
public class RecoveryCleanFilesRequest extends TransportRequest {
3133

32-
private long recoveryId;
33-
private ShardId shardId;
34+
private final long recoveryId;
35+
private final ShardId shardId;
36+
private final Store.MetadataSnapshot snapshotFiles;
37+
private final int totalTranslogOps;
38+
private final long globalCheckpoint;
3439

35-
private Store.MetadataSnapshot snapshotFiles;
36-
private int totalTranslogOps = RecoveryState.Translog.UNKNOWN;
37-
38-
public RecoveryCleanFilesRequest() {
39-
}
40-
41-
RecoveryCleanFilesRequest(long recoveryId, ShardId shardId, Store.MetadataSnapshot snapshotFiles, int totalTranslogOps) {
40+
RecoveryCleanFilesRequest(long recoveryId, ShardId shardId, Store.MetadataSnapshot snapshotFiles,
41+
int totalTranslogOps, long globalCheckpoint) {
4242
this.recoveryId = recoveryId;
4343
this.shardId = shardId;
4444
this.snapshotFiles = snapshotFiles;
4545
this.totalTranslogOps = totalTranslogOps;
46+
this.globalCheckpoint = globalCheckpoint;
4647
}
4748

48-
public long recoveryId() {
49-
return this.recoveryId;
50-
}
51-
52-
public ShardId shardId() {
53-
return shardId;
54-
}
55-
56-
@Override
57-
public void readFrom(StreamInput in) throws IOException {
58-
super.readFrom(in);
49+
RecoveryCleanFilesRequest(StreamInput in) throws IOException {
50+
super(in);
5951
recoveryId = in.readLong();
6052
shardId = ShardId.readShardId(in);
6153
snapshotFiles = new Store.MetadataSnapshot(in);
6254
totalTranslogOps = in.readVInt();
55+
if (in.getVersion().onOrAfter(Version.V_7_1_0)) {
56+
globalCheckpoint = in.readZLong();
57+
} else {
58+
globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
59+
}
6360
}
6461

6562
@Override
@@ -69,13 +66,28 @@ public void writeTo(StreamOutput out) throws IOException {
6966
shardId.writeTo(out);
7067
snapshotFiles.writeTo(out);
7168
out.writeVInt(totalTranslogOps);
69+
if (out.getVersion().onOrAfter(Version.V_7_1_0)) {
70+
out.writeZLong(globalCheckpoint);
71+
}
7272
}
7373

7474
public Store.MetadataSnapshot sourceMetaSnapshot() {
7575
return snapshotFiles;
7676
}
7777

78+
public long recoveryId() {
79+
return this.recoveryId;
80+
}
81+
82+
public ShardId shardId() {
83+
return shardId;
84+
}
85+
7886
public int totalTranslogOps() {
7987
return totalTranslogOps;
8088
}
89+
90+
public long getGlobalCheckpoint() {
91+
return globalCheckpoint;
92+
}
8193
}

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
177177
startingSeqNo = 0;
178178
try {
179179
final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo);
180-
sendFileResult = phase1(phase1Snapshot.getIndexCommit(), () -> estimateNumOps);
180+
sendFileResult = phase1(phase1Snapshot.getIndexCommit(), shard.getGlobalCheckpoint(), () -> estimateNumOps);
181181
} catch (final Exception e) {
182182
throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e);
183183
} finally {
@@ -332,7 +332,7 @@ static final class SendFileResult {
332332
* segments that are missing. Only segments that have the same size and
333333
* checksum can be reused
334334
*/
335-
public SendFileResult phase1(final IndexCommit snapshot, final Supplier<Integer> translogOps) {
335+
public SendFileResult phase1(final IndexCommit snapshot, final long globalCheckpoint, final Supplier<Integer> translogOps) {
336336
cancellableThreads.checkForCancel();
337337
// Total size of segment files that are recovered
338338
long totalSize = 0;
@@ -422,7 +422,7 @@ public SendFileResult phase1(final IndexCommit snapshot, final Supplier<Integer>
422422
// are deleted
423423
try {
424424
cancellableThreads.executeIO(() ->
425-
recoveryTarget.cleanFiles(translogOps.get(), recoverySourceMetadata));
425+
recoveryTarget.cleanFiles(translogOps.get(), globalCheckpoint, recoverySourceMetadata));
426426
} catch (RemoteTransportException | IOException targetException) {
427427
final IOException corruptIndexException;
428428
// we realized that after the index was copied and we wanted to finalize the recovery

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,9 @@ public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTra
288288
ActionListener.completeWith(listener, () -> {
289289
state().getTranslog().totalOperations(totalTranslogOps);
290290
indexShard().openEngineAndSkipTranslogRecovery();
291+
assert indexShard.getGlobalCheckpoint() >= indexShard.seqNoStats().getMaxSeqNo() ||
292+
indexShard.indexSettings().getIndexVersionCreated().before(Version.V_7_1_0)
293+
: "global checkpoint is not initialized [" + indexShard.seqNoStats() + "]";
291294
return null;
292295
});
293296
}
@@ -382,7 +385,7 @@ public void receiveFileInfo(List<String> phase1FileNames,
382385
}
383386

384387
@Override
385-
public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException {
388+
public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException {
386389
state().getTranslog().totalOperations(totalTranslogOps);
387390
// first, we go and move files that were created with the recovery id suffix to
388391
// the actual names, its ok if we have a corrupted index here, since we have replicas
@@ -395,10 +398,11 @@ public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaDa
395398
if (indexShard.indexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1)) {
396399
store.ensureIndexHasHistoryUUID();
397400
}
398-
// TODO: Assign the global checkpoint to the max_seqno of the safe commit if the index version >= 6.2
401+
assert globalCheckpoint >= Long.parseLong(sourceMetaData.getCommitUserData().get(SequenceNumbers.MAX_SEQ_NO))
402+
|| indexShard.indexSettings().getIndexVersionCreated().before(Version.V_7_1_0) :
403+
"invalid global checkpoint[" + globalCheckpoint + "] source_meta_data [" + sourceMetaData.getCommitUserData() + "]";
399404
final String translogUUID = Translog.createEmptyTranslog(
400-
indexShard.shardPath().resolveTranslog(), SequenceNumbers.UNASSIGNED_SEQ_NO, shardId,
401-
indexShard.getPendingPrimaryTerm());
405+
indexShard.shardPath().resolveTranslog(), globalCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
402406
store.associateIndexWithNewTranslog(translogUUID);
403407

404408
if (indexShard.getRetentionLeases().leases().isEmpty()) {

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,12 @@ void receiveFileInfo(List<String> phase1FileNames,
8888
/**
8989
* After all source files has been sent over, this command is sent to the target so it can clean any local
9090
* files that are not part of the source store
91+
*
9192
* @param totalTranslogOps an update number of translog operations that will be replayed later on
92-
* @param sourceMetaData meta data of the source store
93+
* @param globalCheckpoint the global checkpoint on the primary
94+
* @param sourceMetaData meta data of the source store
9395
*/
94-
void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException;
96+
void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException;
9597

9698
/** writes a partial file chunk to the target store */
9799
void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content,

server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,9 +139,9 @@ public void receiveFileInfo(List<String> phase1FileNames, List<Long> phase1FileS
139139
}
140140

141141
@Override
142-
public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException {
142+
public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException {
143143
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.CLEAN_FILES,
144-
new RecoveryCleanFilesRequest(recoveryId, shardId, sourceMetaData, totalTranslogOps),
144+
new RecoveryCleanFilesRequest(recoveryId, shardId, sourceMetaData, totalTranslogOps, globalCheckpoint),
145145
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
146146
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
147147
}

server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,9 @@ public void run() {
121121
Future<Void> future = shards.asyncRecoverReplica(replica,
122122
(indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener) {
123123
@Override
124-
public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException {
125-
super.cleanFiles(totalTranslogOps, sourceMetaData);
124+
public void cleanFiles(int totalTranslogOps, long globalCheckpoint,
125+
Store.MetadataSnapshot sourceMetaData) throws IOException {
126+
super.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData);
126127
latch.countDown();
127128
try {
128129
latch.await();

server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -853,9 +853,9 @@ public void indexTranslogOperations(
853853
}
854854

855855
@Override
856-
public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException {
856+
public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException {
857857
blockIfNeeded(RecoveryState.Stage.INDEX);
858-
super.cleanFiles(totalTranslogOps, sourceMetaData);
858+
super.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData);
859859
}
860860

861861
@Override

server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ public void testWriteFileChunksConcurrently() throws Exception {
189189
for (Thread sender : senders) {
190190
sender.join();
191191
}
192-
recoveryTarget.cleanFiles(0, sourceSnapshot);
192+
recoveryTarget.cleanFiles(0, Long.parseLong(sourceSnapshot.getCommitUserData().get(SequenceNumbers.MAX_SEQ_NO)), sourceSnapshot);
193193
recoveryTarget.decRef();
194194
Store.MetadataSnapshot targetSnapshot = targetShard.snapshotStoreMetadata();
195195
Store.RecoveryDiff diff = sourceSnapshot.recoveryDiff(targetSnapshot);

0 commit comments

Comments
 (0)