Skip to content

Reduce Heap Use during Shard Snapshot #60370

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1872,63 +1872,78 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
snapshotStatus.moveToStarted(startTime, indexIncrementalFileCount,
indexTotalNumberOfFiles, indexIncrementalSize, indexTotalFileSize);

final StepListener<Collection<Void>> allFilesUploadedListener = new StepListener<>();
allFilesUploadedListener.whenComplete(v -> {
final IndexShardSnapshotStatus.Copy lastSnapshotStatus =
snapshotStatus.moveToFinalize(snapshotIndexCommit.getGeneration());

// now create and write the commit point
final BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getName(),
lastSnapshotStatus.getIndexVersion(),
indexCommitPointFiles,
lastSnapshotStatus.getStartTime(),
threadPool.absoluteTimeInMillis() - lastSnapshotStatus.getStartTime(),
lastSnapshotStatus.getIncrementalFileCount(),
lastSnapshotStatus.getIncrementalSize()
);

logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
final String indexGeneration;
final boolean writeShardGens = SnapshotsService.useShardGenerations(repositoryMetaVersion);
// build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones
List<SnapshotFiles> newSnapshotsList = new ArrayList<>();
newSnapshotsList.add(new SnapshotFiles(snapshotId.getName(), indexCommitPointFiles, shardStateIdentifier));
for (SnapshotFiles point : snapshots) {
newSnapshotsList.add(point);
}
final BlobStoreIndexShardSnapshots updatedBlobStoreIndexShardSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList);
final Runnable afterWriteSnapBlob;
if (writeShardGens) {
// When using shard generations we can safely write the index-${uuid} blob before writing out any of the actual data
// for this shard since the uuid named blob will simply not be referenced in case of error and thus we will never
// reference a generation that has not had all its files fully upload.
indexGeneration = UUIDs.randomBase64UUID();
try {
INDEX_SHARD_SNAPSHOT_FORMAT.write(snapshot, shardContainer, snapshotId.getUUID(), compress);
writeShardIndexBlob(shardContainer, indexGeneration, updatedBlobStoreIndexShardSnapshots);
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e);
}
// build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones
List<SnapshotFiles> newSnapshotsList = new ArrayList<>();
newSnapshotsList.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles(), shardStateIdentifier));
for (SnapshotFiles point : snapshots) {
newSnapshotsList.add(point);
throw new IndexShardSnapshotFailedException(shardId,
"Failed to write shard level snapshot metadata for [" + snapshotId + "] to ["
+ INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(indexGeneration) + "]", e);
}
final List<String> blobsToDelete;
final String indexGeneration;
final boolean writeShardGens = SnapshotsService.useShardGenerations(repositoryMetaVersion);
if (writeShardGens) {
indexGeneration = UUIDs.randomBase64UUID();
blobsToDelete = Collections.emptyList();
} else {
indexGeneration = Long.toString(Long.parseLong(fileListGeneration) + 1);
// Delete all previous index-N blobs
blobsToDelete = blobs.stream().filter(blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX)).collect(Collectors.toList());
assert blobsToDelete.stream().mapToLong(b -> Long.parseLong(b.replaceFirst(SNAPSHOT_INDEX_PREFIX, "")))
afterWriteSnapBlob = () -> {};
} else {
// When not using shard generations we can only write the index-${N} blob after all other work for this shard has
// completed.
// Also, in case of numeric shard generations the data node has to take care of deleting old shard generations.
indexGeneration = Long.toString(Long.parseLong(fileListGeneration) + 1);
// Delete all previous index-N blobs
final List<String> blobsToDelete = blobs.stream().filter(blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX))
.collect(Collectors.toList());
assert blobsToDelete.stream().mapToLong(b -> Long.parseLong(b.replaceFirst(SNAPSHOT_INDEX_PREFIX, "")))
.max().orElse(-1L) < Long.parseLong(indexGeneration)
: "Tried to delete an index-N blob newer than the current generation [" + indexGeneration
+ "] when deleting index-N blobs " + blobsToDelete;
}
try {
writeShardIndexBlob(shardContainer, indexGeneration, new BlobStoreIndexShardSnapshots(newSnapshotsList));
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId,
"Failed to finalize snapshot creation [" + snapshotId + "] with shard index ["
+ INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(indexGeneration) + "]", e);
}
if (writeShardGens == false) {
afterWriteSnapBlob = () -> {
try {
writeShardIndexBlob(shardContainer, indexGeneration, updatedBlobStoreIndexShardSnapshots);
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId,
"Failed to finalize snapshot creation [" + snapshotId + "] with shard index ["
+ INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(indexGeneration) + "]", e);
}
try {
deleteFromContainer(shardContainer, blobsToDelete);
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete old index-N blobs during finalization",
snapshotId, shardId), e);
snapshotId, shardId), e);
}
};
}

final StepListener<Collection<Void>> allFilesUploadedListener = new StepListener<>();
allFilesUploadedListener.whenComplete(v -> {
final IndexShardSnapshotStatus.Copy lastSnapshotStatus =
snapshotStatus.moveToFinalize(snapshotIndexCommit.getGeneration());

// now create and write the commit point
logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
try {
INDEX_SHARD_SNAPSHOT_FORMAT.write(new BlobStoreIndexShardSnapshot(snapshotId.getName(),
lastSnapshotStatus.getIndexVersion(),
indexCommitPointFiles,
lastSnapshotStatus.getStartTime(),
threadPool.absoluteTimeInMillis() - lastSnapshotStatus.getStartTime(),
lastSnapshotStatus.getIncrementalFileCount(),
lastSnapshotStatus.getIncrementalSize()
), shardContainer, snapshotId.getUUID(), compress);
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e);
}
afterWriteSnapBlob.run();
snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis(), indexGeneration);
listener.onResponse(indexGeneration);
}, listener::onFailure);
Expand Down