Skip to content

Commit

Permalink
[Segment Replication] Remove Segment Replication REPLICATION_TYPE fea…
Browse files Browse the repository at this point in the history
…ture flag. (opensearch-project#7045)

* Remove segment replication REPLICATION_TYPE feature flag.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Removing FeatureFlags.REPLICATION_TYPE uses.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Add change log entry.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Fix failing WaitUntilRefreshIT.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Modify changelog.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

---------

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>
Signed-off-by: Rishikesh Pasham <62345295+Rishikesh1159@users.noreply.github.com>
  • Loading branch information
Rishikesh1159 authored Apr 17, 2023
1 parent 2f5bf5b commit e7cdca1
Show file tree
Hide file tree
Showing 15 changed files with 23 additions and 88 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Segment Replication] Add point in time and scroll query compatibility. ([#6644](https://github.com/opensearch-project/OpenSearch/pull/6644))
- Add retry delay as dynamic setting for cluster maanger throttling. ([#6998](https://github.com/opensearch-project/OpenSearch/pull/6998))
- Introduce full support for searchable snapshots ([#5087](https://github.com/opensearch-project/OpenSearch/issues/5087))
- Introduce full support for Segment Replication ([#5147](https://github.com/opensearch-project/OpenSearch/issues/5147))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.18.0 to 2.20.0 ([#6490](https://github.com/opensearch-project/OpenSearch/pull/6490))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,7 @@
import org.opensearch.action.support.WriteRequest.RefreshPolicy;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Requests;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.rest.RestStatus;
import org.opensearch.script.MockScriptPlugin;
Expand Down Expand Up @@ -77,11 +74,7 @@ public class WaitUntilRefreshIT extends OpenSearchIntegTestCase {
@Override
public Settings indexSettings() {
// Use a shorter refresh interval to speed up the tests. We'll be waiting on this interval several times.
final Settings.Builder builder = Settings.builder().put(super.indexSettings()).put("index.refresh_interval", "40ms");
if (FeatureFlags.isEnabled(FeatureFlags.REPLICATION_TYPE)) {
builder.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT);
}
return builder.build();
return Settings.builder().put(super.indexSettings()).put("index.refresh_interval", "40ms").build();
}

@Before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.opensearch.common.Nullable;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.Index;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexService;
Expand Down Expand Up @@ -54,11 +53,6 @@ public class SegmentReplicationBaseIT extends OpenSearchIntegTestCase {
protected static final int SHARD_COUNT = 1;
protected static final int REPLICA_COUNT = 1;

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build();
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return asList(MockTransportService.TestPlugin.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,7 @@ public Settings indexSettings() {

@Override
protected Settings featureFlagSettings() {
return Settings.builder()
.put(super.featureFlagSettings())
.put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true")
.put(FeatureFlags.REPLICATION_TYPE, "true")
.build();
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true").build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,13 @@ protected Settings featureFlagSettings() {
return Settings.builder()
.put(super.featureFlagSettings())
.put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true")
.put(FeatureFlags.REPLICATION_TYPE, "true")
.put(FeatureFlags.REMOTE_STORE, "true")
.build();
}

@Before
public void setup() {
FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);
FeatureFlagSetter.set(FeatureFlags.REPLICATION_TYPE);
internalCluster().startClusterManagerOnlyNode();
assertAcked(
clusterAdmin().preparePutRepository("my-segment-repo-1")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,7 @@ protected boolean addMockInternalEngine() {

@Override
protected Settings featureFlagSettings() {
return Settings.builder()
.put(super.featureFlagSettings())
.put(FeatureFlags.REPLICATION_TYPE, "true")
.put(FeatureFlags.REMOTE_STORE, "true")
.build();
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE, "true").build();
}

@Before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.rest.RestStatus;
Expand All @@ -44,11 +43,6 @@ public class SegmentReplicationSnapshotIT extends AbstractSnapshotIntegTestCase
private static final String REPOSITORY_NAME = "test-segrep-repo";
private static final String SNAPSHOT_NAME = "test-segrep-snapshot";

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build();
}

public Settings segRepEnableIndexSettings() {
return getShardSettings().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ public void apply(Settings value, Settings current, Settings previous) {
public static final Map<List<String>, List<Setting>> FEATURE_FLAGGED_CLUSTER_SETTINGS = Map.of(
List.of(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL),
List.of(IndicesService.CLUSTER_REPLICATION_TYPE_SETTING),
List.of(FeatureFlags.REMOTE_STORE, FeatureFlags.REPLICATION_TYPE),
List.of(FeatureFlags.REMOTE_STORE),
List.of(
IndicesService.CLUSTER_REMOTE_STORE_ENABLED_SETTING,
IndicesService.CLUSTER_REMOTE_STORE_REPOSITORY_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ protected FeatureFlagSettings(
public static final Set<Setting<?>> BUILT_IN_FEATURE_FLAGS = Collections.unmodifiableSet(
new HashSet<>(
Arrays.asList(
FeatureFlags.REPLICATION_TYPE_SETTING,
FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL_SETTING,
FeatureFlags.REMOTE_STORE_SETTING,
FeatureFlags.EXTENSIONS_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexMetadata.INDEX_DATA_PATH_SETTING,
IndexMetadata.INDEX_FORMAT_SETTING,
IndexMetadata.INDEX_HIDDEN_SETTING,
IndexMetadata.INDEX_REPLICATION_TYPE_SETTING,
SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_DEBUG_SETTING,
SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_WARN_SETTING,
SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_INFO_SETTING,
Expand Down Expand Up @@ -226,8 +227,6 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
* setting should be moved to {@link #BUILT_IN_INDEX_SETTINGS}.
*/
public static final Map<String, List<Setting>> FEATURE_FLAGGED_INDEX_SETTINGS = Map.of(
FeatureFlags.REPLICATION_TYPE,
List.of(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING),
FeatureFlags.REMOTE_STORE,
List.of(
IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,6 @@
*/
public class FeatureFlags {

/**
* Gates the visibility of the index setting that allows changing of replication type.
* Once the feature is ready for production release, this feature flag can be removed.
*/
public static final String REPLICATION_TYPE = "opensearch.experimental.feature.replication_type.enabled";

/**
* Gates the visibility of the segment replication experimental features that allows users to test unreleased beta features.
*/
Expand Down Expand Up @@ -86,8 +80,6 @@ public static boolean isEnabled(String featureFlagName) {
return settings != null && settings.getAsBoolean(featureFlagName, false);
}

public static final Setting<Boolean> REPLICATION_TYPE_SETTING = Setting.boolSetting(REPLICATION_TYPE, false, Property.NodeScope);

public static final Setting<Boolean> SEGMENT_REPLICATION_EXPERIMENTAL_SETTING = Setting.boolSetting(
SEGMENT_REPLICATION_EXPERIMENTAL,
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.opensearch.common.inject.AbstractModule;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.io.stream.NamedWriteableRegistry.Entry;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.index.mapper.BinaryFieldMapper;
import org.opensearch.index.mapper.BooleanFieldMapper;
Expand Down Expand Up @@ -284,11 +283,7 @@ protected void configure() {
bind(RetentionLeaseSyncAction.class).asEagerSingleton();
bind(RetentionLeaseBackgroundSyncAction.class).asEagerSingleton();
bind(RetentionLeaseSyncer.class).asEagerSingleton();
if (FeatureFlags.isEnabled(FeatureFlags.REPLICATION_TYPE)) {
bind(SegmentReplicationCheckpointPublisher.class).asEagerSingleton();
} else {
bind(SegmentReplicationCheckpointPublisher.class).toInstance(SegmentReplicationCheckpointPublisher.EMPTY);
}
bind(SegmentReplicationCheckpointPublisher.class).asEagerSingleton();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.env.ShardLockObtainFailedException;
Expand Down Expand Up @@ -214,11 +213,8 @@ public IndicesClusterStateService(
final List<IndexEventListener> indexEventListeners = new ArrayList<>(
Arrays.asList(peerRecoverySourceService, recoveryTargetService, searchService, snapshotShardsService)
);
// if segrep feature flag is not enabled, don't wire the target serivce as an IndexEventListener.
if (FeatureFlags.isEnabled(FeatureFlags.REPLICATION_TYPE)) {
indexEventListeners.add(segmentReplicationTargetService);
indexEventListeners.add(segmentReplicationSourceService);
}
indexEventListeners.add(segmentReplicationTargetService);
indexEventListeners.add(segmentReplicationSourceService);
this.segmentReplicationTargetService = segmentReplicationTargetService;
this.builtInIndexListener = Collections.unmodifiableList(indexEventListeners);
this.indicesService = indicesService;
Expand Down
38 changes: 14 additions & 24 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,6 @@
import java.util.stream.Stream;

import static java.util.stream.Collectors.toList;
import static org.opensearch.common.util.FeatureFlags.REPLICATION_TYPE;
import static org.opensearch.common.util.FeatureFlags.SEARCH_PIPELINE;
import static org.opensearch.env.NodeEnvironment.collectFileCacheDataPath;
import static org.opensearch.index.ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED_ATTRIBUTE_KEY;
Expand Down Expand Up @@ -1095,23 +1094,18 @@ protected Node(
.toInstance(new PeerRecoverySourceService(transportService, indicesService, recoverySettings));
b.bind(PeerRecoveryTargetService.class)
.toInstance(new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService));
if (FeatureFlags.isEnabled(REPLICATION_TYPE)) {
b.bind(SegmentReplicationTargetService.class)
.toInstance(
new SegmentReplicationTargetService(
threadPool,
recoverySettings,
transportService,
new SegmentReplicationSourceFactory(transportService, recoverySettings, clusterService),
indicesService
)
);
b.bind(SegmentReplicationSourceService.class)
.toInstance(new SegmentReplicationSourceService(indicesService, transportService, recoverySettings));
} else {
b.bind(SegmentReplicationTargetService.class).toInstance(SegmentReplicationTargetService.NO_OP);
b.bind(SegmentReplicationSourceService.class).toInstance(SegmentReplicationSourceService.NO_OP);
}
b.bind(SegmentReplicationTargetService.class)
.toInstance(
new SegmentReplicationTargetService(
threadPool,
recoverySettings,
transportService,
new SegmentReplicationSourceFactory(transportService, recoverySettings, clusterService),
indicesService
)
);
b.bind(SegmentReplicationSourceService.class)
.toInstance(new SegmentReplicationSourceService(indicesService, transportService, recoverySettings));
}
b.bind(HttpServerTransport.class).toInstance(httpServerTransport);
pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p));
Expand Down Expand Up @@ -1262,9 +1256,7 @@ public Node start() throws NodeValidationException {
assert transportService.getLocalNode().equals(localNodeFactory.getNode())
: "transportService has a different local node than the factory provided";
injector.getInstance(PeerRecoverySourceService.class).start();
if (FeatureFlags.isEnabled(REPLICATION_TYPE)) {
injector.getInstance(SegmentReplicationSourceService.class).start();
}
injector.getInstance(SegmentReplicationSourceService.class).start();

// Load (and maybe upgrade) the metadata stored on disk
final GatewayMetaState gatewayMetaState = injector.getInstance(GatewayMetaState.class);
Expand Down Expand Up @@ -1444,9 +1436,7 @@ public synchronized void close() throws IOException {
// close filter/fielddata caches after indices
toClose.add(injector.getInstance(IndicesStore.class));
toClose.add(injector.getInstance(PeerRecoverySourceService.class));
if (FeatureFlags.isEnabled(REPLICATION_TYPE)) {
toClose.add(injector.getInstance(SegmentReplicationSourceService.class));
}
toClose.add(injector.getInstance(SegmentReplicationSourceService.class));
toClose.add(() -> stopWatch.stop().start("cluster"));
toClose.add(injector.getInstance(ClusterService.class));
toClose.add(() -> stopWatch.stop().start("node_connections_service"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1143,7 +1143,6 @@ public void testRemoteStoreNoUserOverrideConflictingReplicationTypeIndexSettings
.put(CLUSTER_REMOTE_TRANSLOG_REPOSITORY_SETTING.getKey(), "my-translog-repo-1")
.build();
FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);
FeatureFlagSetter.set(FeatureFlags.REPLICATION_TYPE);

request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test");
IllegalArgumentException exc = expectThrows(
Expand Down Expand Up @@ -1174,7 +1173,6 @@ public void testRemoteStoreNoUserOverrideExceptReplicationTypeSegmentIndexSettin
.put(CLUSTER_REMOTE_TRANSLOG_REPOSITORY_SETTING.getKey(), "my-translog-repo-1")
.build();
FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);
FeatureFlagSetter.set(FeatureFlags.REPLICATION_TYPE);

request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test");
final Settings.Builder requestSettings = Settings.builder();
Expand Down Expand Up @@ -1210,7 +1208,6 @@ public void testRemoteStoreNoUserOverrideIndexSettings() {
.put(CLUSTER_REMOTE_TRANSLOG_REPOSITORY_SETTING.getKey(), "my-translog-repo-1")
.build();
FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);
FeatureFlagSetter.set(FeatureFlags.REPLICATION_TYPE);

request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test");
Settings indexSettings = aggregateIndexSettings(
Expand Down Expand Up @@ -1242,7 +1239,6 @@ public void testRemoteStoreDisabledByUserIndexSettings() {
.put(CLUSTER_REMOTE_TRANSLOG_REPOSITORY_SETTING.getKey(), "my-translog-repo-1")
.build();
FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);
FeatureFlagSetter.set(FeatureFlags.REPLICATION_TYPE);

request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test");
final Settings.Builder requestSettings = Settings.builder();
Expand All @@ -1269,7 +1265,6 @@ public void testRemoteStoreTranslogDisabledByUserIndexSettings() {
.put(CLUSTER_REMOTE_TRANSLOG_REPOSITORY_SETTING.getKey(), "my-translog-repo-1")
.build();
FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);
FeatureFlagSetter.set(FeatureFlags.REPLICATION_TYPE);

request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test");
final Settings.Builder requestSettings = Settings.builder();
Expand Down Expand Up @@ -1297,7 +1292,6 @@ public void testRemoteStoreOverrideSegmentRepoIndexSettings() {
.put(CLUSTER_REMOTE_TRANSLOG_REPOSITORY_SETTING.getKey(), "my-translog-repo-1")
.build();
FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);
FeatureFlagSetter.set(FeatureFlags.REPLICATION_TYPE);

request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test");
final Settings.Builder requestSettings = Settings.builder();
Expand Down Expand Up @@ -1334,7 +1328,6 @@ public void testRemoteStoreOverrideTranslogRepoIndexSettings() {
.put(CLUSTER_REMOTE_TRANSLOG_REPOSITORY_SETTING.getKey(), "my-translog-repo-1")
.build();
FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);
FeatureFlagSetter.set(FeatureFlags.REPLICATION_TYPE);

request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test");
final Settings.Builder requestSettings = Settings.builder();
Expand Down Expand Up @@ -1369,7 +1362,6 @@ public void testRemoteStoreOverrideReplicationTypeIndexSettings() {
.put(CLUSTER_REMOTE_TRANSLOG_REPOSITORY_SETTING.getKey(), "my-translog-repo-1")
.build();
FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);
FeatureFlagSetter.set(FeatureFlags.REPLICATION_TYPE);

request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test");
final Settings.Builder requestSettings = Settings.builder();
Expand Down

0 comments on commit e7cdca1

Please sign in to comment.