Skip to content

Commit

Permalink
Add snapshot_path prefix to snapshot shards path file on S3 (opensear…
Browse files Browse the repository at this point in the history
…ch-project#16267)

Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 authored and dk2k committed Oct 17, 2024
1 parent 57fb786 commit 4e24581
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@
import static org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm.FNV_1A_COMPOSITE_1;
import static org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName;
import static org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat.SNAPSHOT_ONLY_FORMAT_PARAMS;
import static org.opensearch.snapshots.SnapshotShardPaths.getIndexId;

/**
* BlobStore - based implementation of Snapshot Repository
Expand Down Expand Up @@ -2293,7 +2294,10 @@ private void remoteTranslogCleanupAsync(
* @return List of matching shard paths
*/
private List<String> findMatchingShardPaths(String indexId, Map<String, BlobMetadata> snapshotShardPaths) {
return snapshotShardPaths.keySet().stream().filter(s -> s.startsWith(indexId)).collect(Collectors.toList());
return snapshotShardPaths.keySet()
.stream()
.filter(s -> (s.startsWith(indexId) || s.startsWith(SnapshotShardPaths.FILE_PREFIX + indexId)))
.collect(Collectors.toList());
}

/**
Expand Down Expand Up @@ -2546,11 +2550,11 @@ public void finalizeSnapshot(
*/
private void cleanupRedundantSnapshotShardPaths(Set<String> updatedShardPathsIndexIds) {
Set<String> updatedIndexIds = updatedShardPathsIndexIds.stream()
.map(s -> s.split("\\" + SnapshotShardPaths.DELIMITER)[0])
.map(s -> getIndexId(s.split("\\" + SnapshotShardPaths.DELIMITER)[0]))
.collect(Collectors.toSet());
Set<String> indexIdShardPaths = getSnapshotShardPaths().keySet();
List<String> staleShardPaths = indexIdShardPaths.stream().filter(s -> updatedShardPathsIndexIds.contains(s) == false).filter(s -> {
String indexId = s.split("\\" + SnapshotShardPaths.DELIMITER)[0];
String indexId = getIndexId(s.split("\\" + SnapshotShardPaths.DELIMITER)[0]);
return updatedIndexIds.contains(indexId);
}).collect(Collectors.toList());
try {
Expand Down Expand Up @@ -2595,7 +2599,7 @@ String writeIndexShardPaths(IndexId indexId, SnapshotId snapshotId, int shardCou
List<String> paths = getShardPaths(indexId, shardCount);
int pathType = indexId.getShardPathType();
int pathHashAlgorithm = FNV_1A_COMPOSITE_1.getCode();
String blobName = String.join(
String name = String.join(
SnapshotShardPaths.DELIMITER,
indexId.getId(),
indexId.getName(),
Expand All @@ -2611,9 +2615,9 @@ String writeIndexShardPaths(IndexId indexId, SnapshotId snapshotId, int shardCou
PathType.fromCode(pathType),
PathHashAlgorithm.fromCode(pathHashAlgorithm)
);
SNAPSHOT_SHARD_PATHS_FORMAT.write(shardPaths, snapshotShardPathBlobContainer(), blobName);
SNAPSHOT_SHARD_PATHS_FORMAT.write(shardPaths, snapshotShardPathBlobContainer(), name);
logShardPathsOperationSuccess(indexId, snapshotId);
return blobName;
return SnapshotShardPaths.FILE_PREFIX + name;
} catch (IOException e) {
logShardPathsOperationWarning(indexId, snapshotId, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ public class SnapshotShardPaths implements ToXContent {

public static final String DELIMITER = ".";

public static final String FILE_NAME_FORMAT = "%s";
public static final String FILE_PREFIX = "snapshot_path_";
public static final String FILE_NAME_FORMAT = FILE_PREFIX + "%s";

private static final String PATHS_FIELD = "paths";
private static final String INDEX_ID_FIELD = "indexId";
Expand Down Expand Up @@ -101,14 +102,21 @@ public static ShardInfo parseShardPath(String shardPath) {
throw new IllegalArgumentException("Invalid shard path format: " + shardPath);
}
try {
IndexId indexId = new IndexId(parts[1], parts[0], Integer.parseInt(parts[3]));
IndexId indexId = new IndexId(parts[1], getIndexId(parts[0]), Integer.parseInt(parts[3]));
int shardCount = Integer.parseInt(parts[2]);
return new ShardInfo(indexId, shardCount);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid shard path format: " + shardPath, e);
}
}

public static String getIndexId(String indexIdField) {
if (indexIdField.startsWith(FILE_PREFIX)) {
return indexIdField.substring(FILE_PREFIX.length());
}
return indexIdField;
}

/**
* Represents parsed information from a shard path.
* This class encapsulates the index ID and shard count extracted from a shard path string.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ public void testParseShardPath() {
IndexId indexId = repoData.getIndices().values().iterator().next();
int shardCount = repoData.shardGenerations().getGens(indexId).size();

// Version 2.17 has file name starting with indexId
String shardPath = String.join(
SnapshotShardPaths.DELIMITER,
indexId.getId(),
Expand All @@ -391,7 +392,19 @@ public void testParseShardPath() {
"1"
);
ShardInfo shardInfo = SnapshotShardPaths.parseShardPath(shardPath);
assertEquals(shardInfo.getIndexId(), indexId);
assertEquals(shardInfo.getShardCount(), shardCount);

// Version 2.17 has file name starting with snapshot_path_
shardPath = String.join(
SnapshotShardPaths.DELIMITER,
SnapshotShardPaths.FILE_PREFIX + indexId.getId(),
indexId.getName(),
String.valueOf(shardCount),
String.valueOf(indexId.getShardPathType()),
"1"
);
shardInfo = SnapshotShardPaths.parseShardPath(shardPath);
assertEquals(shardInfo.getIndexId(), indexId);
assertEquals(shardInfo.getShardCount(), shardCount);
}
Expand Down

0 comments on commit 4e24581

Please sign in to comment.