Skip to content

Commit 4be4ea0

Browse files
committed
Add Changes in Snapshot Delete Flow for remote store interoperability.
Signed-off-by: Bansi Kasundra <kasundra@amazon.com>
1 parent 1cf9c5c commit 4be4ea0

File tree

10 files changed

+119
-24
lines changed

10 files changed

+119
-24
lines changed

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.opensearch.common.unit.ByteSizeValue;
5252
import org.opensearch.core.common.Strings;
5353
import org.opensearch.core.xcontent.NamedXContentRegistry;
54+
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
5455
import org.opensearch.indices.recovery.RecoverySettings;
5556
import org.opensearch.monitor.jvm.JvmInfo;
5657
import org.opensearch.repositories.RepositoryData;
@@ -307,9 +308,10 @@ public void deleteSnapshots(
307308
Collection<SnapshotId> snapshotIds,
308309
long repositoryStateId,
309310
Version repositoryMetaVersion,
311+
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
310312
ActionListener<RepositoryData> listener
311313
) {
312-
super.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, listener);
314+
super.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, remoteStoreLockManagerFactory, listener);
313315
}
314316

315317
@Override

server/src/main/java/org/opensearch/repositories/FilterRepository.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.opensearch.index.shard.ShardId;
4747
import org.opensearch.index.snapshots.IndexShardSnapshotStatus;
4848
import org.opensearch.index.store.Store;
49+
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
4950
import org.opensearch.indices.recovery.RecoveryState;
5051
import org.opensearch.snapshots.SnapshotId;
5152
import org.opensearch.snapshots.SnapshotInfo;
@@ -120,9 +121,10 @@ public void deleteSnapshots(
120121
Collection<SnapshotId> snapshotIds,
121122
long repositoryStateId,
122123
Version repositoryMetaVersion,
124+
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
123125
ActionListener<RepositoryData> listener
124126
) {
125-
in.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, listener);
127+
in.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, remoteStoreLockManagerFactory, listener);
126128
}
127129

128130
@Override

server/src/main/java/org/opensearch/repositories/Repository.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.opensearch.index.shard.ShardId;
4848
import org.opensearch.index.snapshots.IndexShardSnapshotStatus;
4949
import org.opensearch.index.store.Store;
50+
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
5051
import org.opensearch.indices.recovery.RecoveryState;
5152
import org.opensearch.snapshots.SnapshotId;
5253
import org.opensearch.snapshots.SnapshotInfo;
@@ -164,6 +165,7 @@ void deleteSnapshots(
164165
Collection<SnapshotId> snapshotIds,
165166
long repositoryStateId,
166167
Version repositoryMetaVersion,
168+
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
167169
ActionListener<RepositoryData> listener
168170
);
169171

