Skip to content

Commit 48a79f3

Browse files
committed
Add Changes in Snapshot Delete Flow for remote store interoperability.
Signed-off-by: Bansi Kasundra <kasundra@amazon.com>
1 parent 4956db3 commit 48a79f3

File tree

10 files changed

+120
-24
lines changed

10 files changed

+120
-24
lines changed

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.opensearch.common.unit.ByteSizeUnit;
5252
import org.opensearch.common.unit.ByteSizeValue;
5353
import org.opensearch.core.xcontent.NamedXContentRegistry;
54+
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
5455
import org.opensearch.indices.recovery.RecoverySettings;
5556
import org.opensearch.monitor.jvm.JvmInfo;
5657
import org.opensearch.repositories.RepositoryData;
@@ -313,9 +314,10 @@ public void deleteSnapshots(
313314
Collection<SnapshotId> snapshotIds,
314315
long repositoryStateId,
315316
Version repositoryMetaVersion,
317+
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
316318
ActionListener<RepositoryData> listener
317319
) {
318-
super.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, listener);
320+
super.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, remoteStoreLockManagerFactory, listener);
319321
}
320322

321323
@Override

server/src/main/java/org/opensearch/repositories/FilterRepository.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.opensearch.index.shard.ShardId;
4747
import org.opensearch.index.snapshots.IndexShardSnapshotStatus;
4848
import org.opensearch.index.store.Store;
49+
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
4950
import org.opensearch.indices.recovery.RecoveryState;
5051
import org.opensearch.snapshots.SnapshotId;
5152
import org.opensearch.snapshots.SnapshotInfo;
@@ -120,9 +121,10 @@ public void deleteSnapshots(
120121
Collection<SnapshotId> snapshotIds,
121122
long repositoryStateId,
122123
Version repositoryMetaVersion,
124+
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
123125
ActionListener<RepositoryData> listener
124126
) {
125-
in.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, listener);
127+
in.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, remoteStoreLockManagerFactory, listener);
126128
}
127129

128130
@Override

server/src/main/java/org/opensearch/repositories/Repository.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.opensearch.index.shard.ShardId;
4848
import org.opensearch.index.snapshots.IndexShardSnapshotStatus;
4949
import org.opensearch.index.store.Store;
50+
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
5051
import org.opensearch.indices.recovery.RecoveryState;
5152
import org.opensearch.snapshots.SnapshotId;
5253
import org.opensearch.snapshots.SnapshotInfo;
@@ -164,6 +165,7 @@ void deleteSnapshots(
164165
Collection<SnapshotId> snapshotIds,
165166
long repositoryStateId,
166167
Version repositoryMetaVersion,
168+
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
167169
ActionListener<RepositoryData> listener
168170
);
169171

