Skip to content

Commit 0361034

Browse files
committed
[Snapshot Interop] Add Changes in Snapshot Status Flow for remote store interoperability.
Signed-off-by: Bansi Kasundra <kasundra@amazon.com>
1 parent 4956db3 commit 0361034

File tree

8 files changed

+268
-212
lines changed

8 files changed

+268
-212
lines changed

server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java

Lines changed: 182 additions & 168 deletions
Large diffs are not rendered by default.

server/src/main/java/org/opensearch/repositories/FilterRepository.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,8 +195,8 @@ public void restoreShard(
195195
}
196196

197197
@Override
198-
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, IndexId indexId, ShardId shardId) {
199-
return in.getShardSnapshotStatus(snapshotId, indexId, shardId);
198+
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, IndexId indexId, ShardId shardId, boolean isRemoteIndexShard) {
199+
return in.getShardSnapshotStatus(snapshotId, indexId, shardId, isRemoteIndexShard);
200200
}
201201

202202
@Override

server/src/main/java/org/opensearch/repositories/Repository.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,12 @@ void restoreShard(
278278
* @param shardId shard id
279279
* @return snapshot status
280280
*/
281-
IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, IndexId indexId, ShardId shardId);
281+
IndexShardSnapshotStatus getShardSnapshotStatus(
282+
SnapshotId snapshotId,
283+
IndexId indexId,
284+
ShardId shardId,
285+
boolean isRemoteIndexShard
286+
);
282287

