Make peer recovery work with archive data #81522


Merged: 12 commits, Dec 14, 2021

StoreRecovery.java

@@ -53,12 +53,13 @@
 import static org.elasticsearch.common.lucene.Lucene.indexWriterConfigWithNoMerging;
 import static org.elasticsearch.core.TimeValue.timeValueMillis;
+import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.isSearchableSnapshotStore;
 
 /**
  * This package private utility class encapsulates the logic to recover an index shard from either an existing index on
  * disk or from a snapshot in a repository.
  */
-final class StoreRecovery {
+public final class StoreRecovery {
 
     private final Logger logger;
     private final ShardId shardId;
@@ -549,14 +550,17 @@ private void restore(
         }
     }
 
-    private void bootstrap(final IndexShard indexShard, final Store store) throws IOException {
-        store.bootstrapNewHistory();
+    public static void bootstrap(final IndexShard indexShard, final Store store) throws IOException {
+        if (isSearchableSnapshotStore(indexShard.indexSettings().getSettings()) == false) {
+            // not bootstrapping new history for searchable snapshots (which are read-only) allows sequence-number based peer recoveries
+            store.bootstrapNewHistory();
+        }
         final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
         final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
         final String translogUUID = Translog.createEmptyTranslog(
             indexShard.shardPath().resolveTranslog(),
             localCheckpoint,
-            shardId,
+            indexShard.shardId(),
             indexShard.getPendingPrimaryTerm()
         );
         store.associateIndexWithNewTranslog(translogUUID);

PeerRecoveryTargetService.java

@@ -44,6 +44,7 @@
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.ShardLongFieldRange;
 import org.elasticsearch.index.shard.ShardNotFoundException;
+import org.elasticsearch.index.shard.StoreRecovery;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.index.translog.TranslogCorruptedException;
@@ -65,6 +66,7 @@
 
 import static org.elasticsearch.core.TimeValue.timeValueMillis;
 import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
+import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.isSearchableSnapshotStore;
 
 /**
  * The recovery target handles recoveries of peer shards of the shard+node to recover to.
@@ -226,6 +228,17 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
         assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node";
         logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
         indexShard.prepareForIndexRecovery();
+        if (isSearchableSnapshotStore(indexShard.indexSettings().getSettings())) {
+            // for searchable snapshots, peer recovery is treated similarly to recovery from snapshot
+            indexShard.getIndexEventListener().afterFilesRestoredFromRepository(indexShard);
+            final Store store = indexShard.store();
+            store.incRef();
+            try {
+                StoreRecovery.bootstrap(indexShard, store);

Contributor Author commented:
With this new approach, we now have sequence-number based recoveries for searchable snapshots, which means there is no longer any need to send over any files (hence the exception for ^recovery\..*\.segments_.*$ in InMemoryNoOpCommitDirectory.ensureMutable), except in the BWC case (when the primary was on an older node and created a different history UUID).

+            } finally {
+                store.decRef();
+            }
+        }
         final long startingSeqNo = indexShard.recoverLocallyUpToGlobalCheckpoint();
         assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG
             : "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]";

BlobStoreRepository.java

@@ -2674,30 +2674,23 @@ public void snapshotShard(SnapshotShardContext context) {
         long indexIncrementalSize = 0;
         long indexTotalFileSize = 0;
         final BlockingQueue<BlobStoreIndexShardSnapshot.FileInfo> filesToSnapshot = new LinkedBlockingQueue<>();
-        // If we did not find a set of files that is equal to the current commit we determine the files to upload by comparing files
-        // in the commit with files already in the repository
-        if (filesFromSegmentInfos == null) {
 
+        if (isSearchableSnapshotStore(store.indexSettings().getSettings())) {
+            indexCommitPointFiles = Collections.emptyList();
+        } else if (filesFromSegmentInfos == null) {
+            // If we did not find a set of files that is equal to the current commit we determine the files to upload by comparing files
+            // in the commit with files already in the repository
             indexCommitPointFiles = new ArrayList<>();
             final Collection<String> fileNames;
             final Store.MetadataSnapshot metadataFromStore;
-            if (isSearchableSnapshotStore(store.indexSettings().getSettings())) {
-                fileNames = Collections.emptyList();
-                metadataFromStore = Store.MetadataSnapshot.EMPTY;
-            } else {
-                try (Releasable ignored = incrementStoreRef(store, snapshotStatus, shardId)) {
-                    // TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should
-                    try {
-                        logger.trace(
-                            "[{}] [{}] Loading store metadata using index commit [{}]",
-                            shardId,
-                            snapshotId,
-                            snapshotIndexCommit
-                        );
-                        metadataFromStore = store.getMetadata(snapshotIndexCommit);
-                        fileNames = snapshotIndexCommit.getFileNames();
-                    } catch (IOException e) {
-                        throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e);
-                    }
+            try (Releasable ignored = incrementStoreRef(store, snapshotStatus, shardId)) {
+                // TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should
+                try {
+                    logger.trace("[{}] [{}] Loading store metadata using index commit [{}]", shardId, snapshotId, snapshotIndexCommit);
+                    metadataFromStore = store.getMetadata(snapshotIndexCommit);
+                    fileNames = snapshotIndexCommit.getFileNames();
+                } catch (IOException e) {
+                    throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e);
                 }
             }
             for (String fileName : fileNames) {

BWCCodec.java (x-pack old-lucene-versions plugin)

@@ -31,6 +31,7 @@
 import org.apache.lucene.index.Terms;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
+import org.elasticsearch.index.mapper.SeqNoFieldMapper;
 import org.elasticsearch.xpack.lucene.bwc.codecs.lucene70.BWCLucene70Codec;
 
 import java.io.IOException;
@@ -169,6 +170,10 @@ public void write(Directory directory, SegmentInfo segmentInfo, String segmentSu
     private static FieldInfos filterFields(FieldInfos fieldInfos) {
         List<FieldInfo> fieldInfoCopy = new ArrayList<>(fieldInfos.size());
         for (FieldInfo fieldInfo : fieldInfos) {
+            // omit sequence number field so that it doesn't interfere with peer recovery
+            if (fieldInfo.name.equals(SeqNoFieldMapper.NAME)) {

Contributor Author commented:
With this, LuceneChangesSnapshot will just return an empty set of documents instead of failing, because the field's doc values are now mapped to none.

In a follow-up, I want to explore exposing doc values of older indices (in particular the _seq_no field and soft deletes).

+                continue;
+            }
             fieldInfoCopy.add(
                 new FieldInfo(
                     fieldInfo.name,
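
As a side note on the comment above: once _seq_no is filtered out of an archived segment's FieldInfos, asking Lucene for its doc values returns null rather than tripping over a field whose doc values are mapped to none. A rough sketch of that behavior (illustrative only; this helper is not part of the PR):

    import java.io.IOException;
    import org.apache.lucene.index.LeafReader;
    import org.apache.lucene.index.NumericDocValues;

    final class SeqNoProbeSketch {
        // With _seq_no absent from FieldInfos, getNumericDocValues returns null,
        // which a changes snapshot can treat as "no operations to replay".
        static boolean hasSeqNoDocValues(LeafReader reader) throws IOException {
            NumericDocValues seqNos = reader.getNumericDocValues("_seq_no");
            return seqNos != null;
        }
    }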

SearchableSnapshotIndexEventListener.java

@@ -9,7 +9,6 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
-import org.apache.lucene.index.SegmentInfos;
 import org.elasticsearch.action.StepListener;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.RecoverySource;
@@ -18,20 +17,15 @@
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.IndexSettings;
-import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.IndexEventListener;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.index.translog.Translog;
-import org.elasticsearch.index.translog.TranslogException;
 import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.searchablesnapshots.cache.full.CacheService;
 import org.elasticsearch.xpack.searchablesnapshots.cache.shared.FrozenCacheService;
 import org.elasticsearch.xpack.searchablesnapshots.store.SearchableSnapshotDirectory;
 
-import java.nio.file.Path;
-
 import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.isSearchableSnapshotStore;
 import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING;
 import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING;
@@ -65,7 +59,6 @@ public SearchableSnapshotIndexEventListener(
     public void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings indexSettings) {
         assert Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC);
         ensureSnapshotIsLoaded(indexShard);
-        associateNewEmptyTranslogWithIndex(indexShard);
     }
 
     private static void ensureSnapshotIsLoaded(IndexShard indexShard) {
@@ -93,26 +86,6 @@ private static void ensureSnapshotIsLoaded(IndexShard indexShard) {
             : "loading snapshot must not be called twice unless we are retrying a peer recovery";
     }
 
-    private static void associateNewEmptyTranslogWithIndex(IndexShard indexShard) {

Contributor Author commented:
this is now gone. Woop woop

-        final ShardId shardId = indexShard.shardId();
-        assert isSearchableSnapshotStore(indexShard.indexSettings().getSettings()) : "Expected a searchable snapshot shard " + shardId;
-        if (indexShard.routingEntry().primary()
-            && indexShard.routingEntry().recoverySource().getType().equals(RecoverySource.Type.SNAPSHOT)) {
-            // translog initialization is done later in the restore step
-            return;
-        }
-        try {
-            final SegmentInfos segmentInfos = indexShard.store().readLastCommittedSegmentsInfo();
-            final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
-            final long primaryTerm = indexShard.getPendingPrimaryTerm();
-            final String translogUUID = segmentInfos.userData.get(Translog.TRANSLOG_UUID_KEY);
-            final Path translogLocation = indexShard.shardPath().resolveTranslog();
-            Translog.createEmptyTranslog(translogLocation, shardId, localCheckpoint, primaryTerm, translogUUID, null);
-        } catch (Exception e) {
-            throw new TranslogException(shardId, "failed to associate a new translog", e);
-        }
-    }
-
     @Override
     public void beforeIndexRemoved(IndexService indexService, IndexRemovalReason reason) {
         if (shouldEvictCacheFiles(reason)) {

x-pack/qa/repository-old-versions/build.gradle (1 change: 1 addition & 0 deletions)

@@ -71,6 +71,7 @@ if (Os.isFamily(Os.FAMILY_WINDOWS)) {
 
 def testClusterProvider = testClusters.register(clusterName) {
   testDistribution = 'DEFAULT'
+  numberOfNodes = 2
 
   setting 'path.repo', repoLocation
   setting 'xpack.license.self_generated.type', 'trial'

OldRepositoryAccessIT.java (x-pack/qa/repository-old-versions)

@@ -10,6 +10,7 @@
 import org.apache.http.HttpHost;
 import org.elasticsearch.Build;
 import org.elasticsearch.Version;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
 import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
@@ -26,6 +27,8 @@
 import org.elasticsearch.client.indices.CloseIndexRequest;
 import org.elasticsearch.client.searchable_snapshots.MountSnapshotRequest;
 import org.elasticsearch.cluster.SnapshotsInProgress;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.common.settings.SecureString;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
@@ -211,6 +214,16 @@ private void restoreMountAndVerify(int numDocs, Set<String> expectedIds, RestHig
         assertEquals(numberOfShards, restoreSnapshotResponse.getRestoreInfo().totalShards());
         assertEquals(numberOfShards, restoreSnapshotResponse.getRestoreInfo().successfulShards());
 
+        assertEquals(
+            ClusterHealthStatus.GREEN,
+            client.cluster()
+                .health(
+                    new ClusterHealthRequest("restored_test").waitForGreenStatus().waitForNoRelocatingShards(true),
+                    RequestOptions.DEFAULT
+                )
+                .getStatus()
+        );
+
         // run a search against the index
         assertDocs("restored_test", numDocs, expectedIds, client);
 
@@ -219,13 +232,24 @@
             .mountSnapshot(
                 new MountSnapshotRequest("testrepo", "snap1", "test").storage(MountSnapshotRequest.Storage.FULL_COPY)
                     .renamedIndex("mounted_full_copy_test")
+                    .indexSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build())
                     .waitForCompletion(true),
                 RequestOptions.DEFAULT
             );
         assertNotNull(mountSnapshotResponse.getRestoreInfo());
         assertEquals(numberOfShards, mountSnapshotResponse.getRestoreInfo().totalShards());
         assertEquals(numberOfShards, mountSnapshotResponse.getRestoreInfo().successfulShards());
 
+        assertEquals(
+            ClusterHealthStatus.GREEN,
+            client.cluster()
+                .health(
+                    new ClusterHealthRequest("mounted_full_copy_test").waitForGreenStatus().waitForNoRelocatingShards(true),
+                    RequestOptions.DEFAULT
+                )
+                .getStatus()
+        );
+
         // run a search against the index
         assertDocs("mounted_full_copy_test", numDocs, expectedIds, client);
 