server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 78 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,9 @@
111111
import org.opensearch.index.snapshots.blobstore.SnapshotFiles;
112112
import org.opensearch.index.store.Store;
113113
import org.opensearch.index.store.StoreFileMetadata;
114+
import org.opensearch.index.store.lockmanager.FileLockInfo;
115+
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
116+
import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager;
114117
import org.opensearch.indices.recovery.RecoverySettings;
115118
import org.opensearch.indices.recovery.RecoveryState;
116119
import org.opensearch.repositories.IndexId;
@@ -718,6 +721,7 @@ public void deleteSnapshots(
718721
Collection<SnapshotId> snapshotIds,
719722
long repositoryStateId,
720723
Version repositoryMetaVersion,
724+
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
721725
ActionListener<RepositoryData> listener
722726
) {
723727
if (isReadOnly()) {
@@ -738,6 +742,7 @@ protected void doRun() throws Exception {
738742
rootBlobs,
739743
repositoryData,
740744
repositoryMetaVersion,
745+
remoteStoreLockManagerFactory,
741746
listener
742747
);
743748
}
@@ -817,6 +822,7 @@ private void doDeleteShardSnapshots(
817822
Map<String, BlobMetadata> rootBlobs,
818823
RepositoryData repositoryData,
819824
Version repoMetaVersion,
825+
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
820826
ActionListener<RepositoryData> listener
821827
) {
822828
// First write the new shard state metadata (with the removed snapshot) and compute deletion targets
@@ -851,6 +857,7 @@ private void doDeleteShardSnapshots(
851857
ActionListener.wrap(() -> listener.onResponse(updatedRepoData)),
852858
2
853859
);
860+
releaseResourceLockFiles(snapshotIds, repositoryData, remoteStoreLockManagerFactory, afterCleanupsListener);
854861
cleanupUnlinkedRootAndIndicesBlobs(snapshotIds, foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener);
855862
asyncCleanupUnlinkedShardLevelBlobs(
856863
repositoryData,
@@ -1012,17 +1019,27 @@ protected void doRun() throws Exception {
10121019
final Set<String> blobs = shardContainer.listBlobs().keySet();
10131020
final BlobStoreIndexShardSnapshots blobStoreIndexShardSnapshots;
10141021
final long newGen;
1015-
if (useUUIDs) {
1016-
newGen = -1L;
1017-
blobStoreIndexShardSnapshots = buildBlobStoreIndexShardSnapshots(
1018-
blobs,
1019-
shardContainer,
1020-
oldRepositoryData.shardGenerations().getShardGen(indexId, finalShardId)
1021-
).v1();
1022+
1023+
// Index- file would be present if snapshots other than shallow snapshots are present for this shard
1024+
if (shardContainer.listBlobsByPrefix(SNAPSHOT_INDEX_PREFIX).size() > 0) {
1025+
if (useUUIDs) {
1026+
newGen = -1L;
1027+
blobStoreIndexShardSnapshots = buildBlobStoreIndexShardSnapshots(
1028+
blobs,
1029+
shardContainer,
1030+
oldRepositoryData.shardGenerations().getShardGen(indexId, finalShardId)
1031+
).v1();
1032+
} else {
1033+
Tuple<BlobStoreIndexShardSnapshots, Long> tuple = buildBlobStoreIndexShardSnapshots(
1034+
blobs,
1035+
shardContainer
1036+
);
1037+
newGen = tuple.v2() + 1;
1038+
blobStoreIndexShardSnapshots = tuple.v1();
1039+
}
10221040
} else {
1023-
Tuple<BlobStoreIndexShardSnapshots, Long> tuple = buildBlobStoreIndexShardSnapshots(blobs, shardContainer);
1024-
newGen = tuple.v2() + 1;
1025-
blobStoreIndexShardSnapshots = tuple.v1();
1041+
newGen = -1L;
1042+
blobStoreIndexShardSnapshots = BlobStoreIndexShardSnapshots.EMPTY;
10261043
}
10271044
allShardsListener.onResponse(
10281045
deleteFromShardSnapshotMeta(
@@ -1059,6 +1076,42 @@ public void onFailure(Exception ex) {
10591076
}
10601077
}
10611078

1079+
private void releaseResourceLockFiles(
1080+
Collection<SnapshotId> snapshotIds,
1081+
RepositoryData repositoryData,
1082+
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
1083+
ActionListener<Void> onAllShardsCompleted
1084+
) {
1085+
for (SnapshotId snapshotId : snapshotIds) {
1086+
List<String> indices = this.getSnapshotInfo(snapshotId).indices();
1087+
List<IndexId> indexIds = repositoryData.resolveIndices(indices);
1088+
for (IndexId indexId : indexIds) {
1089+
try {
1090+
IndexMetadata indexMetadata = this.getSnapshotIndexMetaData(repositoryData, snapshotId, indexId);
1091+
if (this.getSnapshotInfo(snapshotId).isRemoteStoreIndexShallowCopyEnabled()
1092+
&& indexMetadata.getSettings().getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false)) {
1093+
int numberOfShards = indexMetadata.getNumberOfShards();
1094+
for (int shardId = 0; shardId < numberOfShards; shardId++) {
1095+
final int finalShardId = shardId;
1096+
String indexUUID = indexMetadata.getIndexUUID();
1097+
String remoteStoreRepoForIndex = indexMetadata.getSettings().get(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY);
1098+
RemoteStoreMetadataLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory.newLockManager(
1099+
remoteStoreRepoForIndex,
1100+
indexUUID,
1101+
String.valueOf(finalShardId)
1102+
);
1103+
remoteStoreMetadataLockManager.release(
1104+
FileLockInfo.getLockInfoBuilder().withAcquirerId(snapshotId.getUUID()).build()
1105+
);
1106+
}
1107+
}
1108+
} catch (IOException e) {
1109+
onAllShardsCompleted.onFailure(e);
1110+
}
1111+
}
1112+
}
1113+
}
1114+
10621115
private List<String> resolveFilesToDelete(
10631116
RepositoryData oldRepositoryData,
10641117
Collection<SnapshotId> snapshotIds,
@@ -2783,16 +2836,24 @@ private ShardSnapshotMetaDeleteResult deleteFromShardSnapshotMeta(
27832836
}
27842837
String writtenGeneration = null;
27852838
try {
2786-
if (newSnapshotsList.isEmpty()) {
2839+
// Using survivingSnapshots instead of newSnapshotsList as shallow snapshots can be present which won't be part of
2840+
// newSnapshotsList
2841+
if (survivingSnapshots.isEmpty()) {
27872842
return new ShardSnapshotMetaDeleteResult(indexId, snapshotShardId, ShardGenerations.DELETED_SHARD_GEN, blobs);
27882843
} else {
2789-
final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList);
2790-
if (indexGeneration < 0L) {
2791-
writtenGeneration = UUIDs.randomBase64UUID();
2792-
INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedSnapshots, shardContainer, writtenGeneration, compress);
2844+
final BlobStoreIndexShardSnapshots updatedSnapshots;
2845+
// If we have surviving non shallow snapshots, update index- file.
2846+
if (newSnapshotsList.size() > 0) {
2847+
updatedSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList);
2848+
if (indexGeneration < 0L) {
2849+
writtenGeneration = UUIDs.randomBase64UUID();
2850+
INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedSnapshots, shardContainer, writtenGeneration, compress);
2851+
} else {
2852+
writtenGeneration = String.valueOf(indexGeneration);
2853+
writeShardIndexBlobAtomic(shardContainer, indexGeneration, updatedSnapshots);
2854+
}
27932855
} else {
2794-
writtenGeneration = String.valueOf(indexGeneration);
2795-
writeShardIndexBlobAtomic(shardContainer, indexGeneration, updatedSnapshots);
2856+
updatedSnapshots = BlobStoreIndexShardSnapshots.EMPTY;
27962857
}
27972858
final Set<String> survivingSnapshotUUIDs = survivingSnapshots.stream().map(SnapshotId::getUUID).collect(Collectors.toSet());
27982859
return new ShardSnapshotMetaDeleteResult(

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@
9090
import org.opensearch.common.unit.TimeValue;
9191
import org.opensearch.index.Index;
9292
import org.opensearch.index.shard.ShardId;
93+
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
9394
import org.opensearch.repositories.IndexId;
9495
import org.opensearch.repositories.RepositoriesService;
9596
import org.opensearch.repositories.Repository;
@@ -150,6 +151,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
150151

151152
private final RepositoriesService repositoriesService;
152153

154+
private final RemoteStoreLockManagerFactory remoteStoreLockManagerFactory;
155+
153156
private final ThreadPool threadPool;
154157

155158
private final Map<Snapshot, List<ActionListener<Tuple<RepositoryData, SnapshotInfo>>>> snapshotCompletionListeners =
@@ -205,6 +208,7 @@ public SnapshotsService(
205208
this.clusterService = clusterService;
206209
this.indexNameExpressionResolver = indexNameExpressionResolver;
207210
this.repositoriesService = repositoriesService;
211+
this.remoteStoreLockManagerFactory = new RemoteStoreLockManagerFactory(() -> repositoriesService);
208212
this.threadPool = transportService.getThreadPool();
209213
this.transportService = transportService;
210214

@@ -1532,7 +1536,12 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
15321536
if (deletionToRun == null) {
15331537
runNextQueuedOperation(repositoryData, repository, false);
15341538
} else {
1535-
deleteSnapshotsFromRepository(deletionToRun, repositoryData, newState.nodes().getMinNodeVersion());
1539+
deleteSnapshotsFromRepository(
1540+
deletionToRun,
1541+
repositoryData,
1542+
newState.nodes().getMinNodeVersion(),
1543+
remoteStoreLockManagerFactory
1544+
);
15361545
}
15371546
}
15381547
});
@@ -2057,7 +2066,12 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
20572066
}
20582067
if (newDelete.state() == SnapshotDeletionsInProgress.State.STARTED) {
20592068
if (tryEnterRepoLoop(repoName)) {
2060-
deleteSnapshotsFromRepository(newDelete, repositoryData, newState.nodes().getMinNodeVersion());
2069+
deleteSnapshotsFromRepository(
2070+
newDelete,
2071+
repositoryData,
2072+
newState.nodes().getMinNodeVersion(),
2073+
remoteStoreLockManagerFactory
2074+
);
20612075
} else {
20622076
logger.trace("Delete [{}] could not execute directly and was queued", newDelete);
20632077
}
@@ -2134,7 +2148,7 @@ public void onResponse(RepositoryData repositoryData) {
21342148
+ "] in cluster state and ["
21352149
+ repositoryData.getGenId()
21362150
+ "] in the repository";
2137-
deleteSnapshotsFromRepository(deleteEntry, repositoryData, minNodeVersion);
2151+
deleteSnapshotsFromRepository(deleteEntry, repositoryData, minNodeVersion, remoteStoreLockManagerFactory);
21382152
}
21392153

21402154
@Override
@@ -2156,17 +2170,20 @@ public void onFailure(Exception e) {
21562170
private void deleteSnapshotsFromRepository(
21572171
SnapshotDeletionsInProgress.Entry deleteEntry,
21582172
RepositoryData repositoryData,
2159-
Version minNodeVersion
2173+
Version minNodeVersion,
2174+
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory
21602175
) {
21612176
if (repositoryOperations.startDeletion(deleteEntry.uuid())) {
21622177
assert currentlyFinalizing.contains(deleteEntry.repository());
21632178
final List<SnapshotId> snapshotIds = deleteEntry.getSnapshots();
21642179
assert deleteEntry.state() == SnapshotDeletionsInProgress.State.STARTED : "incorrect state for entry [" + deleteEntry + "]";
2180+
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory = new RemoteStoreLockManagerFactory(() -> repositoriesService);
21652181
repositoriesService.repository(deleteEntry.repository())
21662182
.deleteSnapshots(
21672183
snapshotIds,
21682184
repositoryData.getGenId(),
21692185
minCompatibleVersion(minNodeVersion, repositoryData, snapshotIds),
2186+
remoteStoreLockManagerFactory,
21702187
ActionListener.wrap(updatedRepoData -> {
21712188
logger.info("snapshots {} deleted", snapshotIds);
21722189
removeSnapshotDeletionFromClusterState(deleteEntry, null, updatedRepoData);
@@ -2321,7 +2338,12 @@ public final void clusterStateProcessed(String source, ClusterState oldState, Cl
23212338
leaveRepoLoop(deleteEntry.repository());
23222339
} else {
23232340
for (SnapshotDeletionsInProgress.Entry readyDeletion : readyDeletions) {
2324-
deleteSnapshotsFromRepository(readyDeletion, repositoryData, newState.nodes().getMinNodeVersion());
2341+
deleteSnapshotsFromRepository(
2342+
readyDeletion,
2343+
repositoryData,
2344+
newState.nodes().getMinNodeVersion(),
2345+
remoteStoreLockManagerFactory
2346+
);
23252347
}
23262348
}
23272349
} else {

server/src/test/java/org/opensearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ private SnapshotsInProgress.Entry createEntry(String dataStreamName, String repo
162162
return new SnapshotsInProgress.Entry(
163163
new Snapshot(repo, new SnapshotId("", "")),
164164
false,
165+
false,
165166
partial,
166167
SnapshotsInProgress.State.SUCCESS,
167168
Collections.emptyList(),

server/src/test/java/org/opensearch/cluster/metadata/MetadataDeleteIndexServiceTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ public void testDeleteSnapshotting() {
103103
snapshot,
104104
true,
105105
false,
106+
false,
106107
SnapshotsInProgress.State.INIT,
107108
singletonList(new IndexId(index, "doesn't matter")),
108109
Collections.emptyList(),

server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexStateServiceTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,7 @@ private static ClusterState addSnapshotIndex(final String index, final int numSh
431431
snapshot,
432432
randomBoolean(),
433433
false,
434+
false,
434435
SnapshotsInProgress.State.INIT,
435436
Collections.singletonList(new IndexId(index, index)),
436437
Collections.emptyList(),

server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.opensearch.index.shard.ShardId;
6060
import org.opensearch.index.snapshots.IndexShardSnapshotStatus;
6161
import org.opensearch.index.store.Store;
62+
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
6263
import org.opensearch.indices.recovery.RecoverySettings;
6364
import org.opensearch.indices.recovery.RecoveryState;
6465
import org.opensearch.repositories.blobstore.MeteredBlobStoreRepository;
@@ -264,6 +265,7 @@ public void deleteSnapshots(
264265
Collection<SnapshotId> snapshotIds,
265266
long repositoryStateId,
266267
Version repositoryMetaVersion,
268+
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
267269
ActionListener<RepositoryData> listener
268270
) {
269271
listener.onResponse(null);

test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.opensearch.index.mapper.MapperService;
4545
import org.opensearch.index.snapshots.IndexShardSnapshotStatus;
4646
import org.opensearch.index.store.Store;
47+
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
4748
import org.opensearch.repositories.IndexId;
4849
import org.opensearch.repositories.IndexMetaDataGenerations;
4950
import org.opensearch.repositories.Repository;
@@ -133,6 +134,7 @@ public void deleteSnapshots(
133134
Collection<SnapshotId> snapshotIds,
134135
long repositoryStateId,
135136
Version repositoryMetaVersion,
137+
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
136138
ActionListener<RepositoryData> listener
137139
) {
138140
listener.onResponse(null);

0 commit comments

Comments
 (0)