Replay history of operations in remote recovery #39153
@@ -32,6 +32,7 @@
 import org.apache.lucene.store.IndexInput;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
+import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.cluster.routing.RecoverySource;
@@ -43,6 +44,7 @@
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.EngineException;
 import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.index.seqno.SeqNoStats;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
 import org.elasticsearch.index.store.Store;
@@ -459,6 +461,7 @@ private void addRecoveredFileDetails(SegmentInfos si, Store store, RecoveryState
      * Restores shard from {@link SnapshotRecoverySource} associated with this shard in routing table
      */
     private void restore(final IndexShard indexShard, final Repository repository, final SnapshotRecoverySource restoreSource) {
+        assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
         final RecoveryState.Translog translogState = indexShard.recoveryState().getTranslog();
         if (restoreSource == null) {
             throw new IndexShardRestoreFailedException(shardId, "empty restore source");
@@ -467,28 +470,63 @@ private void restore(final IndexShard indexShard, final Repository repository, f
             logger.trace("[{}] restoring shard [{}]", restoreSource.snapshot(), shardId);
         }
         try {
-            translogState.totalOperations(0);
-            translogState.totalOperationsOnStart(0);
             indexShard.prepareForIndexRecovery();
             ShardId snapshotShardId = shardId;
             final String indexName = restoreSource.index();
             if (!shardId.getIndexName().equals(indexName)) {
                 snapshotShardId = new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, shardId.id());
             }
             final IndexId indexId = repository.getRepositoryData().resolveIndexId(indexName);
-            repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(),
-                restoreSource.version(), indexId, snapshotShardId, indexShard.recoveryState());
-            final Store store = indexShard.store();
-            store.bootstrapNewHistory();
-            final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
-            final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
-            final String translogUUID = Translog.createEmptyTranslog(
-                indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
-            store.associateIndexWithNewTranslog(translogUUID);
-            assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
-            writeEmptyRetentionLeasesFile(indexShard);
-            indexShard.openEngineAndRecoverFromTranslog();
-            indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
+            try (Translog.Snapshot historySnapshot = repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(),
+                restoreSource.version(), indexId, snapshotShardId, indexShard.recoveryState())) {
+                final Store store = indexShard.store();
+                store.bootstrapNewHistory();
+                final SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(
+                    store.readLastCommittedSegmentsInfo().userData.entrySet());
+                final String translogUUID = Translog.createEmptyTranslog(
+                    indexShard.shardPath().resolveTranslog(), commitInfo.maxSeqNo, shardId, indexShard.getPendingPrimaryTerm());
+                store.associateIndexWithNewTranslog(translogUUID);
+                writeEmptyRetentionLeasesFile(indexShard);
+                indexShard.openEngineAndSkipTranslogRecovery();
+                indexShard.advanceMaxSeqNoOfUpdatesOrDeletes(commitInfo.maxSeqNo);
+                translogState.totalOperations(historySnapshot.totalOperations());
+                translogState.totalOperationsOnStart(historySnapshot.totalOperations());
+                Translog.Operation op;
+                while ((op = historySnapshot.next()) != null) {
+                    final Engine.Result result = indexShard.applyTranslogOperation(op, Engine.Operation.Origin.SNAPSHOT_RECOVERY);
+                    if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
+                        assert false : "mapping is not ready for [" + op + "]";
+                        throw new IndexShardRestoreFailedException(shardId, "mapping is not ready for [" + op + "]");
+                    }
+                    assert result.getFailure() == null : "unexpected failure while applying translog operation " + result.getFailure();
+                    ExceptionsHelper.reThrowIfNotNull(result.getFailure());
+                    indexShard.sync();
+                    indexShard.afterWriteOperation();
+                    translogState.incrementRecoveredOperations();
+                }
+                indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
+                long syncedGlobalCheckpoint = indexShard.getLastSyncedGlobalCheckpoint();
+                final SeqNoStats seqNoStats = indexShard.seqNoStats();
+                if (seqNoStats.getMaxSeqNo() != syncedGlobalCheckpoint || seqNoStats.getLocalCheckpoint() != syncedGlobalCheckpoint) {
+                    throw new IndexShardRestoreFailedException(shardId, "history is not fully restored: seq_no_stats=" + seqNoStats
+                        + " synced_global_checkpoint=" + syncedGlobalCheckpoint);
+                }
+                /*
+                 * If there are gaps in the restored commit, we won't have every translog entry from the local_checkpoint
+                 * to the max_seq_no, so the restored commit is not a safe commit (part of its translog is missing).
+                 * To maintain the safe commit assumption, we have to forcefully flush a new commit here.
+                 */
+                if (commitInfo.localCheckpoint != commitInfo.maxSeqNo) {
+                    indexShard.flush(new FlushRequest().force(true).waitIfOngoing(true));
Review comment (on the forced flush above): should we make sure that the global checkpoint is up-to-date (i.e. >= max-seq-no in the new commit that we create here) before calling this? Otherwise the shard will be marked as in-sync in the cluster state while the commit here will only become a safe commit once the shard is locally started (and the gcp advanced). The main property we're after here is that every in-sync shard copy has a safe commit, which (AFAICS) is not guaranteed by the current recovery logic.
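A minimal sketch of the guard this comment suggests, reusing only the variables and calls already in scope at this point of the diff (indexShard, shardId, FlushRequest); it is illustrative, not the PR's actual change:

```java
// Hedged sketch (not part of this PR): before the forced flush, verify that the persisted
// global checkpoint has caught up with the max_seq_no of the replayed history, so the commit
// produced by the flush below is immediately a safe commit.
indexShard.sync(); // persist the global checkpoint to the translog checkpoint
final long persistedGlobalCheckpoint = indexShard.getLastSyncedGlobalCheckpoint();
final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo();
if (persistedGlobalCheckpoint < maxSeqNo) {
    throw new IndexShardRestoreFailedException(shardId, "global checkpoint [" + persistedGlobalCheckpoint
        + "] has not advanced to max_seq_no [" + maxSeqNo + "] before the forced flush");
}
indexShard.flush(new FlushRequest().force(true).waitIfOngoing(true));
```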
+                }
+                final SequenceNumbers.CommitInfo restoredCommitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(
+                    store.readLastCommittedSegmentsInfo().userData.entrySet());
+                syncedGlobalCheckpoint = indexShard.getLastSyncedGlobalCheckpoint();
+                if (restoredCommitInfo.localCheckpoint != syncedGlobalCheckpoint) {
+                    throw new IndexShardRestoreFailedException(shardId, "history is not fully restored " +
+                        " restored_commit_info=" + restoredCommitInfo + " synced_global_checkpoint=" + syncedGlobalCheckpoint);
+                }
+            }
             indexShard.finalizeRecovery();
             indexShard.postRecovery("restore done");
Review comment: should we assert in post_recovery that we have a safe commit?
Reply: I added an assertion here, but this assertion may slow down our tests. I will post the difference.
Reply: Sadly, we don't have this invariant with closed indices because the global checkpoint is not persisted to the translog checkpoint during recovery.
         } catch (Exception e) {
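Following up on the review thread above about asserting a safe commit in post_recovery: a hedged sketch of what such a check could look like, built only from calls that appear in this diff. The assertion the author actually added may differ, and as noted in the thread the invariant does not hold for closed indices:

```java
// Hedged sketch (assumption, not the author's actual assertion): a commit is "safe" once its
// max_seq_no is at or below the persisted global checkpoint, so compare the seq_no info of the
// last committed segments with the last synced global checkpoint.
final SequenceNumbers.CommitInfo lastCommit = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(
    indexShard.store().readLastCommittedSegmentsInfo().userData.entrySet());
final long persistedGlobalCheckpoint = indexShard.getLastSyncedGlobalCheckpoint();
assert lastCommit.maxSeqNo <= persistedGlobalCheckpoint
    : "last commit is not safe: max_seq_no=" + lastCommit.maxSeqNo
        + " synced_global_checkpoint=" + persistedGlobalCheckpoint;
```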