Skip to content

Commit 7438886

Browse files
[Segment Replication] Allow Cluster Setting to enable default Replication type (#7420) (#7776)
* Add cluster setting to enable default replication type at cluster level. * update opensearch.yml file * Fix failing unit test. * Fix failing tests. * Apply spotlesscheck. * Address comments on PR. * Fix Failing Tests. * Fix test index name * add logic for system indices to always use document replication. * remove unnecessary logic. * fix failing tests. * fix failing tests * Refactor and add additional assert for segRepEnabled. * fix failing tests. * refactor code. * Fix failing tests. * Fix failing tests. * Fix failing tests. * remove unnecessary code logic and test * Refactor back to having boolean value of system index, * Add unit test for system index check and some refactor. * add changelog entry. --------- (cherry picked from commit a5ffddc) Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> Signed-off-by: Rishikesh Pasham <62345295+Rishikesh1159@users.noreply.github.com> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent af0eba4 commit 7438886

File tree

8 files changed

+310
-81
lines changed

8 files changed

+310
-81
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2525
- Added @dbwiddis as on OpenSearch maintainer ([#7665](https://github.com/opensearch-project/OpenSearch/pull/7665))
2626
- SegRep with Remote: Add hook for publishing checkpoint notifications after segment upload to remote store ([#7394](https://github.com/opensearch-project/OpenSearch/pull/7394))
2727
- [Extensions] Add ExtensionAwarePlugin extension point to add custom settings for extensions ([#7526](https://github.com/opensearch-project/OpenSearch/pull/7526))
28+
- Add new cluster setting to set default index replication type ([#7420](https://github.com/opensearch-project/OpenSearch/pull/7420))
2829

2930
### Dependencies
3031
- Bump `com.netflix.nebula:gradle-info-plugin` from 12.0.0 to 12.1.3 (#7564)

distribution/src/config/opensearch.yml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,10 +102,9 @@ ${path.logs}
102102
#
103103
# ---------------------------------- Experimental Features -----------------------------------
104104
#
105-
# Gates the visibility of the index setting that allows changing of replication type.
106-
# Once the feature is ready for production release, this feature flag can be removed.
105+
# Gates the visibility of the experimental segment replication features until they are production ready.
107106
#
108-
#opensearch.experimental.feature.replication_type.enabled: false
107+
#opensearch.experimental.feature.segment_replication_experimental.enabled: false
109108
#
110109
#
111110
# Gates the visibility of the index setting that allows persisting data to remote store along with local disk.

server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationClusterSettingIT.java

Lines changed: 98 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,14 @@
88

99
package org.opensearch.indices.replication;
1010

11-
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
11+
import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest;
12+
import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse;
1213
import org.opensearch.cluster.metadata.IndexMetadata;
1314
import org.opensearch.common.settings.Settings;
1415
import org.opensearch.common.util.FeatureFlags;
16+
import org.opensearch.index.Index;
1517
import org.opensearch.index.IndexModule;
18+
import org.opensearch.indices.IndicesService;
1619
import org.opensearch.indices.SystemIndexDescriptor;
1720
import org.opensearch.indices.replication.common.ReplicationType;
1821
import org.opensearch.plugins.Plugin;
@@ -24,8 +27,8 @@
2427
import java.util.Collections;
2528
import java.util.Arrays;
2629

30+
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
2731
import static org.opensearch.indices.IndicesService.CLUSTER_SETTING_REPLICATION_TYPE;
28-
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
2932

3033
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
3134
public class SegmentReplicationClusterSettingIT extends OpenSearchIntegTestCase {
@@ -77,71 +80,119 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
7780
return Arrays.asList(SegmentReplicationClusterSettingIT.TestPlugin.class, MockTransportService.TestPlugin.class);
7881
}
7982

80-
public void testReplicationWithSegmentReplicationClusterSetting() throws Exception {
81-
82-
boolean isSystemIndex = randomBoolean();
83-
String indexName = isSystemIndex ? SYSTEM_INDEX_NAME : INDEX_NAME;
83+
public void testSystemIndexWithSegmentReplicationClusterSetting() throws Exception {
8484

8585
// Starting two nodes with primary and replica shards respectively.
8686
final String primaryNode = internalCluster().startNode();
87-
createIndex(indexName);
88-
ensureYellowAndNoInitializingShards(indexName);
87+
createIndex(SYSTEM_INDEX_NAME);
88+
ensureYellowAndNoInitializingShards(SYSTEM_INDEX_NAME);
8989
final String replicaNode = internalCluster().startNode();
90-
ensureGreen(indexName);
91-
92-
final int initialDocCount = scaledRandomIntBetween(20, 30);
93-
for (int i = 0; i < initialDocCount; i++) {
94-
client().prepareIndex(indexName).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
95-
}
96-
97-
refresh(indexName);
98-
assertBusy(() -> {
99-
assertHitCount(client(replicaNode).prepareSearch(indexName).setSize(0).setPreference("_only_local").get(), initialDocCount);
100-
});
101-
102-
SegmentReplicationStatsResponse segmentReplicationStatsResponse = client().admin()
90+
ensureGreen(SYSTEM_INDEX_NAME);
91+
final GetSettingsResponse response = client().admin()
10392
.indices()
104-
.prepareSegmentReplicationStats(indexName)
105-
.execute()
93+
.getSettings(new GetSettingsRequest().indices(SYSTEM_INDEX_NAME).includeDefaults(true))
10694
.actionGet();
107-
if (isSystemIndex) {
108-
// Verify that Segment Replication did not happen on the replica shard.
109-
assertNull(segmentReplicationStatsResponse.getReplicationStats().get(indexName));
110-
} else {
111-
// Verify that Segment Replication happened on the replica shard.
112-
assertFalse(segmentReplicationStatsResponse.getReplicationStats().get(indexName).get(0).getReplicaStats().isEmpty());
113-
}
95+
assertEquals(response.getSetting(SYSTEM_INDEX_NAME, SETTING_REPLICATION_TYPE), ReplicationType.DOCUMENT.toString());
96+
97+
// Verify index setting isSegRepEnabled is false.
98+
Index index = resolveIndex(SYSTEM_INDEX_NAME);
99+
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, primaryNode);
100+
assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabled(), false);
114101
}
115102

116-
public void testIndexReplicationSettingOverridesClusterSetting() throws Exception {
103+
public void testIndexReplicationSettingOverridesSegRepClusterSetting() throws Exception {
104+
Settings settings = Settings.builder().put(CLUSTER_SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build();
105+
final String ANOTHER_INDEX = "test-index";
106+
117107
// Starting two nodes with primary and replica shards respectively.
118-
final String primaryNode = internalCluster().startNode();
108+
final String primaryNode = internalCluster().startNode(settings);
119109
prepareCreate(
120110
INDEX_NAME,
121111
Settings.builder()
122112
// we want to override cluster replication setting by passing a index replication setting
123113
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT)
124114
).get();
125-
ensureYellowAndNoInitializingShards(INDEX_NAME);
126-
final String replicaNode = internalCluster().startNode();
127-
ensureGreen(INDEX_NAME);
115+
createIndex(ANOTHER_INDEX);
116+
ensureYellowAndNoInitializingShards(INDEX_NAME, ANOTHER_INDEX);
117+
final String replicaNode = internalCluster().startNode(settings);
118+
119+
// Randomly close and open index.
120+
if (randomBoolean()) {
121+
logger.info("--> Closing the index ");
122+
client().admin().indices().prepareClose(INDEX_NAME).get();
128123

129-
final int initialDocCount = scaledRandomIntBetween(20, 30);
130-
for (int i = 0; i < initialDocCount; i++) {
131-
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
124+
logger.info("--> Opening the index");
125+
client().admin().indices().prepareOpen(INDEX_NAME).get();
132126
}
127+
ensureGreen(INDEX_NAME, ANOTHER_INDEX);
133128

134-
refresh(INDEX_NAME);
135-
assertBusy(() -> {
136-
assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
137-
});
129+
final GetSettingsResponse response = client().admin()
130+
.indices()
131+
.getSettings(new GetSettingsRequest().indices(INDEX_NAME, ANOTHER_INDEX).includeDefaults(true))
132+
.actionGet();
133+
assertEquals(response.getSetting(INDEX_NAME, SETTING_REPLICATION_TYPE), ReplicationType.DOCUMENT.toString());
134+
assertEquals(response.getSetting(ANOTHER_INDEX, SETTING_REPLICATION_TYPE), ReplicationType.SEGMENT.toString());
135+
136+
// Verify index setting isSegRepEnabled.
137+
Index index = resolveIndex(INDEX_NAME);
138+
Index anotherIndex = resolveIndex(ANOTHER_INDEX);
139+
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, primaryNode);
140+
assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabled(), false);
141+
assertEquals(indicesService.indexService(anotherIndex).getIndexSettings().isSegRepEnabled(), true);
142+
}
138143

139-
SegmentReplicationStatsResponse segmentReplicationStatsResponse = client().admin()
144+
public void testIndexReplicationSettingOverridesDocRepClusterSetting() throws Exception {
145+
Settings settings = Settings.builder().put(CLUSTER_SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT).build();
146+
final String ANOTHER_INDEX = "test-index";
147+
final String primaryNode = internalCluster().startNode(settings);
148+
prepareCreate(
149+
INDEX_NAME,
150+
Settings.builder()
151+
// we want to override cluster replication setting by passing a index replication setting
152+
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
153+
).get();
154+
createIndex(ANOTHER_INDEX);
155+
ensureYellowAndNoInitializingShards(INDEX_NAME, ANOTHER_INDEX);
156+
final String replicaNode = internalCluster().startNode(settings);
157+
ensureGreen(INDEX_NAME, ANOTHER_INDEX);
158+
159+
final GetSettingsResponse response = client().admin()
140160
.indices()
141-
.prepareSegmentReplicationStats(INDEX_NAME)
142-
.execute()
161+
.getSettings(new GetSettingsRequest().indices(INDEX_NAME, ANOTHER_INDEX).includeDefaults(true))
143162
.actionGet();
144-
// Verify that Segment Replication did not happen on the replica shard.
145-
assertNull(segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME));
163+
assertEquals(response.getSetting(INDEX_NAME, SETTING_REPLICATION_TYPE), ReplicationType.SEGMENT.toString());
164+
assertEquals(response.getSetting(ANOTHER_INDEX, SETTING_REPLICATION_TYPE), ReplicationType.DOCUMENT.toString());
165+
166+
// Verify index setting isSegRepEnabled.
167+
Index index = resolveIndex(INDEX_NAME);
168+
Index anotherIndex = resolveIndex(ANOTHER_INDEX);
169+
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, primaryNode);
170+
assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabled(), true);
171+
assertEquals(indicesService.indexService(anotherIndex).getIndexSettings().isSegRepEnabled(), false);
146172
}
173+
174+
public void testHiddenIndicesWithReplicationStrategyClusterSetting() throws Exception {
175+
final String primaryNode = internalCluster().startNode();
176+
final String replicaNode = internalCluster().startNode();
177+
prepareCreate(
178+
INDEX_NAME,
179+
Settings.builder()
180+
// we want to set index as hidden
181+
.put("index.hidden", true)
182+
).get();
183+
ensureGreen(INDEX_NAME);
184+
185+
// Verify that document replication strategy is used for hidden indices.
186+
final GetSettingsResponse response = client().admin()
187+
.indices()
188+
.getSettings(new GetSettingsRequest().indices(INDEX_NAME).includeDefaults(true))
189+
.actionGet();
190+
assertEquals(response.getSetting(INDEX_NAME, SETTING_REPLICATION_TYPE), ReplicationType.DOCUMENT.toString());
191+
192+
// Verify index setting isSegRepEnabled.
193+
Index index = resolveIndex(INDEX_NAME);
194+
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, primaryNode);
195+
assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabled(), false);
196+
}
197+
147198
}

server/src/internalClusterTest/java/org/opensearch/remotestore/CreateRemoteIndexIT.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_STORE_ENABLED_SETTING;
3232
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_TRANSLOG_STORE_ENABLED_SETTING;
3333
import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
34+
import static org.opensearch.indices.IndicesService.CLUSTER_SETTING_REPLICATION_TYPE;
3435
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
3536

3637
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST)
@@ -120,7 +121,15 @@ public void testRemoteStoreDisabledByUser() throws Exception {
120121
.getIndex(new GetIndexRequest().indices("test-idx-1").includeDefaults(true))
121122
.get();
122123
Settings indexSettings = getIndexResponse.settings().get("test-idx-1");
123-
verifyRemoteStoreIndexSettings(indexSettings, "false", null, null, null, null, null);
124+
verifyRemoteStoreIndexSettings(
125+
indexSettings,
126+
"false",
127+
null,
128+
null,
129+
null,
130+
client().settings().get(CLUSTER_SETTING_REPLICATION_TYPE),
131+
null
132+
);
124133
}
125134

126135
public void testRemoteStoreEnabledByUserWithoutRemoteRepoAndSegmentReplicationIllegalArgumentException() throws Exception {

server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@
1717
import org.opensearch.action.search.SearchResponse;
1818
import org.opensearch.cluster.metadata.IndexMetadata;
1919
import org.opensearch.common.settings.Settings;
20+
import org.opensearch.common.util.FeatureFlags;
21+
import org.opensearch.index.Index;
2022
import org.opensearch.index.query.QueryBuilders;
23+
import org.opensearch.indices.IndicesService;
2124
import org.opensearch.indices.replication.common.ReplicationType;
2225
import org.opensearch.rest.RestStatus;
2326
import org.opensearch.test.InternalTestCluster;
@@ -28,6 +31,8 @@
2831
import java.util.List;
2932
import java.util.concurrent.TimeUnit;
3033

34+
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
35+
import static org.opensearch.indices.IndicesService.CLUSTER_SETTING_REPLICATION_TYPE;
3136
import static org.opensearch.indices.replication.SegmentReplicationBaseIT.waitForSearchableDocs;
3237
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
3338
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
@@ -261,4 +266,45 @@ public void testRestoreOnReplicaNode() throws Exception {
261266
SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get();
262267
assertHitCount(resp, DOC_COUNT);
263268
}
269+
270+
public void testSnapshotRestoreOnIndexWithSegRepClusterSetting() throws Exception {
271+
Settings settings = Settings.builder()
272+
.put(super.featureFlagSettings())
273+
.put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true")
274+
.put(CLUSTER_SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
275+
.build();
276+
277+
// Starting two nodes with primary and replica shards respectively.
278+
final String primaryNode = internalCluster().startNode(settings);
279+
prepareCreate(
280+
INDEX_NAME,
281+
Settings.builder()
282+
// we want to override cluster replication setting by passing a index replication setting
283+
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT)
284+
).get();
285+
ensureYellowAndNoInitializingShards(INDEX_NAME);
286+
final String replicaNode = internalCluster().startNode(settings);
287+
ensureGreen(INDEX_NAME);
288+
289+
createSnapshot();
290+
// Delete index
291+
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get());
292+
assertFalse("index [" + INDEX_NAME + "] should have been deleted", indexExists(INDEX_NAME));
293+
294+
RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(null);
295+
296+
// Assertions
297+
assertEquals(restoreSnapshotResponse.status(), RestStatus.ACCEPTED);
298+
ensureGreen(RESTORED_INDEX_NAME);
299+
GetSettingsResponse settingsResponse = client().admin()
300+
.indices()
301+
.getSettings(new GetSettingsRequest().indices(RESTORED_INDEX_NAME).includeDefaults(true))
302+
.get();
303+
assertEquals(settingsResponse.getSetting(RESTORED_INDEX_NAME, SETTING_REPLICATION_TYPE), ReplicationType.DOCUMENT.toString());
304+
305+
// Verify index setting isSegRepEnabled.
306+
Index index = resolveIndex(RESTORED_INDEX_NAME);
307+
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, primaryNode);
308+
assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabled(), false);
309+
}
264310
}

0 commit comments

Comments
 (0)