
Replay history of operations in remote recovery #39153


Closed · wants to merge 33 commits

Commits
6bae9cd  Replay history in remote recovery (dnhatn, Feb 19, 2019)
620368c  Merge branch 'master' into remote-recovery-history (dnhatn, Feb 20, 2019)
702ab2b  add safe commit assertion (dnhatn, Feb 20, 2019)
a8b516d  only check internal engine (dnhatn, Feb 20, 2019)
583af89  Merge branch 'master' into remote-recovery-history (dnhatn, Feb 20, 2019)
61fd092  Merge branch 'master' into remote-recovery-history (dnhatn, Feb 20, 2019)
11e5907  Merge branch 'master' into remote-recovery-history (dnhatn, Feb 21, 2019)
e3e9a18  make translog snapshot from ops (dnhatn, Feb 21, 2019)
e2682ac  Merge branch 'master' into remote-recovery-history (dnhatn, Feb 21, 2019)
f3ef90d  Flush if history was replayed after restoring snapshot (dnhatn, Feb 22, 2019)
afddbe3  Merge branch 'master' into remote-recovery-history (dnhatn, Feb 22, 2019)
dfabbd1  Merge branch 'master' into remote-recovery-history (dnhatn, Feb 22, 2019)
7d40d33  check only started shards (dnhatn, Feb 23, 2019)
838bf1a  Merge branch 'master' into remote-recovery-history (dnhatn, Feb 26, 2019)
1abc767  assert safe commit in post_recovery (dnhatn, Feb 26, 2019)
384f626  total_on_start (dnhatn, Feb 26, 2019)
6760d4e  Reuse ShardChangesAction (dnhatn, Feb 26, 2019)
234b981  canOptimizeAddDocument (dnhatn, Feb 27, 2019)
91201d0  Merge branch 'master' into remote-recovery-history (dnhatn, Mar 4, 2019)
455b63b  Merge branch 'master' into remote-recovery-history (dnhatn, Mar 11, 2019)
ec082f8  bootstrap global checkpoint in phase 1 of peer recovery (dnhatn, Mar 11, 2019)
18f4b76  stylecheck (dnhatn, Mar 12, 2019)
f867c17  Merge branch 'master' into remote-recovery-history (dnhatn, Mar 12, 2019)
a834af4  Merge branch 'master' into remote-recovery-history (dnhatn, Mar 13, 2019)
f1a077c  adapt change after merge (dnhatn, Mar 13, 2019)
fc94128  Merge branch 'master' into remote-recovery-history (dnhatn, Mar 17, 2019)
dccbed5  assertion :D (dnhatn, Mar 17, 2019)
f48126c  send shard changes requests to any copy (dnhatn, Mar 17, 2019)
4f5b1a5  Merge branch 'master' into remote-recovery-history (dnhatn, Mar 18, 2019)
e8e8b60  Merge branch 'master' into remote-recovery-history (dnhatn, Mar 25, 2019)
04a7976  Merge branch 'master' into remote-recovery-history (dnhatn, Apr 2, 2019)
a59edc0  Merge branch 'master' into remote-recovery-history (dnhatn, Apr 5, 2019)
c3dab87  Merge branch 'master' into remote-recovery-history (dnhatn, Apr 5, 2019)
@@ -1321,10 +1321,11 @@ public enum Origin {
             REPLICA,
             PEER_RECOVERY,
             LOCAL_TRANSLOG_RECOVERY,
-            LOCAL_RESET;
+            LOCAL_RESET,
+            SNAPSHOT_RECOVERY;

             public boolean isRecovery() {
-                return this == PEER_RECOVERY || this == LOCAL_TRANSLOG_RECOVERY;
+                return this == PEER_RECOVERY || this == LOCAL_TRANSLOG_RECOVERY || this == SNAPSHOT_RECOVERY;
             }

             boolean isFromTranslog() {
@@ -764,6 +764,7 @@ private boolean canOptimizeAddDocument(Index index) {
                     return true;
                 case LOCAL_TRANSLOG_RECOVERY:
                 case LOCAL_RESET:
+                case SNAPSHOT_RECOVERY:
                     assert index.isRetry();
                     return true; // allow to optimize in order to update the max safe time stamp
                 default:
@@ -25,7 +25,7 @@
  */
 public final class MissingHistoryOperationsException extends IllegalStateException {

-    MissingHistoryOperationsException(String message) {
+    public MissingHistoryOperationsException(String message) {
         super(message);
     }
 }
31 changes: 31 additions & 0 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -23,6 +23,7 @@
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.index.CheckIndex;
+import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.index.Term;
@@ -78,6 +79,7 @@
 import org.elasticsearch.index.cache.bitset.ShardBitsetFilterCache;
 import org.elasticsearch.index.cache.request.ShardRequestCache;
 import org.elasticsearch.index.codec.CodecService;
+import org.elasticsearch.index.engine.CombinedDeletionPolicy;
 import org.elasticsearch.index.engine.CommitStats;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.Engine.GetResult;
@@ -108,6 +110,7 @@
 import org.elasticsearch.index.refresh.RefreshStats;
 import org.elasticsearch.index.search.stats.SearchStats;
 import org.elasticsearch.index.search.stats.ShardSearchStats;
+import org.elasticsearch.index.seqno.LocalCheckpointTracker;
 import org.elasticsearch.index.seqno.ReplicationTracker;
 import org.elasticsearch.index.seqno.RetentionLease;
 import org.elasticsearch.index.seqno.RetentionLeaseStats;
@@ -1263,6 +1266,7 @@ public IndexShard postRecovery(String reason)
         if (state == IndexShardState.STARTED) {
             throw new IndexShardStartedException(shardId);
         }
+        assert assertSafeCommitExists();
         // we need to refresh again to expose all operations that were index until now. Otherwise
         // we may not expose operations that were indexed with a refresh listener that was immediately
         // responded to in addRefreshListener.
@@ -1485,6 +1489,33 @@ private boolean assertMaxUnsafeAutoIdInCommit() throws IOException {
         return true;
     }

+    private boolean assertSafeCommitExists() {
+        // we don't set the global checkpoint in peer recovery before 8.0
+        if (indexSettings.getIndexVersionCreated().before(Version.V_8_0_0) && getLastSyncedGlobalCheckpoint() == UNASSIGNED_SEQ_NO) {
+            return true;
+        }
+        try (Closeable ignored = acquireRetentionLock()) {
+            final long globalCheckpoint = getLastSyncedGlobalCheckpoint();
+            List<IndexCommit> commits = DirectoryReader.listCommits(store.directory());
+            IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(commits, globalCheckpoint);
+            SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(safeCommit.getUserData().entrySet());
+            assert commitInfo.maxSeqNo <= globalCheckpoint :
+                routingEntry() + " safe_commit[" + commitInfo + "] global_checkpoint [" + globalCheckpoint + "]";
+            LocalCheckpointTracker checkpointTracker = new LocalCheckpointTracker(commitInfo.maxSeqNo, commitInfo.localCheckpoint);
+            try (Translog.Snapshot historySnapshot = getHistoryOperations("assert_safe_commit_exists", commitInfo.localCheckpoint + 1)) {
+                Translog.Operation op;
+                while ((op = historySnapshot.next()) != null) {
+                    checkpointTracker.markSeqNoAsCompleted(op.seqNo());
+                }
+            }
+            assert checkpointTracker.getCheckpoint() >= globalCheckpoint : routingEntry() + " safe_commit[" + commitInfo + "]"
+                + " global_checkpoint[" + globalCheckpoint + "] recovered_checkpoint [" + checkpointTracker.getCheckpoint() + "]";
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+        return true;
+    }
+
     protected void onNewEngine(Engine newEngine) {
         refreshListeners.setCurrentRefreshLocationSupplier(newEngine::getTranslogLastWriteLocation);
     }
@@ -32,6 +32,7 @@
 import org.apache.lucene.store.IndexInput;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
+import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.cluster.routing.RecoverySource;
@@ -43,6 +44,7 @@
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.EngineException;
 import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.index.seqno.SeqNoStats;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
 import org.elasticsearch.index.store.Store;
@@ -459,6 +461,7 @@ private void addRecoveredFileDetails(SegmentInfos si, Store store, RecoveryState
      * Restores shard from {@link SnapshotRecoverySource} associated with this shard in routing table
      */
     private void restore(final IndexShard indexShard, final Repository repository, final SnapshotRecoverySource restoreSource) {
+        assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
         final RecoveryState.Translog translogState = indexShard.recoveryState().getTranslog();
         if (restoreSource == null) {
             throw new IndexShardRestoreFailedException(shardId, "empty restore source");
@@ -467,28 +470,63 @@ private void restore(final IndexShard indexShard, final Repository repository, f
             logger.trace("[{}] restoring shard [{}]", restoreSource.snapshot(), shardId);
         }
         try {
-            translogState.totalOperations(0);
-            translogState.totalOperationsOnStart(0);
             indexShard.prepareForIndexRecovery();
             ShardId snapshotShardId = shardId;
             final String indexName = restoreSource.index();
             if (!shardId.getIndexName().equals(indexName)) {
                 snapshotShardId = new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, shardId.id());
             }
             final IndexId indexId = repository.getRepositoryData().resolveIndexId(indexName);
-            repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(),
-                restoreSource.version(), indexId, snapshotShardId, indexShard.recoveryState());
-            final Store store = indexShard.store();
-            store.bootstrapNewHistory();
-            final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
-            final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
-            final String translogUUID = Translog.createEmptyTranslog(
-                indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
-            store.associateIndexWithNewTranslog(translogUUID);
-            assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
-            writeEmptyRetentionLeasesFile(indexShard);
-            indexShard.openEngineAndRecoverFromTranslog();
-            indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
+            try (Translog.Snapshot historySnapshot = repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(),
+                    restoreSource.version(), indexId, snapshotShardId, indexShard.recoveryState())) {
+                final Store store = indexShard.store();
+                store.bootstrapNewHistory();
+                final SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(
+                    store.readLastCommittedSegmentsInfo().userData.entrySet());
+                final String translogUUID = Translog.createEmptyTranslog(
+                    indexShard.shardPath().resolveTranslog(), commitInfo.maxSeqNo, shardId, indexShard.getPendingPrimaryTerm());
+                store.associateIndexWithNewTranslog(translogUUID);
+                writeEmptyRetentionLeasesFile(indexShard);
+                indexShard.openEngineAndSkipTranslogRecovery();
+                indexShard.advanceMaxSeqNoOfUpdatesOrDeletes(commitInfo.maxSeqNo);
+                translogState.totalOperations(historySnapshot.totalOperations());
+                translogState.totalOperationsOnStart(historySnapshot.totalOperations());
+                Translog.Operation op;
+                while ((op = historySnapshot.next()) != null) {
+                    final Engine.Result result = indexShard.applyTranslogOperation(op, Engine.Operation.Origin.SNAPSHOT_RECOVERY);
+                    if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
+                        assert false : "mapping is not ready for [" + op + "]";
+                        throw new IndexShardRestoreFailedException(shardId, "mapping is not ready for [" + op + "]");
+                    }
+                    assert result.getFailure() == null : "unexpected failure while applying translog operation" + result.getFailure();
+                    ExceptionsHelper.reThrowIfNotNull(result.getFailure());
+                    indexShard.sync();
+                    indexShard.afterWriteOperation();
+                    translogState.incrementRecoveredOperations();
+                }
+                indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
+                long syncedGlobalCheckpoint = indexShard.getLastSyncedGlobalCheckpoint();
+                final SeqNoStats seqNoStats = indexShard.seqNoStats();
+                if (seqNoStats.getMaxSeqNo() != syncedGlobalCheckpoint || seqNoStats.getLocalCheckpoint() != syncedGlobalCheckpoint) {
+                    throw new IndexShardRestoreFailedException(shardId, "history is not fully restored: seq_no_stats=" + seqNoStats
+                        + " synced_global_checkpoint=" + syncedGlobalCheckpoint);
+                }
+                /*
+                 * If there are gaps in the restored commit, we will not have every translog entry from the local_checkpoint
+                 * to the max_seq_no, so the restored commit is not a safe commit (it is missing translog operations).
+                 * To maintain the safe-commit assumption, we have to forcefully flush a new commit here.
+                 */
+                if (commitInfo.localCheckpoint != commitInfo.maxSeqNo) {
+                    indexShard.flush(new FlushRequest().force(true).waitIfOngoing(true));
Contributor: should we make sure that the global checkpoint is up-to-date (i.e. >= max-seq-no in the new commit that we create here) before calling this? Otherwise the shard will be marked as in-sync in the cluster state while the commit here will only become a safe commit once the shard is locally started (and the gcp has advanced). The main property we're after here is that every in-sync shard copy has a safe commit, which (AFAICS) is not guaranteed by the current recovery logic.
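A rough sketch of the guard being suggested, reusing only calls that already appear elsewhere in this diff (illustrative only, not part of this PR):

    // Hypothetical pre-flush guard: only force-flush once the persisted global checkpoint has
    // caught up to the max seq#, so that the commit written below is immediately a safe commit.
    final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo();
    indexShard.sync(); // persist the current global checkpoint to the translog
    if (indexShard.getLastSyncedGlobalCheckpoint() < maxSeqNo) {
        throw new IndexShardRestoreFailedException(shardId, "global checkpoint ["
            + indexShard.getLastSyncedGlobalCheckpoint() + "] has not advanced to max_seq_no [" + maxSeqNo + "]");
    }
    indexShard.flush(new FlushRequest().force(true).waitIfOngoing(true));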

+                }
+                final SequenceNumbers.CommitInfo restoredCommitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(
+                    store.readLastCommittedSegmentsInfo().userData.entrySet());
+                syncedGlobalCheckpoint = indexShard.getLastSyncedGlobalCheckpoint();
+                if (restoredCommitInfo.localCheckpoint != syncedGlobalCheckpoint) {
+                    throw new IndexShardRestoreFailedException(shardId, "history is not fully restored " +
+                        " restored_commit_info=" + restoredCommitInfo + " synced_global_checkpoint=" + syncedGlobalCheckpoint);
+                }
+            }
             indexShard.finalizeRecovery();
             indexShard.postRecovery("restore done");
Contributor: should we assert in post_recovery that we have a safe commit?

Member Author: I added an assertion here, but this assertion may slow down our tests. I will post the difference.

Member Author: Sadly, we don't have this invariant with closed indices because the global checkpoint is not persisted to the translog checkpoint during recovery.

} catch (Exception e) {
@@ -946,6 +946,23 @@ default int skippedOperations() {
          * Returns the next operation in the snapshot or <code>null</code> if we reached the end.
          */
         Translog.Operation next() throws IOException;

+        Snapshot EMPTY = new Translog.Snapshot() {
+            @Override
+            public int totalOperations() {
+                return 0;
+            }
+
+            @Override
+            public Operation next() {
+                return null;
+            }
+
+            @Override
+            public void close() {
+
+            }
+        };
     }

     /**
@@ -30,6 +30,7 @@
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
 import org.elasticsearch.index.store.Store;
+import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.snapshots.SnapshotId;
 import org.elasticsearch.snapshots.SnapshotInfo;
@@ -125,9 +126,9 @@ public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId,
     }

     @Override
-    public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId,
-                             RecoveryState recoveryState) {
-        in.restoreShard(shard, snapshotId, version, indexId, snapshotShardId, recoveryState);
+    public Translog.Snapshot restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId,
+                                          ShardId snapshotShardId, RecoveryState recoveryState) {
+        return in.restoreShard(shard, snapshotId, version, indexId, snapshotShardId, recoveryState);
     }

     @Override
@@ -29,6 +29,7 @@
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
 import org.elasticsearch.index.store.Store;
+import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.snapshots.SnapshotId;
 import org.elasticsearch.snapshots.SnapshotInfo;
@@ -215,9 +216,11 @@ void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId
      * @param indexId id of the index in the repository from which the restore is occurring
      * @param snapshotShardId shard id (in the snapshot)
      * @param recoveryState recovery state
+     * @return a snapshot of the history operations that will be replayed into the restoring shard.
+     *         Returns {@link Translog.Snapshot#EMPTY} if there are no operations to replay.
      */
-    void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId,
-                      ShardId snapshotShardId, RecoveryState recoveryState);
+    Translog.Snapshot restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId,
+                                   ShardId snapshotShardId, RecoveryState recoveryState);

     /**
      * Retrieve shard snapshot status for the stored snapshot
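For illustration, a repository that actually replays history (for example a remote-cluster repository) could build the returned snapshot from a list of already-fetched operations. A minimal sketch, assuming the operations are held in memory and java.util.List/Iterator are imported; the helper name is hypothetical and not part of this change:

    // Hypothetical helper: expose an in-memory list of operations as a Translog.Snapshot so a
    // restoreShard(...) implementation can hand them back to the recovery code for replay.
    static Translog.Snapshot snapshotFromOperations(List<Translog.Operation> operations) {
        final Iterator<Translog.Operation> iterator = operations.iterator();
        return new Translog.Snapshot() {
            @Override
            public int totalOperations() {
                return operations.size();
            }

            @Override
            public Translog.Operation next() {
                return iterator.hasNext() ? iterator.next() : null;
            }

            @Override
            public void close() {
                // nothing to release for an in-memory list
            }
        };
    }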
@@ -83,6 +83,7 @@
 import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.store.StoreFileMetaData;
+import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.Repository;
@@ -850,8 +851,8 @@ public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId,
     }

     @Override
-    public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId,
-                             RecoveryState recoveryState) {
+    public Translog.Snapshot restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId,
+                                          ShardId snapshotShardId, RecoveryState recoveryState) {
         final Context context = new Context(snapshotId, indexId, shard.shardId(), snapshotShardId);
         BlobPath path = basePath().add("indices").add(indexId.getId()).add(Integer.toString(snapshotShardId.getId()));
         BlobContainer blobContainer = blobStore().blobContainer(path);
@@ -860,6 +861,7 @@ public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version versio
             BlobStoreIndexShardSnapshot snapshot = context.loadSnapshot();
             SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles());
             snapshotContext.restore(snapshotFiles);
+            return Translog.Snapshot.EMPTY;
         } catch (Exception e) {
             throw new IndexShardRestoreFailedException(shard.shardId(), "failed to restore snapshot [" + snapshotId + "]", e);
         }
@@ -2209,8 +2209,8 @@ public void testRestoreShard() throws IOException {
         target.markAsRecovering("store", new RecoveryState(routing, localNode, null));
         assertTrue(target.restoreFromRepository(new RestoreOnlyRepository("test") {
             @Override
-            public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId,
-                                     RecoveryState recoveryState) {
+            public Translog.Snapshot restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId,
+                                                  ShardId snapshotShardId, RecoveryState recoveryState) {
                 try {
                     cleanLuceneIndex(targetStore.directory());
                     for (String file : sourceStore.directory().listAll()) {
@@ -2219,21 +2219,22 @@ public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version versio
                         }
                         targetStore.directory().copyFrom(sourceStore.directory(), file, file, IOContext.DEFAULT);
                     }
+                    return Translog.Snapshot.EMPTY;
                 } catch (Exception ex) {
                     throw new RuntimeException(ex);
                 }
             }
         }));
         assertThat(target.getLocalCheckpoint(), equalTo(2L));
         assertThat(target.seqNoStats().getMaxSeqNo(), equalTo(2L));
-        assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(0L));
+        assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(2L));
         IndexShardTestCase.updateRoutingEntry(target, routing.moveToStarted());
         assertThat(target.getReplicationTracker().getTrackedLocalCheckpointForShard(
             target.routingEntry().allocationId().getId()).getLocalCheckpoint(), equalTo(2L));
         assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(2L));

         assertDocs(target, "0", "2");

+        IndexShardTestCase.assertSafeCommitExists(target);
         closeShard(source, false);
         closeShards(target);
     }
@@ -32,6 +32,7 @@
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
 import org.elasticsearch.test.junit.annotations.TestLogging;
+import org.junit.After;

 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -42,6 +43,11 @@
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
 public class IndexPrimaryRelocationIT extends ESIntegTestCase {

+    @After
+    void assertSafeCommitExists() throws Exception {
+        internalCluster().assertSafeCommitExists();
+    }
+
     private static final int RELOCATION_COUNT = 15;

     @TestLogging("_root:DEBUG,org.elasticsearch.action.bulk:TRACE,org.elasticsearch.index.shard:TRACE," +