Use primary term corresponding to generation while deleting #15639

Draft: wants to merge 1 commit into base: main
Use primary term corresponding to generation while deleting
Signed-off-by: Sachin Kale <kalsac@amazon.com>
Sachin Kale committed Sep 4, 2024
commit 1c0c04f7f7ee7d636211db467c6b9f59c0a5d3a0
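
For orientation, here is a minimal sketch of the deletion flow this commit introduces, using plain maps in place of the real OpenSearch types; the class name and all literal values are invented. Previously, generation deletes were issued only under the current primary term (primaryTermSupplier.getAsLong()); after this change each stale generation is deleted under every primary term whose metadata may reference it.

    import java.util.Map;
    import java.util.Set;

    public class DeletionFlowOverview {
        public static void main(String[] args) {
            // 1. Resolve each stale generation to the primary terms that may own its files
            //    (in the diff below: getGenerationsToBeDeleted now returns this map).
            Map<Long, Set<Long>> generationToTerms = Map.of(10L, Set.of(1L, 2L), 11L, Set.of(2L));

            // 2. Invert the map so each delete call targets a single primary-term directory
            //    (in the diff below: getPrimaryTermToGenerationsMap).
            Map<Long, Set<Long>> termToGenerations = Map.of(1L, Set.of(10L), 2L, Set.of(10L, 11L));

            // 3. Issue one asynchronous deletion per primary term
            //    (in the diff below: deleteGenerationAsync(primaryTermToGenerationsMap, onCompletion)).
            termToGenerations.forEach(
                (term, generations) -> System.out.println("delete generations " + generations + " under basePath/" + term)
            );
        }
    }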
===== RemoteFsTimestampAwareTranslog.java =====
@@ -56,6 +56,7 @@ public class RemoteFsTimestampAwareTranslog extends RemoteFsTranslog {
private final Map<Long, String> metadataFilePinnedTimestampMap;
// For metadata files with no min generation in the name, we cache generation data to avoid multiple reads.
private final Map<String, Tuple<Long, Long>> oldFormatMetadataFileGenerationMap;
private final Map<String, Tuple<Long, Long>> oldFormatMetadataFilePrimaryTermMap;

public RemoteFsTimestampAwareTranslog(
TranslogConfig config,
@@ -86,6 +87,7 @@ public RemoteFsTimestampAwareTranslog(
logger = Loggers.getLogger(getClass(), shardId);
this.metadataFilePinnedTimestampMap = new HashMap<>();
this.oldFormatMetadataFileGenerationMap = new HashMap<>();
this.oldFormatMetadataFilePrimaryTermMap = new HashMap<>();
}

@Override
@@ -184,35 +186,41 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
metadataFilesNotToBeDeleted.removeAll(metadataFilesToBeDeleted);

logger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted);
Set<Long> generationsToBeDeleted = getGenerationsToBeDeleted(
Map<Long, Set<Long>> generationsToBeDeletedToPrimaryTermRangeMap = getGenerationsToBeDeleted(
metadataFilesNotToBeDeleted,
metadataFilesToBeDeleted,
indexDeleted
);

logger.debug(() -> "generationsToBeDeleted = " + generationsToBeDeleted);
if (generationsToBeDeleted.isEmpty() == false) {
logger.debug(() -> "generationsToBeDeletedToPrimaryTermRangeMap = " + generationsToBeDeletedToPrimaryTermRangeMap);
if (generationsToBeDeletedToPrimaryTermRangeMap.isEmpty() == false) {
// Delete stale generations
Map<Long, Set<Long>> primaryTermToGenerationsMap = getPrimaryTermToGenerationsMap(
generationsToBeDeletedToPrimaryTermRangeMap
);
translogTransferManager.deleteGenerationAsync(
primaryTermSupplier.getAsLong(),
generationsToBeDeleted,
primaryTermToGenerationsMap,
remoteGenerationDeletionPermits::release
);
} else {
remoteGenerationDeletionPermits.release();
}

if (metadataFilesToBeDeleted.isEmpty() == false) {
// Delete stale metadata files
translogTransferManager.deleteMetadataFilesAsync(
metadataFilesToBeDeleted,
remoteGenerationDeletionPermits::release
);

// Update cache to keep only those metadata files that are not getting deleted
oldFormatMetadataFileGenerationMap.keySet().retainAll(metadataFilesNotToBeDeleted);

// Delete stale primary terms
deleteStaleRemotePrimaryTerms(metadataFiles);
} else {
remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS);
remoteGenerationDeletionPermits.release();
}

// Update cache to keep only those metadata files that are not getting deleted
oldFormatMetadataFileGenerationMap.keySet().retainAll(metadataFilesNotToBeDeleted);

// Delete stale primary terms
deleteStaleRemotePrimaryTerms(metadataFiles);
} catch (Exception e) {
remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS);
}
@@ -227,8 +235,21 @@ public void onFailure(Exception e) {
translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener);
}

protected Map<Long, Set<Long>> getPrimaryTermToGenerationsMap(Map<Long, Set<Long>> generationsToBeDeletedToPrimaryTermRangeMap) {
Map<Long, Set<Long>> primaryTermToGenerationsMap = new HashMap<>();
for (Map.Entry<Long, Set<Long>> entry : generationsToBeDeletedToPrimaryTermRangeMap.entrySet()) {
for (Long primaryTerm : entry.getValue()) {
if (primaryTermToGenerationsMap.containsKey(primaryTerm) == false) {
primaryTermToGenerationsMap.put(primaryTerm, new HashSet<>());
}
primaryTermToGenerationsMap.get(primaryTerm).add(entry.getKey());
}
}
return primaryTermToGenerationsMap;
}
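
For a concrete sense of the inversion (values invented): an input of {gen 7 -> terms {1, 2}, gen 8 -> terms {2}} becomes {term 1 -> gens {7}, term 2 -> gens {7, 8}}. The containsKey-then-put sequence could equivalently use Map#computeIfAbsent, as in this standalone sketch:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class PrimaryTermInversionExample {
        public static void main(String[] args) {
            Map<Long, Set<Long>> genToTerms = Map.of(7L, Set.of(1L, 2L), 8L, Set.of(2L)); // invented
            Map<Long, Set<Long>> termToGens = new HashMap<>();
            for (Map.Entry<Long, Set<Long>> entry : genToTerms.entrySet()) {
                for (Long term : entry.getValue()) {
                    // computeIfAbsent replaces the explicit containsKey/put pair
                    termToGens.computeIfAbsent(term, t -> new HashSet<>()).add(entry.getKey());
                }
            }
            System.out.println(termToGens); // e.g. {1=[7], 2=[7, 8]}
        }
    }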

// Visible for testing
protected Set<Long> getGenerationsToBeDeleted(
protected Map<Long, Set<Long>> getGenerationsToBeDeleted(
List<String> metadataFilesNotToBeDeleted,
List<String> metadataFilesToBeDeleted,
boolean indexDeleted
@@ -239,24 +260,56 @@ protected Set<Long> getGenerationsToBeDeleted(
maxGenerationToBeDeleted = minRemoteGenReferenced - 1 - indexSettings().getRemoteTranslogExtraKeep();
}

Set<Long> generationsFromMetadataFilesToBeDeleted = new HashSet<>();
Map<Long, Set<String>> generationsFromMetadataFilesToBeDeleted = new HashMap<>();
for (String mdFile : metadataFilesToBeDeleted) {
Tuple<Long, Long> minMaxGen = getMinMaxTranslogGenerationFromMetadataFile(mdFile, translogTransferManager);
generationsFromMetadataFilesToBeDeleted.addAll(
LongStream.rangeClosed(minMaxGen.v1(), minMaxGen.v2()).boxed().collect(Collectors.toList())
);
List<Long> generations = LongStream.rangeClosed(minMaxGen.v1(), minMaxGen.v2()).boxed().collect(Collectors.toList());
for (Long generation : generations) {
if (generationsFromMetadataFilesToBeDeleted.containsKey(generation) == false) {
generationsFromMetadataFilesToBeDeleted.put(generation, new HashSet<>());
}
generationsFromMetadataFilesToBeDeleted.get(generation).add(mdFile);
}
}

for (String mdFile : metadataFilesNotToBeDeleted) {
Tuple<Long, Long> minMaxGen = getMinMaxTranslogGenerationFromMetadataFile(mdFile, translogTransferManager);
List<Long> generations = LongStream.rangeClosed(minMaxGen.v1(), minMaxGen.v2()).boxed().collect(Collectors.toList());
for (Long generation : generations) {
if (generationsFromMetadataFilesToBeDeleted.containsKey(generation)) {
generationsFromMetadataFilesToBeDeleted.get(generation).add(mdFile);
}
}
}

Map<String, Tuple<Long, Long>> metadataFileNotToBeDeletedGenerationMap = getGenerationForMetadataFiles(metadataFilesNotToBeDeleted);
TreeSet<Tuple<Long, Long>> pinnedGenerations = getOrderedPinnedMetadataGenerations(metadataFileNotToBeDeletedGenerationMap);
Set<Long> generationsToBeDeleted = new HashSet<>();
for (long generation : generationsFromMetadataFilesToBeDeleted) {
Map<Long, Set<Long>> generationsToBeDeletedToPrimaryTermRangeMap = new HashMap<>();
for (long generation : generationsFromMetadataFilesToBeDeleted.keySet()) {
// Check that the generation is not referred to by a metadata file matching pinned timestamps
if (generation <= maxGenerationToBeDeleted && isGenerationPinned(generation, pinnedGenerations) == false) {
generationsToBeDeleted.add(generation);
generationsToBeDeletedToPrimaryTermRangeMap.put(
generation,
getPrimaryTermRange(generationsFromMetadataFilesToBeDeleted.get(generation), translogTransferManager)
);
}
}
return generationsToBeDeletedToPrimaryTermRangeMap;
}
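
To make the selection gate concrete, a toy standalone version with invented numbers: with minRemoteGenReferenced = 100 and an extra-keep of 5 the cutoff is 94, so generation 95 survives the cutoff, 94 is deletable, and a pinned 90 is kept:

    import java.util.Set;

    public class GenerationGateExample {
        public static void main(String[] args) {
            long minRemoteGenReferenced = 100;  // invented
            long remoteTranslogExtraKeep = 5;   // invented
            long maxGenerationToBeDeleted = minRemoteGenReferenced - 1 - remoteTranslogExtraKeep; // 94
            Set<Long> pinnedGenerations = Set.of(90L); // stand-in for the pinned-timestamp check

            for (long generation : new long[] { 90, 94, 95 }) {
                boolean delete = generation <= maxGenerationToBeDeleted
                    && pinnedGenerations.contains(generation) == false;
                System.out.println(generation + " -> " + (delete ? "delete" : "keep"));
            }
            // 90 -> keep (pinned), 94 -> delete, 95 -> keep (above the cutoff)
        }
    }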

protected Set<Long> getPrimaryTermRange(Set<String> metadataFiles, TranslogTransferManager translogTransferManager) throws IOException {
Tuple<Long, Long> primaryTermRange = new Tuple<>(Long.MIN_VALUE, Long.MAX_VALUE);
for (String metadataFile : metadataFiles) {
Tuple<Long, Long> primaryTermRangeForMdFile = getMinMaxPrimaryTermFromMetadataFile(metadataFile, translogTransferManager);
primaryTermRange = new Tuple<>(
Math.max(primaryTermRange.v1(), primaryTermRangeForMdFile.v1()),
Math.min(primaryTermRange.v2(), primaryTermRangeForMdFile.v2())
);
if (primaryTermRange.v1().equals(primaryTermRange.v2())) {
break;
}
}
return generationsToBeDeleted;
return LongStream.rangeClosed(primaryTermRange.v1(), primaryTermRange.v2()).boxed().collect(Collectors.toSet());
}
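
A worked example of the fold above, with invented ranges: metadata files carrying primary-term ranges [2, 5] and [3, 7] narrow to [3, 5], so the returned set is {3, 4, 5}:

    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;
    import java.util.stream.LongStream;

    public class PrimaryTermRangeExample {
        public static void main(String[] args) {
            List<long[]> perFileRanges = List.of(new long[] { 2, 5 }, new long[] { 3, 7 }); // invented
            long min = Long.MIN_VALUE;
            long max = Long.MAX_VALUE;
            for (long[] range : perFileRanges) {
                min = Math.max(min, range[0]); // tightest lower bound seen so far
                max = Math.min(max, range[1]); // tightest upper bound seen so far
                if (min == max) {
                    break; // the range cannot narrow any further
                }
            }
            Set<Long> terms = LongStream.rangeClosed(min, max).boxed().collect(Collectors.toSet());
            System.out.println(terms); // {3, 4, 5}
        }
    }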

// Visible for testing
@@ -351,6 +404,36 @@ protected Tuple<Long, Long> getMinMaxTranslogGenerationFromMetadataFile(
}
}

protected Tuple<Long, Long> getMinMaxPrimaryTermFromMetadataFile(String metadataFile, TranslogTransferManager translogTransferManager)
throws IOException {
Tuple<Long, Long> minMaxPrimaryTermFromFileName = TranslogTransferMetadata.getMinMaxPrimaryTermFromFilename(metadataFile);
if (minMaxPrimaryTermFromFileName != null) {
return minMaxPrimaryTermFromFileName;
} else {
if (oldFormatMetadataFilePrimaryTermMap.containsKey(metadataFile)) {
return oldFormatMetadataFilePrimaryTermMap.get(metadataFile);
} else {
TranslogTransferMetadata metadata = translogTransferManager.readMetadata(metadataFile);
long maxPrimaryTerm = TranslogTransferMetadata.getPrimaryTermFromFileName(metadataFile);
long minPrimaryTerm = -1;
if (metadata.getGenerationToPrimaryTermMapper() != null
&& metadata.getGenerationToPrimaryTermMapper().values().isEmpty() == false) {
Optional<Long> primaryTerm = metadata.getGenerationToPrimaryTermMapper()
.values()
.stream()
.map(s -> Long.parseLong(s))
.min(Long::compareTo);
if (primaryTerm.isPresent()) {
minPrimaryTerm = primaryTerm.get();
}
}
Tuple<Long, Long> minMaxPrimaryTermTuple = new Tuple<>(minPrimaryTerm, maxPrimaryTerm);
oldFormatMetadataFilePrimaryTermMap.put(metadataFile, minMaxPrimaryTermTuple);
return minMaxPrimaryTermTuple;
}
}
}
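
The old-format fallback above recovers the minimum primary term from the generation-to-primary-term map in the metadata body, caching the result per file. A standalone equivalent of that derivation (values invented), using the same -1 sentinel for a missing mapper:

    import java.util.Map;

    public class MinPrimaryTermFallbackExample {
        public static void main(String[] args) {
            // generation -> primary term, stored as strings in the metadata body (invented)
            Map<String, String> generationToPrimaryTerm = Map.of("11", "2", "12", "2", "13", "3");
            long minPrimaryTerm = generationToPrimaryTerm.isEmpty()
                ? -1L // sentinel used when no generation-to-term mapper is present
                : generationToPrimaryTerm.values().stream().mapToLong(Long::parseLong).min().getAsLong();
            System.out.println(minPrimaryTerm); // 2
        }
    }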

/**
* This method must be called only after there are valid generations to delete in trimUnreferencedReaders, as it implicitly
* ensures that the minimum primary term in the latest translog metadata in the remote store is the current primary term.
===== TranslogTransferManager.java =====
@@ -13,6 +13,7 @@
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.OutputStreamIndexOutput;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.action.support.GroupedActionListener;
import org.opensearch.common.SetOnce;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
@@ -39,6 +40,7 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -496,6 +498,12 @@ public byte[] getMetadataBytes(TranslogTransferMetadata metadata) throws IOExcep
* @param onCompletion runnable to run on completion of deletion regardless of success/failure.
*/
public void deleteGenerationAsync(long primaryTerm, Set<Long> generations, Runnable onCompletion) {
List<String> translogFiles = getTranslogFilesFromGenerations(generations);
// Delete the translog and checkpoint files asynchronously
deleteTranslogFilesAsync(primaryTerm, translogFiles, onCompletion);
}

private List<String> getTranslogFilesFromGenerations(Set<Long> generations) {
List<String> translogFiles = new ArrayList<>();
generations.forEach(generation -> {
// Add the .ckp and .tlog files to the translog file list, which is located under basePath/<primaryTerm>
@@ -507,8 +515,32 @@ public void deleteGenerationAsync(long primaryTerm, Set<Long> generations, Runna
translogFiles.add(translogFileName);
}
});
return translogFiles;
}

public void deleteGenerationAsync(Map<Long, Set<Long>> primaryTermToGenerationsMap, Runnable onCompletion) {
GroupedActionListener<Void> groupedActionListener = new GroupedActionListener<>(new ActionListener<>() {
@Override
public void onResponse(Collection<Void> unused) {
logger.trace(() -> "Deleted translogs for primaryTermToGenerationsMap=" + primaryTermToGenerationsMap);
onCompletion.run();
}

@Override
public void onFailure(Exception e) {
onCompletion.run();
logger.error(
() -> new ParameterizedMessage(
"Exception occurred while deleting translog for primaryTermToGenerationsMap={}",
primaryTermToGenerationsMap
),
e
);
}
}, primaryTermToGenerationsMap.size());

// Delete the translog and checkpoint files asynchronously
deleteTranslogFilesAsync(primaryTerm, translogFiles, onCompletion);
deleteTranslogFilesAsync(primaryTermToGenerationsMap, groupedActionListener);
}
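
GroupedActionListener aggregates the per-term callbacks and invokes its delegate once all of them have reported back (or a failure arrives). A rough stand-in using CountDownLatch, assuming nothing about the real class beyond what this diff shows:

    import java.util.concurrent.CountDownLatch;

    public class GroupedCompletionSketch {
        public static void main(String[] args) throws InterruptedException {
            int expectedResponses = 3; // one async delete per primary term (invented count)
            CountDownLatch latch = new CountDownLatch(expectedResponses);

            for (int i = 1; i <= expectedResponses; i++) {
                final long primaryTerm = i;
                new Thread(() -> {
                    System.out.println("deleted translog files under primary term " + primaryTerm);
                    latch.countDown(); // GroupedActionListener does this bookkeeping internally
                }).start();
            }

            latch.await(); // the grouped listener fires its delegate here
            System.out.println("onCompletion.run(), mirroring remoteGenerationDeletionPermits::release");
        }
    }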

/**
@@ -683,6 +715,21 @@ public void onFailure(Exception e) {
}
}

private void deleteTranslogFilesAsync(Map<Long, Set<Long>> primaryTermToGenerationsMap, ActionListener<Void> actionListener) {
for (Long primaryTerm : primaryTermToGenerationsMap.keySet()) {
try {
transferService.deleteBlobsAsync(
ThreadPool.Names.REMOTE_PURGE,
remoteDataTransferPath.add(String.valueOf(primaryTerm)),
getTranslogFilesFromGenerations(primaryTermToGenerationsMap.get(primaryTerm)),
actionListener
);
} catch (Exception e) {
actionListener.onFailure(e);
}
}
}
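
Since each primary term owns its own directory under the data path (the basePath/<primaryTerm> comment earlier in this diff), the loop above issues one blob-deletion batch per such directory. An illustrative rendering of the resulting paths, with every literal invented:

    public class PerTermPathSketch {
        public static void main(String[] args) {
            String basePath = "indices/my-index-uuid/0/translog/data"; // invented example path
            long primaryTerm = 2;
            // .tlog/.ckp pairs per generation, following the translog-<generation> naming convention
            String[] files = { "translog-11.tlog", "translog-11.ckp", "translog-12.tlog", "translog-12.ckp" };
            for (String file : files) {
                System.out.println(basePath + "/" + primaryTerm + "/" + file);
            }
        }
    }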

/**
* Deletes metadata files asynchronously using the {@code REMOTE_PURGE} threadpool. On success or failure, runs {@code onCompletion}.
*
===== TranslogTransferMetadata.java =====
@@ -19,6 +19,7 @@
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/**
* The metadata associated with every transfer {@link TransferSnapshot}. The metadata is uploaded at the end of the
@@ -108,11 +109,28 @@ public String getFileName() {
RemoteStoreUtils.invertLong(createdAt),
String.valueOf(Objects.hash(nodeId)),
RemoteStoreUtils.invertLong(minTranslogGeneration),
String.valueOf(getMinPrimaryTermReferred()),
String.valueOf(CURRENT_VERSION)
)
);
}

private long getMinPrimaryTermReferred() {
if (generationToPrimaryTermMapper.get() == null || generationToPrimaryTermMapper.get().values().isEmpty()) {
return -1;
}
Optional<Long> minPrimaryTerm = generationToPrimaryTermMapper.get()
.values()
.stream()
.map(s -> Long.parseLong(s))
.min(Long::compareTo);
if (minPrimaryTerm.isPresent()) {
return minPrimaryTerm.get();
} else {
return -1;
}
}

public static Tuple<Tuple<Long, Long>, String> getNodeIdByPrimaryTermAndGeneration(String filename) {
String[] tokens = filename.split(METADATA_SEPARATOR);
if (tokens.length < 6) {
@@ -143,15 +161,43 @@ public static Tuple<Long, Long> getMinMaxTranslogGenerationFromFilename(String f
assert Version.CURRENT.onOrAfter(Version.V_2_17_0);
try {
// instead of a direct index, we go backwards to avoid running into the same separator in nodeId
String minGeneration = tokens[tokens.length - 2];
String minGeneration = tokens[tokens.length - 3];
String maxGeneration = tokens[2];
return new Tuple<>(RemoteStoreUtils.invertLong(minGeneration), RemoteStoreUtils.invertLong(maxGeneration));
} catch (NumberFormatException e) {
} catch (Exception e) {
logger.error(() -> new ParameterizedMessage("Exception while getting min and max translog generation from: {}", filename), e);
return null;
}
}

public static Tuple<Long, Long> getMinMaxPrimaryTermFromFilename(String filename) {
String[] tokens = filename.split(METADATA_SEPARATOR);
if (tokens.length < 7) {
// For versions < 2.17, we don't have min primary term.
return null;
}
assert Version.CURRENT.onOrAfter(Version.V_2_17_0);
try {
// instead of a direct index, we go backwards to avoid running into the same separator in nodeId
String minPrimaryTerm = tokens[tokens.length - 2];
String maxPrimaryTerm = tokens[1];
return new Tuple<>(Long.parseLong(minPrimaryTerm), RemoteStoreUtils.invertLong(maxPrimaryTerm));
} catch (Exception e) {
logger.error(() -> new ParameterizedMessage("Exception while getting min and max primary term from: {}", filename), e);
return null;
}
}

public static long getPrimaryTermFromFileName(String filename) {
String[] tokens = filename.split(METADATA_SEPARATOR);
try {
return RemoteStoreUtils.invertLong(tokens[1]);
} catch (Exception e) {
logger.error(() -> new ParameterizedMessage("Exception while getting max primary term from: {}", filename), e);
return -1;
}
}
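
Reading the three parsers together, the metadata filename is a separator-joined token list: tokens[1] holds the inverted max primary term, tokens[2] the inverted generation, tokens[length - 3] the inverted min generation, tokens[length - 2] the plain min primary term, and the last token the version. The sketch below rebuilds and reparses such a name; the "__" separator, the "metadata" prefix, and the exact behavior of RemoteStoreUtils.invertLong are assumptions, since this diff does not show them:

    public class MetadataFilenameSketch {
        // Assumed mirror of RemoteStoreUtils.invertLong: newer values sort first lexicographically.
        static String invertLong(long value) {
            return String.format("%019d", Long.MAX_VALUE - value);
        }

        static long uninvertLong(String token) {
            return Long.MAX_VALUE - Long.parseLong(token);
        }

        public static void main(String[] args) {
            String sep = "__"; // assumed value of METADATA_SEPARATOR
            String filename = String.join(
                sep,
                "metadata",                 // assumed prefix token
                invertLong(5),              // max primary term
                invertLong(42),             // generation
                invertLong(1725408000000L), // createdAt (epoch millis)
                "123456",                   // Objects.hash(nodeId)
                invertLong(40),             // min translog generation
                "3",                        // min primary term referred (plain, new in this PR)
                "2"                         // CURRENT_VERSION
            );
            String[] tokens = filename.split(sep);
            System.out.println("max primary term = " + uninvertLong(tokens[1]));                   // 5
            System.out.println("min primary term = " + Long.parseLong(tokens[tokens.length - 2])); // 3
            System.out.println("min generation   = " + uninvertLong(tokens[tokens.length - 3]));   // 40
        }
    }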

@Override
public int hashCode() {
return Objects.hash(primaryTerm, generation);