Skip to content

Commit b642e69

Browse files
[Backport 2.x] [Snapshot Interop] Add Changes in Create Snapshot Flow for remote store interoperability. (#8071)
Signed-off-by: Harish Bhakuni <hbhakuni@amazon.com>
1 parent d868857 commit b642e69

26 files changed

+1541
-109
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4343
- Add new query profile collector fields with concurrent search execution ([#7898](https://github.com/opensearch-project/OpenSearch/pull/7898))
4444
- Align range and default value for deletes_pct_allowed in merge policy ([#7730](https://github.com/opensearch-project/OpenSearch/pull/7730))
4545
- Rename QueryPhase actors like Suggest, Rescore to be processors rather than phase ([#8025](https://github.com/opensearch-project/OpenSearch/pull/8025))
46+
- [Snapshot Interop] Add Changes in Create Snapshot Flow for remote store interoperability. ([#8071](https://github.com/opensearch-project/OpenSearch/pull/8071))
4647

4748
### Deprecated
4849

server/src/internalClusterTest/java/org/opensearch/cluster/ClusterStateDiffIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -776,7 +776,8 @@ public ClusterState.Custom randomCreate(String name) {
776776
ImmutableOpenMap.of(),
777777
null,
778778
SnapshotInfoTests.randomUserMetadata(),
779-
randomVersion(random())
779+
randomVersion(random()),
780+
false
780781
)
781782
)
782783
);

server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java

Lines changed: 44 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,8 @@ public static Entry startedEntry(
123123
long repositoryStateId,
124124
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards,
125125
Map<String, Object> userMetadata,
126-
Version version
126+
Version version,
127+
boolean remoteStoreIndexShallowCopy
127128
) {
128129
return new SnapshotsInProgress.Entry(
129130
snapshot,
@@ -137,7 +138,8 @@ public static Entry startedEntry(
137138
shards,
138139
null,
139140
userMetadata,
140-
version
141+
version,
142+
remoteStoreIndexShallowCopy
141143
);
142144
}
143145

@@ -174,7 +176,8 @@ public static Entry startClone(
174176
Collections.emptyMap(),
175177
version,
176178
source,
177-
ImmutableOpenMap.of()
179+
ImmutableOpenMap.of(),
180+
false // TODO: need to pull this value from the original snapshot, use whatever we set during snapshot create.
178181
);
179182
}
180183

@@ -187,6 +190,7 @@ public static class Entry implements Writeable, ToXContent, RepositoryOperation
187190
private final State state;
188191
private final Snapshot snapshot;
189192
private final boolean includeGlobalState;
193+
private final boolean remoteStoreIndexShallowCopy;
190194
private final boolean partial;
191195
/**
192196
* Map of {@link ShardId} to {@link ShardSnapshotStatus} tracking the state of each shard snapshot operation.
@@ -229,7 +233,8 @@ public Entry(
229233
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards,
230234
String failure,
231235
Map<String, Object> userMetadata,
232-
Version version
236+
Version version,
237+
boolean remoteStoreIndexShallowCopy
233238
) {
234239
this(
235240
snapshot,
@@ -245,7 +250,8 @@ public Entry(
245250
userMetadata,
246251
version,
247252
null,
248-
ImmutableOpenMap.of()
253+
ImmutableOpenMap.of(),
254+
remoteStoreIndexShallowCopy
249255
);
250256
}
251257

@@ -263,7 +269,8 @@ private Entry(
263269
Map<String, Object> userMetadata,
264270
Version version,
265271
@Nullable SnapshotId source,
266-
@Nullable ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> clones
272+
@Nullable ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> clones,
273+
boolean remoteStoreIndexShallowCopy
267274
) {
268275
this.state = state;
269276
this.snapshot = snapshot;
@@ -284,6 +291,7 @@ private Entry(
284291
} else {
285292
this.clones = clones;
286293
}
294+
this.remoteStoreIndexShallowCopy = remoteStoreIndexShallowCopy;
287295
assert assertShardsConsistent(this.source, this.state, this.indices, this.shards, this.clones);
288296
}
289297

@@ -324,6 +332,11 @@ private Entry(StreamInput in) throws IOException {
324332
source = null;
325333
clones = ImmutableOpenMap.of();
326334
}
335+
if (in.getVersion().onOrAfter(Version.V_2_9_0)) {
336+
remoteStoreIndexShallowCopy = in.readBoolean();
337+
} else {
338+
remoteStoreIndexShallowCopy = false;
339+
}
327340
}
328341

329342
private static boolean assertShardsConsistent(
@@ -378,7 +391,8 @@ public Entry(
378391
long repositoryStateId,
379392
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards,
380393
Map<String, Object> userMetadata,
381-
Version version
394+
Version version,
395+
boolean remoteStoreIndexShallowCopy
382396
) {
383397
this(
384398
snapshot,
@@ -392,7 +406,8 @@ public Entry(
392406
shards,
393407
null,
394408
userMetadata,
395-
version
409+
version,
410+
remoteStoreIndexShallowCopy
396411
);
397412
}
398413

@@ -417,7 +432,8 @@ public Entry(
417432
shards,
418433
failure,
419434
entry.userMetadata,
420-
version
435+
version,
436+
entry.remoteStoreIndexShallowCopy
421437
);
422438
}
423439

@@ -441,7 +457,8 @@ public Entry withRepoGen(long newRepoGen) {
441457
userMetadata,
442458
version,
443459
source,
444-
clones
460+
clones,
461+
remoteStoreIndexShallowCopy
445462
);
446463
}
447464

@@ -463,7 +480,8 @@ public Entry withClones(ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus>
463480
userMetadata,
464481
version,
465482
source,
466-
updatedClones
483+
updatedClones,
484+
remoteStoreIndexShallowCopy
467485
);
468486
}
469487

@@ -518,7 +536,8 @@ public Entry fail(ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, State s
518536
userMetadata,
519537
version,
520538
source,
521-
clones
539+
clones,
540+
remoteStoreIndexShallowCopy
522541
);
523542
}
524543

@@ -544,7 +563,8 @@ public Entry withShardStates(ImmutableOpenMap<ShardId, ShardSnapshotStatus> shar
544563
shards,
545564
failure,
546565
userMetadata,
547-
version
566+
version,
567+
remoteStoreIndexShallowCopy
548568
);
549569
}
550570
return withStartedShards(shards);
@@ -567,7 +587,8 @@ public Entry withStartedShards(ImmutableOpenMap<ShardId, ShardSnapshotStatus> sh
567587
shards,
568588
failure,
569589
userMetadata,
570-
version
590+
version,
591+
remoteStoreIndexShallowCopy
571592
);
572593
assert updated.state().completed() == false && completed(updated.shards().values()) == false
573594
: "Only running snapshots allowed but saw [" + updated + "]";
@@ -599,6 +620,10 @@ public boolean includeGlobalState() {
599620
return includeGlobalState;
600621
}
601622

623+
public boolean remoteStoreIndexShallowCopy() {
624+
return remoteStoreIndexShallowCopy;
625+
}
626+
602627
public Map<String, Object> userMetadata() {
603628
return userMetadata;
604629
}
@@ -662,7 +687,7 @@ public boolean equals(Object o) {
662687
if (version.equals(entry.version) == false) return false;
663688
if (Objects.equals(source, ((Entry) o).source) == false) return false;
664689
if (clones.equals(((Entry) o).clones) == false) return false;
665-
690+
if (remoteStoreIndexShallowCopy != entry.remoteStoreIndexShallowCopy) return false;
666691
return true;
667692
}
668693

@@ -679,6 +704,7 @@ public int hashCode() {
679704
result = 31 * result + version.hashCode();
680705
result = 31 * result + (source == null ? 0 : source.hashCode());
681706
result = 31 * result + clones.hashCode();
707+
result = 31 * result + (remoteStoreIndexShallowCopy ? 1 : 0);
682708
return result;
683709
}
684710

@@ -752,6 +778,9 @@ public void writeTo(StreamOutput out) throws IOException {
752778
out.writeOptionalWriteable(source);
753779
out.writeMap(clones);
754780
}
781+
if (out.getVersion().onOrAfter(Version.V_2_9_0)) {
782+
out.writeBoolean(remoteStoreIndexShallowCopy);
783+
}
755784
}
756785

757786
@Override

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1486,6 +1486,46 @@ public GatedCloseable<IndexCommit> acquireLastIndexCommit(boolean flushFirst) th
14861486
}
14871487
}
14881488

1489+
public GatedCloseable<IndexCommit> acquireLastIndexCommitAndRefresh(boolean flushFirst) throws EngineException {
1490+
GatedCloseable<IndexCommit> indexCommit = acquireLastIndexCommit(flushFirst);
1491+
getEngine().refresh("Snapshot for Remote Store based Shard");
1492+
return indexCommit;
1493+
}
1494+
1495+
/**
1496+
*
1497+
* @param snapshotId Snapshot UUID.
1498+
* @param primaryTerm current primary term.
1499+
* @param generation Snapshot Commit Generation.
1500+
* @throws IOException if there is some failure in acquiring lock in remote store.
1501+
*/
1502+
public void acquireLockOnCommitData(String snapshotId, long primaryTerm, long generation) throws IOException {
1503+
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = getRemoteSegmentDirectoryForShard();
1504+
remoteSegmentStoreDirectory.acquireLock(primaryTerm, generation, snapshotId);
1505+
}
1506+
1507+
/**
1508+
*
1509+
* @param snapshotId Snapshot UUID.
1510+
* @param primaryTerm current primary term.
1511+
* @param generation Snapshot Commit Generation.
1512+
* @throws IOException if there is some failure in releasing lock in remote store.
1513+
*/
1514+
public void releaseLockOnCommitData(String snapshotId, long primaryTerm, long generation) throws IOException {
1515+
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = getRemoteSegmentDirectoryForShard();
1516+
remoteSegmentStoreDirectory.releaseLock(primaryTerm, generation, snapshotId);
1517+
}
1518+
1519+
private RemoteSegmentStoreDirectory getRemoteSegmentDirectoryForShard() {
1520+
FilterDirectory remoteStoreDirectory = (FilterDirectory) remoteStore.directory();
1521+
assert remoteStoreDirectory.getDelegate() instanceof FilterDirectory
1522+
: "Store.directory is not enclosing an instance of FilterDirectory";
1523+
FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate();
1524+
final Directory remoteDirectory = byteSizeCachingStoreDirectory.getDelegate();
1525+
assert remoteDirectory instanceof RemoteSegmentStoreDirectory : "remoteDirectory is not an instance of RemoteSegmentStoreDirectory";
1526+
return ((RemoteSegmentStoreDirectory) remoteDirectory);
1527+
}
1528+
14891529
public Optional<NRTReplicationEngine> getReplicationEngine() {
14901530
if (getEngine() instanceof NRTReplicationEngine) {
14911531
return Optional.of((NRTReplicationEngine) getEngine());

0 commit comments

Comments
 (0)