Skip to content

Commit

Permalink
Moved cleanupOldShardGens to snapshot_deletion threadpool
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Aug 9, 2024
1 parent dd6568e commit 0e406ea
Showing 1 changed file with 57 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1914,9 +1914,10 @@ public void finalizeSnapshot(
stateTransformer,
ActionListener.wrap(newRepoData -> {
if (writeShardGens) {
cleanupOldShardGens(existingRepositoryData, updatedRepositoryData);
cleanupOldShardGens(existingRepositoryData, updatedRepositoryData, newRepoData, listener);
} else {
listener.onResponse(newRepoData);
}
listener.onResponse(newRepoData);
}, onUpdateFailure)
);
}, onUpdateFailure), 2 + indices.size());
Expand Down Expand Up @@ -1969,7 +1970,12 @@ public void finalizeSnapshot(
}

// Delete all old shard generation blobs that are no longer referenced as a result of moving to the updated repository data
private void cleanupOldShardGens(RepositoryData existingRepositoryData, RepositoryData updatedRepositoryData) {
private void cleanupOldShardGens(
RepositoryData existingRepositoryData,
RepositoryData updatedRepositoryData,
RepositoryData newRepositoryData,
ActionListener<RepositoryData> listener
) {
final List<String> toDelete = new ArrayList<>();
updatedRepositoryData.shardGenerations()
.obsoleteShardGenerations(existingRepositoryData.shardGenerations())
Expand All @@ -1978,11 +1984,57 @@ private void cleanupOldShardGens(RepositoryData existingRepositoryData, Reposito
(shardId, oldGen) -> toDelete.add(shardContainer(indexId, shardId).path().buildAsString() + INDEX_FILE_PREFIX + oldGen)
)
);
if (toDelete.isEmpty()) {
listener.onResponse(newRepositoryData);
return;
}
try {
logger.info("{} shards generations to be deleted as part of cleanupOldShardGens", toDelete);
deleteFromContainer(rootBlobContainer(), toDelete);
AtomicInteger counter = new AtomicInteger();
Collection<List<String>> subList = toDelete.stream()
.collect(Collectors.groupingBy(it -> counter.getAndIncrement() / maxShardBlobDeleteBatch))
.values();
final BlockingQueue<List<String>> staleFilesToDeleteInBatch = new LinkedBlockingQueue<>(subList);
logger.info("cleanupOldShardGens toDeleteSize={} groupSize={}", toDelete.size(), staleFilesToDeleteInBatch.size());
final GroupedActionListener<Void> groupedListener = new GroupedActionListener<>(ActionListener.wrap(r -> {
logger.info("completed cleanupOldShardGens");
listener.onResponse(newRepositoryData);
}, ex -> {
logger.error("exception in cleanupOldShardGens", ex);
listener.onResponse(newRepositoryData);
}), staleFilesToDeleteInBatch.size());

// Start at most as many workers as fit into the snapshot_deletion pool at once
final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT_DELETION).getMax(), staleFilesToDeleteInBatch.size());
for (int i = 0; i < workers; ++i) {
cleanupOldFiles(staleFilesToDeleteInBatch, groupedListener);
}
} catch (Exception e) {
logger.warn("Failed to clean up old shard generation blobs", e);
listener.onResponse(newRepositoryData);
}
}

/**
 * Worker step for batched deletion of stale blobs.
 *
 * Takes the next batch of blob paths from {@code staleFilesToDeleteInBatch} without waiting
 * (zero-timeout poll). If the queue is already empty this call does nothing. Otherwise the
 * batch is handed to the {@code SNAPSHOT_DELETION} executor, where it is deleted from the root
 * blob container; the outcome of that batch (success or failure) is reported to
 * {@code listener}, and the worker then calls this method again to pick up the next batch.
 *
 * @param staleFilesToDeleteInBatch queue of blob-path batches still waiting to be deleted
 * @param listener                  grouped listener notified once per processed batch
 * @throws InterruptedException if the calling thread is interrupted while polling the queue
 */
private void cleanupOldFiles(BlockingQueue<List<String>> staleFilesToDeleteInBatch, GroupedActionListener<Void> listener)
    throws InterruptedException {
    final List<String> batch = staleFilesToDeleteInBatch.poll(0L, TimeUnit.MILLISECONDS);
    if (batch == null) {
        // Queue is drained: nothing left for this worker to schedule.
        return;
    }
    threadPool.executor(ThreadPool.Names.SNAPSHOT_DELETION).execute(ActionRunnable.wrap(listener, batchListener -> {
        try {
            deleteFromContainer(rootBlobContainer(), batch);
            batchListener.onResponse(null);
        } catch (Exception e) {
            // A failed batch is logged and reported, but must not stop the remaining batches from being processed.
            logger.warn(
                () -> new ParameterizedMessage(
                    "[{}] Failed to delete following blobs during cleanupOldFiles : {}",
                    metadata.name(),
                    batch
                ),
                e
            );
            batchListener.onFailure(e);
        }
        // Chain to the next batch from the same worker so the number of concurrent deletes stays bounded.
        cleanupOldFiles(staleFilesToDeleteInBatch, listener);
    }));
}

Expand Down

0 comments on commit 0e406ea

Please sign in to comment.