server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 78 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,9 @@
114114
import org.opensearch.index.snapshots.blobstore.SnapshotFiles;
115115
import org.opensearch.index.store.Store;
116116
import org.opensearch.index.store.StoreFileMetadata;
117+
import org.opensearch.index.store.lockmanager.FileLockInfo;
118+
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
119+
import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager;
117120
import org.opensearch.indices.recovery.RecoverySettings;
118121
import org.opensearch.indices.recovery.RecoveryState;
119122
import org.opensearch.repositories.IndexId;
@@ -745,6 +748,7 @@ public void deleteSnapshots(
745748
Collection<SnapshotId> snapshotIds,
746749
long repositoryStateId,
747750
Version repositoryMetaVersion,
751+
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
748752
ActionListener<RepositoryData> listener
749753
) {
750754
if (isReadOnly()) {
@@ -765,6 +769,7 @@ protected void doRun() throws Exception {
765769
rootBlobs,
766770
repositoryData,
767771
repositoryMetaVersion,
772+
remoteStoreLockManagerFactory,
768773
listener
769774
);
770775
}
@@ -844,6 +849,7 @@ private void doDeleteShardSnapshots(
844849
Map<String, BlobMetadata> rootBlobs,
845850
RepositoryData repositoryData,
846851
Version repoMetaVersion,
852+
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
847853
ActionListener<RepositoryData> listener
848854
) {
849855
// First write the new shard state metadata (with the removed snapshot) and compute deletion targets
@@ -878,6 +884,7 @@ private void doDeleteShardSnapshots(
878884
ActionListener.wrap(() -> listener.onResponse(updatedRepoData)),
879885
2
880886
);
887+
releaseResourceLockFiles(snapshotIds, repositoryData, remoteStoreLockManagerFactory, afterCleanupsListener);
881888
cleanupUnlinkedRootAndIndicesBlobs(snapshotIds, foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener);
882889
asyncCleanupUnlinkedShardLevelBlobs(
883890
repositoryData,
@@ -1039,17 +1046,27 @@ protected void doRun() throws Exception {
10391046
final Set<String> blobs = shardContainer.listBlobs().keySet();
10401047
final BlobStoreIndexShardSnapshots blobStoreIndexShardSnapshots;
10411048
final long newGen;
1042-
if (useUUIDs) {
1043-
newGen = -1L;
1044-
blobStoreIndexShardSnapshots = buildBlobStoreIndexShardSnapshots(
1045-
blobs,
1046-
shardContainer,
1047-
oldRepositoryData.shardGenerations().getShardGen(indexId, finalShardId)
1048-
).v1();
1049+
1050+
// A blob with the index- prefix exists only if this shard has at least one snapshot that is not a shallow snapshot
1051+
if (shardContainer.listBlobsByPrefix(SNAPSHOT_INDEX_PREFIX).size() > 0) {
1052+
if (useUUIDs) {
1053+
newGen = -1L;
1054+
blobStoreIndexShardSnapshots = buildBlobStoreIndexShardSnapshots(
1055+
blobs,
1056+
shardContainer,
1057+
oldRepositoryData.shardGenerations().getShardGen(indexId, finalShardId)
1058+
).v1();
1059+
} else {
1060+
Tuple<BlobStoreIndexShardSnapshots, Long> tuple = buildBlobStoreIndexShardSnapshots(
1061+
blobs,
1062+
shardContainer
1063+
);
1064+
newGen = tuple.v2() + 1;
1065+
blobStoreIndexShardSnapshots = tuple.v1();
1066+
}
10491067
} else {
1050-
Tuple<BlobStoreIndexShardSnapshots, Long> tuple = buildBlobStoreIndexShardSnapshots(blobs, shardContainer);
1051-
newGen = tuple.v2() + 1;
1052-
blobStoreIndexShardSnapshots = tuple.v1();
1068+
newGen = -1L;
1069+
blobStoreIndexShardSnapshots = BlobStoreIndexShardSnapshots.EMPTY;
10531070
}
10541071
allShardsListener.onResponse(
10551072
deleteFromShardSnapshotMeta(
@@ -1086,6 +1103,42 @@ public void onFailure(Exception ex) {
10861103
}
10871104
}
10881105

1106+
private void releaseResourceLockFiles(
1107+
Collection<SnapshotId> snapshotIds,
1108+
RepositoryData repositoryData,
1109+
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
1110+
ActionListener<Void> onAllShardsCompleted
1111+
) {
1112+
for (SnapshotId snapshotId : snapshotIds) {
1113+
List<String> indices = this.getSnapshotInfo(snapshotId).indices();
1114+
List<IndexId> indexIds = repositoryData.resolveIndices(indices);
1115+
for (IndexId indexId : indexIds) {
1116+
try {
1117+
IndexMetadata indexMetadata = this.getSnapshotIndexMetaData(repositoryData, snapshotId, indexId);
1118+
if (this.getSnapshotInfo(snapshotId).isRemoteStoreIndexShallowCopyEnabled()
1119+
&& indexMetadata.getSettings().getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false)) {
1120+
int numberOfShards = indexMetadata.getNumberOfShards();
1121+
for (int shardId = 0; shardId < numberOfShards; shardId++) {
1122+
final int finalShardId = shardId;
1123+
String indexUUID = indexMetadata.getIndexUUID();
1124+
String remoteStoreRepoForIndex = indexMetadata.getSettings().get(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY);
1125+
RemoteStoreMetadataLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory.newLockManager(
1126+
remoteStoreRepoForIndex,
1127+
indexUUID,
1128+
String.valueOf(finalShardId)
1129+
);
1130+
remoteStoreMetadataLockManager.release(
1131+
FileLockInfo.getLockInfoBuilder().withAcquirerId(snapshotId.getUUID()).build()
1132+
);
1133+
}
1134+
}
1135+
} catch (IOException e) {
1136+
onAllShardsCompleted.onFailure(e);
1137+
}
1138+
}
1139+
}
1140+
}
1141+
10891142
private List<String> resolveFilesToDelete(
10901143
RepositoryData oldRepositoryData,
10911144
Collection<SnapshotId> snapshotIds,
@@ -2889,16 +2942,24 @@ private ShardSnapshotMetaDeleteResult deleteFromShardSnapshotMeta(
28892942
}
28902943
String writtenGeneration = null;
28912944
try {
2892-
if (newSnapshotsList.isEmpty()) {
2945+
// Using survivingSnapshots instead of newSnapshotsList as shallow snapshots can be present which won't be part of
2946+
// newSnapshotsList
2947+
if (survivingSnapshots.isEmpty()) {
28932948
return new ShardSnapshotMetaDeleteResult(indexId, snapshotShardId, ShardGenerations.DELETED_SHARD_GEN, blobs);
28942949
} else {
2895-
final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList);
2896-
if (indexGeneration < 0L) {
2897-
writtenGeneration = UUIDs.randomBase64UUID();
2898-
INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedSnapshots, shardContainer, writtenGeneration, compressor);
2950+
final BlobStoreIndexShardSnapshots updatedSnapshots;
2951+
// If we have surviving non shallow snapshots, update index- file.
2952+
if (newSnapshotsList.size() > 0) {
2953+
updatedSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList);
2954+
if (indexGeneration < 0L) {
2955+
writtenGeneration = UUIDs.randomBase64UUID();
2956+
INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedSnapshots, shardContainer, writtenGeneration, compressor);
2957+
} else {
2958+
writtenGeneration = String.valueOf(indexGeneration);
2959+
writeShardIndexBlobAtomic(shardContainer, indexGeneration, updatedSnapshots);
2960+
}
28992961
} else {
2900-
writtenGeneration = String.valueOf(indexGeneration);
2901-
writeShardIndexBlobAtomic(shardContainer, indexGeneration, updatedSnapshots);
2962+
updatedSnapshots = BlobStoreIndexShardSnapshots.EMPTY;
29022963
}
29032964
final Set<String> survivingSnapshotUUIDs = survivingSnapshots.stream().map(SnapshotId::getUUID).collect(Collectors.toSet());
29042965
return new ShardSnapshotMetaDeleteResult(

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@
9090
import org.opensearch.common.unit.TimeValue;
9191
import org.opensearch.index.Index;
9292
import org.opensearch.index.shard.ShardId;
93+
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
9394
import org.opensearch.repositories.IndexId;
9495
import org.opensearch.repositories.RepositoriesService;
9596
import org.opensearch.repositories.Repository;
@@ -151,6 +152,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
151152

152153
private final RepositoriesService repositoriesService;
153154

155+
private final RemoteStoreLockManagerFactory remoteStoreLockManagerFactory;
156+
154157
private final ThreadPool threadPool;
155158

156159
private final Map<Snapshot, List<ActionListener<Tuple<RepositoryData, SnapshotInfo>>>> snapshotCompletionListeners =
@@ -206,6 +209,7 @@ public SnapshotsService(
206209
this.clusterService = clusterService;
207210
this.indexNameExpressionResolver = indexNameExpressionResolver;
208211
this.repositoriesService = repositoriesService;
212+
this.remoteStoreLockManagerFactory = new RemoteStoreLockManagerFactory(() -> repositoriesService);
209213
this.threadPool = transportService.getThreadPool();
210214
this.transportService = transportService;
211215

@@ -1538,7 +1542,12 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
15381542
if (deletionToRun == null) {
15391543
runNextQueuedOperation(repositoryData, repository, false);
15401544
} else {
1541-
deleteSnapshotsFromRepository(deletionToRun, repositoryData, newState.nodes().getMinNodeVersion());
1545+
deleteSnapshotsFromRepository(
1546+
deletionToRun,
1547+
repositoryData,
1548+
newState.nodes().getMinNodeVersion(),
1549+
remoteStoreLockManagerFactory
1550+
);
15421551
}
15431552
}
15441553
});
@@ -2063,7 +2072,12 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
20632072
}
20642073
if (newDelete.state() == SnapshotDeletionsInProgress.State.STARTED) {
20652074
if (tryEnterRepoLoop(repoName)) {
2066-
deleteSnapshotsFromRepository(newDelete, repositoryData, newState.nodes().getMinNodeVersion());
2075+
deleteSnapshotsFromRepository(
2076+
newDelete,
2077+
repositoryData,
2078+
newState.nodes().getMinNodeVersion(),
2079+
remoteStoreLockManagerFactory
2080+
);
20672081
} else {
20682082
logger.trace("Delete [{}] could not execute directly and was queued", newDelete);
20692083
}
@@ -2140,7 +2154,7 @@ public void onResponse(RepositoryData repositoryData) {
21402154
+ "] in cluster state and ["
21412155
+ repositoryData.getGenId()
21422156
+ "] in the repository";
2143-
deleteSnapshotsFromRepository(deleteEntry, repositoryData, minNodeVersion);
2157+
deleteSnapshotsFromRepository(deleteEntry, repositoryData, minNodeVersion, remoteStoreLockManagerFactory);
21442158
}
21452159

21462160
@Override
@@ -2162,7 +2176,8 @@ public void onFailure(Exception e) {
21622176
private void deleteSnapshotsFromRepository(
21632177
SnapshotDeletionsInProgress.Entry deleteEntry,
21642178
RepositoryData repositoryData,
2165-
Version minNodeVersion
2179+
Version minNodeVersion,
2180+
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory
21662181
) {
21672182
if (repositoryOperations.startDeletion(deleteEntry.uuid())) {
21682183
assert currentlyFinalizing.contains(deleteEntry.repository());
@@ -2173,6 +2188,7 @@ private void deleteSnapshotsFromRepository(
21732188
snapshotIds,
21742189
repositoryData.getGenId(),
21752190
minCompatibleVersion(minNodeVersion, repositoryData, snapshotIds),
2191+
remoteStoreLockManagerFactory,
21762192
ActionListener.wrap(updatedRepoData -> {
21772193
logger.info("snapshots {} deleted", snapshotIds);
21782194
removeSnapshotDeletionFromClusterState(deleteEntry, null, updatedRepoData);
@@ -2327,7 +2343,12 @@ public final void clusterStateProcessed(String source, ClusterState oldState, Cl
23272343
leaveRepoLoop(deleteEntry.repository());
23282344
} else {
23292345
for (SnapshotDeletionsInProgress.Entry readyDeletion : readyDeletions) {
2330-
deleteSnapshotsFromRepository(readyDeletion, repositoryData, newState.nodes().getMinNodeVersion());
2346+
deleteSnapshotsFromRepository(
2347+
readyDeletion,
2348+
repositoryData,
2349+
newState.nodes().getMinNodeVersion(),
2350+
remoteStoreLockManagerFactory
2351+
);
23312352
}
23322353
}
23332354
} else {

server/src/test/java/org/opensearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ private SnapshotsInProgress.Entry createEntry(String dataStreamName, String repo
162162
return new SnapshotsInProgress.Entry(
163163
new Snapshot(repo, new SnapshotId("", "")),
164164
false,
165+
false,
165166
partial,
166167
SnapshotsInProgress.State.SUCCESS,
167168
Collections.emptyList(),

server/src/test/java/org/opensearch/cluster/metadata/MetadataDeleteIndexServiceTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ public void testDeleteSnapshotting() {
103103
snapshot,
104104
true,
105105
false,
106+
false,
106107
SnapshotsInProgress.State.INIT,
107108
singletonList(new IndexId(index, "doesn't matter")),
108109
Collections.emptyList(),

server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexStateServiceTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,7 @@ private static ClusterState addSnapshotIndex(final String index, final int numSh
431431
snapshot,
432432
randomBoolean(),
433433
false,
434+
false,
434435
SnapshotsInProgress.State.INIT,
435436
Collections.singletonList(new IndexId(index, index)),
436437
Collections.emptyList(),

server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.opensearch.index.shard.ShardId;
6060
import org.opensearch.index.snapshots.IndexShardSnapshotStatus;
6161
import org.opensearch.index.store.Store;
62+
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
6263
import org.opensearch.indices.recovery.RecoverySettings;
6364
import org.opensearch.indices.recovery.RecoveryState;
6465
import org.opensearch.repositories.blobstore.MeteredBlobStoreRepository;
@@ -264,6 +265,7 @@ public void deleteSnapshots(
264265
Collection<SnapshotId> snapshotIds,
265266
long repositoryStateId,
266267
Version repositoryMetaVersion,
268+
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
267269
ActionListener<RepositoryData> listener
268270
) {
269271
listener.onResponse(null);

test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.opensearch.index.mapper.MapperService;
4545
import org.opensearch.index.snapshots.IndexShardSnapshotStatus;
4646
import org.opensearch.index.store.Store;
47+
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
4748
import org.opensearch.repositories.IndexId;
4849
import org.opensearch.repositories.IndexMetaDataGenerations;
4950
import org.opensearch.repositories.Repository;
@@ -133,6 +134,7 @@ public void deleteSnapshots(
133134
Collection<SnapshotId> snapshotIds,
134135
long repositoryStateId,
135136
Version repositoryMetaVersion,
137+
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
136138
ActionListener<RepositoryData> listener
137139
) {
138140
listener.onResponse(null);

0 commit comments

Comments
 (0)