Add recovery infrastructure hook to work with older Lucene indices #81056


Merged (30 commits, Dec 2, 2021)

Commits
8e1e74e
Use searchable snapshots with old repos
ywelsch Nov 22, 2021
16787b0
add recovery hook
ywelsch Nov 23, 2021
5eb1652
Basic infra for restoring old snaps
ywelsch Nov 24, 2021
0573748
Merge remote-tracking branch 'elastic/master' into search-snap-old-repos
ywelsch Nov 24, 2021
34e3249
oops
ywelsch Nov 25, 2021
2b81012
cjbjvø≈
ywelsch Nov 25, 2021
8bcfd6c
jgjh
ywelsch Nov 25, 2021
74209b5
it's nullable
ywelsch Nov 25, 2021
0a2d9c7
some reverting
ywelsch Nov 25, 2021
01fb461
renames
ywelsch Nov 25, 2021
72fd881
asdafafa
ywelsch Nov 25, 2021
e9b8039
fixo
ywelsch Nov 25, 2021
63f6fec
asdaf
ywelsch Nov 25, 2021
38537c0
Merge remote-tracking branch 'elastic/master' into search-snap-old-repos
ywelsch Nov 25, 2021
737421a
asda
ywelsch Nov 25, 2021
67eb0d1
fix test
ywelsch Nov 25, 2021
f9ef94b
fix test
ywelsch Nov 29, 2021
b3e91e6
Merge remote-tracking branch 'elastic/master' into search-snap-old-repos
ywelsch Nov 29, 2021
43ab5e3
moar test fix
ywelsch Nov 29, 2021
01debd3
no further split packages
ywelsch Nov 29, 2021
1680afa
disallow usage of "allow_bwc_indices" repo setting in release buildso
ywelsch Nov 29, 2021
f6b4450
oh no, a newline
ywelsch Nov 29, 2021
3357ec3
Tanguy's review feedbacko
ywelsch Nov 30, 2021
a134574
Merge remote-tracking branch 'elastic/master' into search-snap-old-repos
ywelsch Nov 30, 2021
833e2e0
quality checks as usual, spotless
ywelsch Nov 30, 2021
697b0f0
warnings...
ywelsch Nov 30, 2021
1878a0d
...
ywelsch Nov 30, 2021
13dcf38
Check source and simple runtime fields query
ywelsch Nov 30, 2021
bab959e
review comments
ywelsch Dec 2, 2021
8132cee
Merge remote-tracking branch 'elastic/master' into search-snap-old-repos
ywelsch Dec 2, 2021
@@ -1826,6 +1826,23 @@ public static IndexMetadata legacyFromXContent(XContentParser parser) throws IOE
} else if ("mappings".equals(currentFieldName)) {
// don't try to parse these for now
parser.skipChildren();
} else if ("in_sync_allocations".equals(currentFieldName)) {
@ywelsch (Contributor, Author) commented on Nov 25, 2021:
This is just copy-pasted from the regular IndexMetadata parsing code; in_sync_allocations are required to exist in IndexMetadata for a restore to go through.

while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_ARRAY) {
String shardId = currentFieldName;
Set<String> allocationIds = new HashSet<>();
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
if (token == XContentParser.Token.VALUE_STRING) {
allocationIds.add(parser.text());
}
}
builder.putInSyncAllocationIds(Integer.valueOf(shardId), allocationIds);
} else {
throw new IllegalArgumentException("Unexpected token: " + token);
}
}
} else {
// assume it's custom index metadata
parser.skipChildren();
@@ -1857,6 +1874,8 @@ public static IndexMetadata legacyFromXContent(XContentParser parser) throws IOE
}
XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, parser.nextToken(), parser);

builder.putMapping(MappingMetadata.EMPTY_MAPPINGS); // just make sure it's not empty so that _source can be read

IndexMetadata indexMetadata = builder.build();
assert indexMetadata.getCreationVersion().before(Version.CURRENT.minimumIndexCompatibilityVersion());
return indexMetadata;
@@ -269,4 +269,19 @@ public void beforeIndexShardRecovery(final IndexShard indexShard, final IndexSet
}
}
}

@Override
public void afterFilesRestoredFromRepository(IndexShard indexShard) {
for (IndexEventListener listener : listeners) {
try {
listener.afterFilesRestoredFromRepository(indexShard);
} catch (Exception e) {
logger.warn(
() -> new ParameterizedMessage("[{}] failed to invoke after files restored from repository", indexShard.shardId()),
e
);
throw e;
}
}
}
}
@@ -168,4 +168,10 @@ default void onStoreClosed(ShardId shardId) {}
* @param indexSettings the shard's index settings
*/
default void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings indexSettings) {}

/**
* Called after the raw files have been restored from the repository but before any other recovery processing has happened
* @param indexShard the shard that is recovering
*/
default void afterFilesRestoredFromRepository(IndexShard indexShard) {}
}
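
To illustrate how this extension point could be consumed, the following sketch shows a plugin-registered IndexEventListener that rewrites an old-format index right after its files are restored and before any further recovery processing runs. The OldIndexConversionListener class and the convertToCurrentFormat helper are hypothetical placeholders for whatever conversion logic a plugin would supply; only the afterFilesRestoredFromRepository override reflects the new API.

import java.io.IOException;
import java.io.UncheckedIOException;

import org.apache.lucene.store.Directory;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;

// Hypothetical listener: converts an old Lucene index after its files are restored,
// but before any further recovery processing opens the engine.
public class OldIndexConversionListener implements IndexEventListener {

    @Override
    public void afterFilesRestoredFromRepository(IndexShard indexShard) {
        try {
            // The raw snapshot files are on disk at this point; a conversion step could
            // rewrite them into a format the current Lucene version can read.
            convertToCurrentFormat(indexShard.store().directory()); // hypothetical helper
        } catch (IOException e) {
            throw new UncheckedIOException("conversion failed for " + indexShard.shardId(), e);
        }
    }

    // Placeholder for the actual conversion logic, which would live in its own module.
    private void convertToCurrentFormat(Directory directory) throws IOException {
        // no-op in this sketch
    }
}

In the change itself, the new hook is dispatched to every registered listener (the loop in the hunk above) and invoked from the restore path once the files are on disk (the hunk below).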
@@ -502,6 +502,7 @@ private void restore(
logger.trace("[{}] restoring shard [{}]", restoreSource.snapshot(), shardId);
}
final ActionListener<Void> restoreListener = ActionListener.wrap(v -> {
indexShard.getIndexEventListener().afterFilesRestoredFromRepository(indexShard);
final Store store = indexShard.store();
bootstrap(indexShard, store);
assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
@@ -130,6 +130,7 @@ interface SnapshotCommitSupplier {
* @return a collection of snapshot commit suppliers, keyed by the value of
* {@link org.elasticsearch.index.IndexModule#INDEX_STORE_TYPE_SETTING}.
*/
// TODO: remove unused API extension point
default Map<String, SnapshotCommitSupplier> getSnapshotCommitSuppliers() {
return Collections.emptyMap();
}
@@ -149,6 +149,7 @@
import java.util.stream.Stream;

import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName;
import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.isSearchableSnapshotStore;

/**
* BlobStore - based implementation of Snapshot Repository
@@ -2679,14 +2680,24 @@ public void snapshotShard(SnapshotShardContext context) {
indexCommitPointFiles = new ArrayList<>();
final Collection<String> fileNames;
final Store.MetadataSnapshot metadataFromStore;
try (Releasable ignored = incrementStoreRef(store, snapshotStatus, shardId)) {
// TODO apparently we don't use MetadataSnapshot#recoveryDiff(...) here but we should
try {
logger.trace("[{}] [{}] Loading store metadata using index commit [{}]", shardId, snapshotId, snapshotIndexCommit);
metadataFromStore = store.getMetadata(snapshotIndexCommit);
fileNames = snapshotIndexCommit.getFileNames();
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e);
if (isSearchableSnapshotStore(store.indexSettings().getSettings())) {
fileNames = Collections.emptyList();
metadataFromStore = Store.MetadataSnapshot.EMPTY;
} else {
try (Releasable ignored = incrementStoreRef(store, snapshotStatus, shardId)) {
// TODO apparently we don't use MetadataSnapshot#recoveryDiff(...) here but we should
try {
logger.trace(
"[{}] [{}] Loading store metadata using index commit [{}]",
shardId,
snapshotId,
snapshotIndexCommit
);
metadataFromStore = store.getMetadata(snapshotIndexCommit);
fileNames = snapshotIndexCommit.getFileNames();
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e);
}
}
}
for (String fileName : fileNames) {
@@ -11,7 +11,6 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
@@ -69,64 +68,71 @@ protected FileRestoreContext(String repositoryName, ShardId shardId, SnapshotId
public void restore(SnapshotFiles snapshotFiles, Store store, ActionListener<Void> listener) {
store.incRef();
try {
logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, repositoryName, shardId);
Store.MetadataSnapshot recoveryTargetMetadata;
try {
// this will throw an IOException if the store has no segments infos file. The
// store can still have existing files but they will be deleted just before being
// restored.
recoveryTargetMetadata = store.getMetadata(null, true);
} catch (org.apache.lucene.index.IndexNotFoundException e) {
// happens when restore to an empty shard, not a big deal
logger.trace("[{}] [{}] restoring from to an empty shard", shardId, snapshotId);
recoveryTargetMetadata = Store.MetadataSnapshot.EMPTY;
} catch (IOException e) {
logger.warn(
new ParameterizedMessage(
"[{}] [{}] Can't read metadata from store, will not reuse local files during restore",
shardId,
snapshotId
),
e
);
recoveryTargetMetadata = Store.MetadataSnapshot.EMPTY;
}
final List<BlobStoreIndexShardSnapshot.FileInfo> filesToRecover = new ArrayList<>();
final Map<String, StoreFileMetadata> snapshotMetadata = new HashMap<>();
final Map<String, BlobStoreIndexShardSnapshot.FileInfo> fileInfos = new HashMap<>();
for (final BlobStoreIndexShardSnapshot.FileInfo fileInfo : snapshotFiles.indexFiles()) {
snapshotMetadata.put(fileInfo.metadata().name(), fileInfo.metadata());
fileInfos.put(fileInfo.metadata().name(), fileInfo);
}
if (isSearchableSnapshotStore(store.indexSettings().getSettings())) {
for (BlobStoreIndexShardSnapshot.FileInfo fileInfo : snapshotFiles.indexFiles()) {
assert store.directory().fileLength(fileInfo.physicalName()) == fileInfo.length();
recoveryState.getIndex().addFileDetail(fileInfo.physicalName(), fileInfo.length(), true);
}
} else {
logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, repositoryName, shardId);
Store.MetadataSnapshot recoveryTargetMetadata;
try {
// this will throw an IOException if the store has no segments infos file. The
// store can still have existing files but they will be deleted just before being
// restored.
recoveryTargetMetadata = store.getMetadata(null, true);
} catch (org.apache.lucene.index.IndexNotFoundException e) {
// happens when restore to an empty shard, not a big deal
logger.trace("[{}] [{}] restoring from to an empty shard", shardId, snapshotId);
recoveryTargetMetadata = Store.MetadataSnapshot.EMPTY;
} catch (IOException e) {
logger.warn(
new ParameterizedMessage(
"[{}] [{}] Can't read metadata from store, will not reuse local files during restore",
shardId,
snapshotId
),
e
);
recoveryTargetMetadata = Store.MetadataSnapshot.EMPTY;
}
final Map<String, StoreFileMetadata> snapshotMetadata = new HashMap<>();
final Map<String, BlobStoreIndexShardSnapshot.FileInfo> fileInfos = new HashMap<>();
for (final BlobStoreIndexShardSnapshot.FileInfo fileInfo : snapshotFiles.indexFiles()) {
snapshotMetadata.put(fileInfo.metadata().name(), fileInfo.metadata());
fileInfos.put(fileInfo.metadata().name(), fileInfo);
}

final Store.MetadataSnapshot sourceMetadata = new Store.MetadataSnapshot(unmodifiableMap(snapshotMetadata), emptyMap(), 0);
final Store.MetadataSnapshot sourceMetadata = new Store.MetadataSnapshot(unmodifiableMap(snapshotMetadata), emptyMap(), 0);

final StoreFileMetadata restoredSegmentsFile = sourceMetadata.getSegmentsFile();
if (restoredSegmentsFile == null) {
throw new IndexShardRestoreFailedException(shardId, "Snapshot has no segments file");
}
final StoreFileMetadata restoredSegmentsFile = sourceMetadata.getSegmentsFile();
if (restoredSegmentsFile == null) {
throw new IndexShardRestoreFailedException(shardId, "Snapshot has no segments file");
}

final Store.RecoveryDiff diff = sourceMetadata.recoveryDiff(recoveryTargetMetadata);
for (StoreFileMetadata md : diff.identical) {
BlobStoreIndexShardSnapshot.FileInfo fileInfo = fileInfos.get(md.name());
recoveryState.getIndex().addFileDetail(fileInfo.physicalName(), fileInfo.length(), true);
if (logger.isTraceEnabled()) {
logger.trace(
"[{}] [{}] not_recovering file [{}] from [{}], exists in local store and is same",
shardId,
snapshotId,
fileInfo.physicalName(),
fileInfo.name()
);
final Store.RecoveryDiff diff = sourceMetadata.recoveryDiff(recoveryTargetMetadata);
for (StoreFileMetadata md : diff.identical) {
BlobStoreIndexShardSnapshot.FileInfo fileInfo = fileInfos.get(md.name());
recoveryState.getIndex().addFileDetail(fileInfo.physicalName(), fileInfo.length(), true);
if (logger.isTraceEnabled()) {
logger.trace(
"[{}] [{}] not_recovering file [{}] from [{}], exists in local store and is same",
shardId,
snapshotId,
fileInfo.physicalName(),
fileInfo.name()
);
}
}
}

for (StoreFileMetadata md : concat(diff)) {
BlobStoreIndexShardSnapshot.FileInfo fileInfo = fileInfos.get(md.name());
filesToRecover.add(fileInfo);
recoveryState.getIndex().addFileDetail(fileInfo.physicalName(), fileInfo.length(), false);
if (logger.isTraceEnabled()) {
logger.trace("[{}] [{}] recovering [{}] from [{}]", shardId, snapshotId, fileInfo.physicalName(), fileInfo.name());
for (StoreFileMetadata md : concat(diff)) {
BlobStoreIndexShardSnapshot.FileInfo fileInfo = fileInfos.get(md.name());
filesToRecover.add(fileInfo);
recoveryState.getIndex().addFileDetail(fileInfo.physicalName(), fileInfo.length(), false);
if (logger.isTraceEnabled()) {
logger.trace("[{}] [{}] recovering [{}] from [{}]", shardId, snapshotId, fileInfo.physicalName(), fileInfo.name());
}
}
}

@@ -157,7 +163,7 @@ public void restore(SnapshotFiles snapshotFiles, Store store, ActionListener<Voi
restoreFiles(filesToRecover, store, ActionListener.wrap(v -> {
store.incRef();
try {
afterRestore(snapshotFiles, store, restoredSegmentsFile);
afterRestore(snapshotFiles, store);
listener.onResponse(null);
} finally {
store.decRef();
@@ -173,20 +179,8 @@ public void restore(SnapshotFiles snapshotFiles, Store store, ActionListener<Voi
}
}

private void afterRestore(SnapshotFiles snapshotFiles, Store store, StoreFileMetadata restoredSegmentsFile) {
try {
if (isSearchableSnapshotStore(store.indexSettings().getSettings()) == false) {
Lucene.pruneUnreferencedFiles(restoredSegmentsFile.name(), store.directory());
@ywelsch (Contributor, Author) commented on Nov 25, 2021:
This code does not appear to do anything, and is actually in the way, as it opens the Lucene index before it's converted. No tests break when it is removed, and it's unclear whether it still serves a purpose (even after some historical digging).

A contributor replied:
Yes, it looks like we clean up according to the snapshot contents anyway; this second cleanup was added later in #8969, but there's no indication why it was needed. I think you're right, no need to open the index here.

}
} catch (IOException e) {
throw new IndexShardRestoreFailedException(
shardId,
"Failed to remove files not referenced in segment file [" + restoredSegmentsFile.name() + "] after restore",
e
);
}

/// now, go over and clean files that are in the store, but were not in the snapshot
private void afterRestore(SnapshotFiles snapshotFiles, Store store) {
// clean files that are in the store, but were not in the snapshot
try {
for (String storeFile : store.directory().listAll()) {
if (Store.isAutogenerated(storeFile) || snapshotFiles.containPhysicalIndexFile(storeFile)) {
Expand All @@ -195,9 +189,10 @@ private void afterRestore(SnapshotFiles snapshotFiles, Store store, StoreFileMet
try {
store.directory().deleteFile(storeFile);
} catch (ImmutableDirectoryException e) {
// snapshots of immutable directories only contain an empty `segments_N` file since the data lives elsewhere, and if we
// restore such a snapshot then the real data is already present in the directory and cannot be removed.
assert snapshotFiles.indexFiles().size() == 1 : snapshotFiles;
// snapshots of immutable directories either only contain an empty `segments_N` file or no data at all,
// since the data lives elsewhere, and if we restore such a snapshot then the real data is already present
// in the directory and cannot be removed.
assert snapshotFiles.indexFiles().size() <= 1 : snapshotFiles;
} catch (IOException e) {
logger.warn("[{}] [{}] failed to delete file [{}] during snapshot cleanup", shardId, snapshotId, storeFile);
}