283288
/**
284289
* Update the repository with the incoming cluster state. This method is invoked from {@link RepositoriesService#applyClusterState} and

server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2684,17 +2684,34 @@ public InputStream maybeRateLimitSnapshots(InputStream stream) {
26842684
}
26852685

26862686
@Override
2687-
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, IndexId indexId, ShardId shardId) {
2688-
BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(shardContainer(indexId, shardId), snapshotId);
2689-
return IndexShardSnapshotStatus.newDone(
2690-
snapshot.startTime(),
2691-
snapshot.time(),
2692-
snapshot.incrementalFileCount(),
2693-
snapshot.totalFileCount(),
2694-
snapshot.incrementalSize(),
2695-
snapshot.totalSize(),
2696-
null
2697-
); // Not adding a real generation here as it doesn't matter to callers
2687+
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, IndexId indexId, ShardId shardId, boolean isRemoteIndexShard) {
2688+
logger.info("isRemoteIndexShard: {}", isRemoteIndexShard);
2689+
if (!isRemoteIndexShard) {
2690+
BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(shardContainer(indexId, shardId), snapshotId);
2691+
return IndexShardSnapshotStatus.newDone(
2692+
snapshot.startTime(),
2693+
snapshot.time(),
2694+
snapshot.incrementalFileCount(),
2695+
snapshot.totalFileCount(),
2696+
snapshot.incrementalSize(),
2697+
snapshot.totalSize(),
2698+
null
2699+
); // Not adding a real generation here as it doesn't matter to callers
2700+
}
2701+
else {
2702+
logger.info("Reading rem shard snapshot");
2703+
BlobStoreRemStoreBasedIndexShardSnapshot snapshot = loadRemStoreEnabledShardSnapshot(shardContainer(indexId, shardId), snapshotId);
2704+
logger.info("Done reading rem shard snapshot");
2705+
return IndexShardSnapshotStatus.newDone(
2706+
snapshot.startTime(),
2707+
snapshot.time(),
2708+
snapshot.incrementalFileCount(),
2709+
snapshot.totalFileCount(),
2710+
snapshot.incrementalSize(),
2711+
snapshot.totalSize(),
2712+
null
2713+
); // Not adding a real generation here as it doesn't matter to callers
2714+
}
26982715
}
26992716

27002717
@Override

server/src/main/java/org/opensearch/snapshots/InternalSnapshotsInfoService.java

Lines changed: 44 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,11 @@
3636
import org.apache.logging.log4j.Logger;
3737
import org.apache.logging.log4j.message.ParameterizedMessage;
3838
import org.opensearch.action.ActionListener;
39+
import org.opensearch.action.StepListener;
3940
import org.opensearch.cluster.ClusterChangedEvent;
4041
import org.opensearch.cluster.ClusterState;
4142
import org.opensearch.cluster.ClusterStateListener;
43+
import org.opensearch.cluster.metadata.IndexMetadata;
4244
import org.opensearch.cluster.node.DiscoveryNode;
4345
import org.opensearch.cluster.routing.RecoverySource;
4446
import org.opensearch.cluster.routing.RerouteService;
@@ -54,6 +56,7 @@
5456
import org.opensearch.repositories.IndexId;
5557
import org.opensearch.repositories.RepositoriesService;
5658
import org.opensearch.repositories.Repository;
59+
import org.opensearch.repositories.RepositoryData;
5760
import org.opensearch.threadpool.ThreadPool;
5861

5962
import java.util.Collections;
@@ -65,6 +68,7 @@
6568
import java.util.Objects;
6669
import java.util.Queue;
6770
import java.util.Set;
71+
import java.util.function.Consumer;
6872
import java.util.function.Supplier;
6973

7074
/**
@@ -236,31 +240,47 @@ protected void doRun() throws Exception {
236240
assert repositories != null;
237241
final Repository repository = repositories.repository(snapshotShard.snapshot.getRepository());
238242

239-
logger.debug("fetching snapshot shard size for {}", snapshotShard);
240-
final long snapshotShardSize = repository.getShardSnapshotStatus(
241-
snapshotShard.snapshot().getSnapshotId(),
242-
snapshotShard.index(),
243-
snapshotShard.shardId()
244-
).asCopy().getTotalSize();
245-
246-
logger.debug("snapshot shard size for {}: {} bytes", snapshotShard, snapshotShardSize);
247-
248-
boolean updated = false;
249-
synchronized (mutex) {
250-
removed = unknownSnapshotShards.remove(snapshotShard);
251-
assert removed : "snapshot shard to remove does not exist " + snapshotShardSize;
252-
if (isMaster) {
253-
final Map<SnapshotShard, Long> newSnapshotShardSizes = new HashMap<>(knownSnapshotShards);
254-
updated = newSnapshotShardSizes.put(snapshotShard, snapshotShardSize) == null;
255-
assert updated : "snapshot shard size already exists for " + snapshotShard;
256-
knownSnapshotShards = Collections.unmodifiableMap(newSnapshotShardSizes);
243+
final Consumer<Exception> onFailure = e -> onFailure(e);
244+
245+
final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
246+
repositories.getRepositoryData(snapshotShard.snapshot.getRepository(), repositoryDataListener);
247+
repositoryDataListener.whenComplete(repositoryData -> {
248+
final IndexMetadata indexMetadata = repository.getSnapshotIndexMetaData(
249+
repositoryData,
250+
snapshotShard.snapshot.getSnapshotId(),
251+
snapshotShard.index);
252+
final boolean isRemoteIndexShard = indexMetadata.getSettings().getAsBoolean(
253+
IndexMetadata.SETTING_REMOTE_STORE_ENABLED,
254+
false) &&
255+
repository.getSnapshotInfo(snapshotShard.snapshot.getSnapshotId()).isRemoteStoreInteropEnabled();
256+
257+
logger.debug("fetching snapshot shard size for {}", snapshotShard);
258+
final long snapshotShardSize = repository.getShardSnapshotStatus(
259+
snapshotShard.snapshot().getSnapshotId(),
260+
snapshotShard.index(),
261+
snapshotShard.shardId(),
262+
isRemoteIndexShard
263+
).asCopy().getTotalSize();
264+
265+
logger.debug("snapshot shard size for {}: {} bytes", snapshotShard, snapshotShardSize);
266+
267+
boolean updated = false;
268+
synchronized (mutex) {
269+
removed = unknownSnapshotShards.remove(snapshotShard);
270+
assert removed : "snapshot shard to remove does not exist " + snapshotShardSize;
271+
if (isMaster) {
272+
final Map<SnapshotShard, Long> newSnapshotShardSizes = new HashMap<>(knownSnapshotShards);
273+
updated = newSnapshotShardSizes.put(snapshotShard, snapshotShardSize) == null;
274+
assert updated : "snapshot shard size already exists for " + snapshotShard;
275+
knownSnapshotShards = Collections.unmodifiableMap(newSnapshotShardSizes);
276+
}
277+
activeFetches -= 1;
278+
assert invariant();
257279
}
258-
activeFetches -= 1;
259-
assert invariant();
260-
}
261-
if (updated) {
262-
rerouteService.get().reroute("snapshot shard size updated", Priority.HIGH, REROUTE_LISTENER);
263-
}
280+
if (updated) {
281+
rerouteService.get().reroute("snapshot shard size updated", Priority.HIGH, REROUTE_LISTENER);
282+
}
283+
}, onFailure);
264284
}
265285

266286
@Override

server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ public void restoreShard(
328328
}
329329

330330
@Override
331-
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, IndexId indexId, ShardId shardId) {
331+
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, IndexId indexId, ShardId shardId, boolean isRemoteIndexShard) {
332332
return null;
333333
}
334334

server/src/test/java/org/opensearch/snapshots/InternalSnapshotsInfoServiceTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ public void testSnapshotShardSizes() throws Exception {
152152
final CountDownLatch latch = new CountDownLatch(1);
153153
final Repository mockRepository = new FilterRepository(mock(Repository.class)) {
154154
@Override
155-
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, IndexId indexId, ShardId shardId) {
155+
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, IndexId indexId, ShardId shardId, boolean isRemoteIndexShard) {
156156
try {
157157
assertThat(indexId.getName(), equalTo(indexName));
158158
assertThat(shardId.id(), allOf(greaterThanOrEqualTo(0), lessThan(numberOfShards)));
@@ -212,7 +212,7 @@ public void testErroneousSnapshotShardSizes() throws Exception {
212212
final Map<InternalSnapshotsInfoService.SnapshotShard, Long> results = new ConcurrentHashMap<>();
213213
final Repository mockRepository = new FilterRepository(mock(Repository.class)) {
214214
@Override
215-
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, IndexId indexId, ShardId shardId) {
215+
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, IndexId indexId, ShardId shardId, boolean isRemoteIndexShard) {
216216
final InternalSnapshotsInfoService.SnapshotShard snapshotShard = new InternalSnapshotsInfoService.SnapshotShard(
217217
new Snapshot("_repo", snapshotId),
218218
indexId,
@@ -300,7 +300,7 @@ public void testNoLongerClusterManager() throws Exception {
300300

301301
final Repository mockRepository = new FilterRepository(mock(Repository.class)) {
302302
@Override
303-
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, IndexId indexId, ShardId shardId) {
303+
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, IndexId indexId, ShardId shardId, boolean isRemoteIndexShard) {
304304
return IndexShardSnapshotStatus.newDone(0L, 0L, 0, 0, 0L, randomNonNegativeLong(), null);
305305
}
306306
};
@@ -336,7 +336,7 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, In
336336
public void testCleanUpSnapshotShardSizes() throws Exception {
337337
final Repository mockRepository = new FilterRepository(mock(Repository.class)) {
338338
@Override
339-
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, IndexId indexId, ShardId shardId) {
339+
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, IndexId indexId, ShardId shardId, boolean isRemoteIndexShard) {
340340
if (randomBoolean()) {
341341
throw new SnapshotException(new Snapshot("_repo", snapshotId), "simulated");
342342
} else {

test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ public void snapshotShard(
176176
) {}
177177

178178
@Override
179-
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, IndexId indexId, ShardId shardId) {
179+
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, IndexId indexId, ShardId shardId, boolean IsRemoteIndexShard) {
180180
return null;
181181
}
182182

0 commit comments

Comments
 (0)