Skip to content

Commit 479ba28

Browse files
author
Harish Bhakuni
committed
[Snapshot Interop] Add Changes in Create Snapshot Flow for remote store interoperability.
Signed-off-by: Harish Bhakuni <hbhakuni@amazon.com>
1 parent 96f2ffa commit 479ba28

File tree

26 files changed

+820
-93
lines changed

26 files changed

+820
-93
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
109109
### Changed
110110
- Enable `./gradlew build` on MacOS by disabling bcw tests ([#7303](https://github.com/opensearch-project/OpenSearch/pull/7303))
111111
- Moved concurrent-search from sandbox plugin to server module behind feature flag ([#7203](https://github.com/opensearch-project/OpenSearch/pull/7203))
112+
- [Snapshot Interop] Add Changes in Create Snapshot Flow for remote store interoperability. ([#7118](https://github.com/opensearch-project/OpenSearch/pull/7118))
112113

113114
### Deprecated
114115

@@ -120,4 +121,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
120121
### Security
121122

122123
[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
123-
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.7...2.x
124+
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.7...2.x

server/src/internalClusterTest/java/org/opensearch/cluster/ClusterStateDiffIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -776,7 +776,8 @@ public ClusterState.Custom randomCreate(String name) {
776776
Map.of(),
777777
null,
778778
SnapshotInfoTests.randomUserMetadata(),
779-
randomVersion(random())
779+
randomVersion(random()),
780+
false
780781
)
781782
)
782783
);

server/src/main/java/org/opensearch/action/admin/cluster/snapshots/create/CreateSnapshotRequest.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434

3535
import org.opensearch.OpenSearchException;
3636
import org.opensearch.OpenSearchGenerationException;
37+
import org.opensearch.Version;
3738
import org.opensearch.action.ActionRequestValidationException;
3839
import org.opensearch.action.IndicesRequest;
3940
import org.opensearch.action.support.IndicesOptions;
@@ -101,6 +102,10 @@ public class CreateSnapshotRequest extends ClusterManagerNodeRequest<CreateSnaps
101102

102103
private Map<String, Object> userMetadata;
103104

105+
private Boolean remoteStoreIndexShallowCopy;
106+
107+
private static final String REMOTE_STORE_INDEX_SHALLOW_COPY = "remote_store_index_shallow_copy";
108+
104109
public CreateSnapshotRequest() {}
105110

106111
/**
@@ -125,6 +130,9 @@ public CreateSnapshotRequest(StreamInput in) throws IOException {
125130
waitForCompletion = in.readBoolean();
126131
partial = in.readBoolean();
127132
userMetadata = in.readMap();
133+
if (in.getVersion().onOrAfter(Version.V_2_8_0)) {
134+
remoteStoreIndexShallowCopy = in.readOptionalBoolean();
135+
}
128136
}
129137

130138
@Override
@@ -139,6 +147,9 @@ public void writeTo(StreamOutput out) throws IOException {
139147
out.writeBoolean(waitForCompletion);
140148
out.writeBoolean(partial);
141149
out.writeMap(userMetadata);
150+
if (out.getVersion().onOrAfter(Version.V_2_8_0)) {
151+
out.writeOptionalBoolean(remoteStoreIndexShallowCopy);
152+
}
142153
}
143154

144155
@Override
@@ -328,6 +339,11 @@ public CreateSnapshotRequest waitForCompletion(boolean waitForCompletion) {
328339
return this;
329340
}
330341

342+
public CreateSnapshotRequest remoteStoreIndexShallowCopy(boolean remoteStoreIndexShallowCopy) {
343+
this.remoteStoreIndexShallowCopy = remoteStoreIndexShallowCopy;
344+
return this;
345+
}
346+
331347
/**
332348
* Returns true if the request should wait for the snapshot completion before returning
333349
*
@@ -429,6 +445,10 @@ public Map<String, Object> userMetadata() {
429445
return userMetadata;
430446
}
431447

448+
public Boolean remoteStoreIndexShallowCopy() {
449+
return remoteStoreIndexShallowCopy;
450+
}
451+
432452
public CreateSnapshotRequest userMetadata(Map<String, Object> userMetadata) {
433453
this.userMetadata = userMetadata;
434454
return this;
@@ -466,6 +486,8 @@ public CreateSnapshotRequest source(Map<String, Object> source) {
466486
throw new IllegalArgumentException("malformed metadata, should be an object");
467487
}
468488
userMetadata((Map<String, Object>) entry.getValue());
489+
} else if (name.equals(REMOTE_STORE_INDEX_SHALLOW_COPY)) {
490+
remoteStoreIndexShallowCopy = nodeBooleanValue(entry.getValue(), REMOTE_STORE_INDEX_SHALLOW_COPY);
469491
}
470492
}
471493
indicesOptions(IndicesOptions.fromMap(source, indicesOptions));
@@ -495,6 +517,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
495517
indicesOptions.toXContent(builder, params);
496518
}
497519
builder.field("metadata", userMetadata);
520+
builder.field(REMOTE_STORE_INDEX_SHALLOW_COPY, remoteStoreIndexShallowCopy);
498521
builder.endObject();
499522
return builder;
500523
}
@@ -518,7 +541,8 @@ public boolean equals(Object o) {
518541
&& Objects.equals(indicesOptions, that.indicesOptions)
519542
&& Objects.equals(settings, that.settings)
520543
&& Objects.equals(clusterManagerNodeTimeout, that.clusterManagerNodeTimeout)
521-
&& Objects.equals(userMetadata, that.userMetadata);
544+
&& Objects.equals(userMetadata, that.userMetadata)
545+
&& Objects.equals(remoteStoreIndexShallowCopy, that.remoteStoreIndexShallowCopy);
522546
}
523547

524548
@Override
@@ -562,6 +586,8 @@ public String toString() {
562586
+ clusterManagerNodeTimeout
563587
+ ", metadata="
564588
+ userMetadata
589+
+ ", remoteStoreIndexShallowCopy="
590+
+ remoteStoreIndexShallowCopy
565591
+ '}';
566592
}
567593
}

server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java

Lines changed: 44 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,8 @@ public static Entry startedEntry(
113113
long repositoryStateId,
114114
final Map<ShardId, ShardSnapshotStatus> shards,
115115
Map<String, Object> userMetadata,
116-
Version version
116+
Version version,
117+
boolean remoteStoreIndexShallowCopy
117118
) {
118119
return new SnapshotsInProgress.Entry(
119120
snapshot,
@@ -127,7 +128,8 @@ public static Entry startedEntry(
127128
shards,
128129
null,
129130
userMetadata,
130-
version
131+
version,
132+
remoteStoreIndexShallowCopy
131133
);
132134
}
133135

@@ -164,7 +166,8 @@ public static Entry startClone(
164166
Collections.emptyMap(),
165167
version,
166168
source,
167-
Map.of()
169+
Map.of(),
170+
false // TODO: need to pull this value from the original snapshot, use whatever we set during snapshot create.
168171
);
169172
}
170173

@@ -177,6 +180,7 @@ public static class Entry implements Writeable, ToXContent, RepositoryOperation
177180
private final State state;
178181
private final Snapshot snapshot;
179182
private final boolean includeGlobalState;
183+
private final boolean remoteStoreIndexShallowCopy;
180184
private final boolean partial;
181185
/**
182186
* Map of {@link ShardId} to {@link ShardSnapshotStatus} tracking the state of each shard snapshot operation.
@@ -219,7 +223,8 @@ public Entry(
219223
final Map<ShardId, ShardSnapshotStatus> shards,
220224
String failure,
221225
Map<String, Object> userMetadata,
222-
Version version
226+
Version version,
227+
boolean remoteStoreIndexShallowCopy
223228
) {
224229
this(
225230
snapshot,
@@ -235,7 +240,8 @@ public Entry(
235240
userMetadata,
236241
version,
237242
null,
238-
Map.of()
243+
Map.of(),
244+
remoteStoreIndexShallowCopy
239245
);
240246
}
241247

@@ -253,7 +259,8 @@ private Entry(
253259
final Map<String, Object> userMetadata,
254260
Version version,
255261
@Nullable SnapshotId source,
256-
@Nullable final Map<RepositoryShardId, ShardSnapshotStatus> clones
262+
@Nullable final Map<RepositoryShardId, ShardSnapshotStatus> clones,
263+
boolean remoteStoreIndexShallowCopy
257264
) {
258265
this.state = state;
259266
this.snapshot = snapshot;
@@ -274,6 +281,7 @@ private Entry(
274281
} else {
275282
this.clones = Collections.unmodifiableMap(clones);
276283
}
284+
this.remoteStoreIndexShallowCopy = remoteStoreIndexShallowCopy;
277285
assert assertShardsConsistent(this.source, this.state, this.indices, this.shards, this.clones);
278286
}
279287

@@ -292,6 +300,11 @@ private Entry(StreamInput in) throws IOException {
292300
dataStreams = in.readStringList();
293301
source = in.readOptionalWriteable(SnapshotId::new);
294302
clones = in.readMap(RepositoryShardId::new, ShardSnapshotStatus::readFrom);
303+
if (in.getVersion().onOrAfter(Version.V_2_8_0)) {
304+
remoteStoreIndexShallowCopy = in.readBoolean();
305+
} else {
306+
remoteStoreIndexShallowCopy = false;
307+
}
295308
}
296309

297310
private static boolean assertShardsConsistent(
@@ -346,7 +359,8 @@ public Entry(
346359
long repositoryStateId,
347360
final Map<ShardId, ShardSnapshotStatus> shards,
348361
Map<String, Object> userMetadata,
349-
Version version
362+
Version version,
363+
boolean remoteStoreIndexShallowCopy
350364
) {
351365
this(
352366
snapshot,
@@ -360,7 +374,8 @@ public Entry(
360374
shards,
361375
null,
362376
userMetadata,
363-
version
377+
version,
378+
remoteStoreIndexShallowCopy
364379
);
365380
}
366381

@@ -385,7 +400,8 @@ public Entry(
385400
shards,
386401
failure,
387402
entry.userMetadata,
388-
version
403+
version,
404+
entry.remoteStoreIndexShallowCopy
389405
);
390406
}
391407

@@ -409,7 +425,8 @@ public Entry withRepoGen(long newRepoGen) {
409425
userMetadata,
410426
version,
411427
source,
412-
clones
428+
clones,
429+
remoteStoreIndexShallowCopy
413430
);
414431
}
415432

@@ -431,7 +448,8 @@ public Entry withClones(final Map<RepositoryShardId, ShardSnapshotStatus> update
431448
userMetadata,
432449
version,
433450
source,
434-
updatedClones
451+
updatedClones,
452+
remoteStoreIndexShallowCopy
435453
);
436454
}
437455

@@ -486,7 +504,8 @@ public Entry fail(final Map<ShardId, ShardSnapshotStatus> shards, State state, S
486504
userMetadata,
487505
version,
488506
source,
489-
clones
507+
clones,
508+
remoteStoreIndexShallowCopy
490509
);
491510
}
492511

@@ -512,7 +531,8 @@ public Entry withShardStates(final Map<ShardId, ShardSnapshotStatus> shards) {
512531
shards,
513532
failure,
514533
userMetadata,
515-
version
534+
version,
535+
remoteStoreIndexShallowCopy
516536
);
517537
}
518538
return withStartedShards(shards);
@@ -535,7 +555,8 @@ public Entry withStartedShards(final Map<ShardId, ShardSnapshotStatus> shards) {
535555
shards,
536556
failure,
537557
userMetadata,
538-
version
558+
version,
559+
remoteStoreIndexShallowCopy
539560
);
540561
assert updated.state().completed() == false && completed(updated.shards().values()) == false
541562
: "Only running snapshots allowed but saw [" + updated + "]";
@@ -567,6 +588,10 @@ public boolean includeGlobalState() {
567588
return includeGlobalState;
568589
}
569590

591+
public boolean remoteStoreIndexShallowCopy() {
592+
return remoteStoreIndexShallowCopy;
593+
}
594+
570595
public Map<String, Object> userMetadata() {
571596
return userMetadata;
572597
}
@@ -630,7 +655,7 @@ public boolean equals(Object o) {
630655
if (version.equals(entry.version) == false) return false;
631656
if (Objects.equals(source, ((Entry) o).source) == false) return false;
632657
if (clones.equals(((Entry) o).clones) == false) return false;
633-
658+
if (remoteStoreIndexShallowCopy != entry.remoteStoreIndexShallowCopy) return false;
634659
return true;
635660
}
636661

@@ -647,6 +672,7 @@ public int hashCode() {
647672
result = 31 * result + version.hashCode();
648673
result = 31 * result + (source == null ? 0 : source.hashCode());
649674
result = 31 * result + clones.hashCode();
675+
result = 31 * result + (remoteStoreIndexShallowCopy ? 1 : 0);
650676
return result;
651677
}
652678

@@ -710,6 +736,9 @@ public void writeTo(StreamOutput out) throws IOException {
710736
out.writeStringCollection(dataStreams);
711737
out.writeOptionalWriteable(source);
712738
out.writeMap(clones, (o, v) -> v.writeTo(o), (o, v) -> v.writeTo(o));
739+
if (out.getVersion().onOrAfter(Version.V_2_8_0)) {
740+
out.writeBoolean(remoteStoreIndexShallowCopy);
741+
}
713742
}
714743

715744
@Override

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1473,6 +1473,12 @@ public GatedCloseable<IndexCommit> acquireLastIndexCommit(boolean flushFirst) th
14731473
}
14741474
}
14751475

1476+
public GatedCloseable<IndexCommit> acquireLastIndexCommitAndRefresh(boolean flushFirst) throws EngineException {
1477+
GatedCloseable<IndexCommit> indexCommit = acquireLastIndexCommit(flushFirst);
1478+
getEngine().refresh("Snapshot for Remote Store based Shard");
1479+
return indexCommit;
1480+
}
1481+
14761482
public Optional<NRTReplicationEngine> getReplicationEngine() {
14771483
if (getEngine() instanceof NRTReplicationEngine) {
14781484
return Optional.of((NRTReplicationEngine) getEngine());

0 commit comments

Comments
 (0)