Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Wrap checked exceptions in painless.DefBootstrap to support JDK-25 ([#19706](https://github.com/opensearch-project/OpenSearch/pull/19706))
- Refactor the ThreadPoolStats.Stats class to use the Builder pattern instead of constructors ([#19317](https://github.com/opensearch-project/OpenSearch/pull/19317))
- Refactor the IndexingStats.Stats class to use the Builder pattern instead of constructors ([#19306](https://github.com/opensearch-project/OpenSearch/pull/19306))

- Remove FeatureFlag.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG. ([#19715](https://github.com/opensearch-project/OpenSearch/pull/19715))
-
### Fixed
- Fix Allocation and Rebalance Constraints of WeightFunction are incorrectly reset ([#19012](https://github.com/opensearch-project/OpenSearch/pull/19012))
- Fix flaky test FieldDataLoadingIT.testIndicesFieldDataCacheSizeSetting ([#19571](https://github.com/opensearch-project/OpenSearch/pull/19571))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.opensearch.action.support.WriteRequest;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.set.Sets;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.TieredMergePolicyProvider;
Expand Down Expand Up @@ -55,13 +54,6 @@ protected Settings nodeSettings(int nodeOrdinal) {
.build();
}

@Override
protected Settings featureFlagSettings() {
Settings.Builder featureSettings = Settings.builder();
featureSettings.put(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG, true);
return featureSettings.build();
}

public void testPrimaryNodeRestart() throws Exception {
logger.info("--> start nodes");
internalCluster().startNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.support.replication.TransportReplicationAction;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.TieredMergePolicyProvider;
import org.opensearch.indices.recovery.RecoverySettings;
Expand Down Expand Up @@ -48,13 +47,6 @@ protected Settings nodeSettings(int nodeOrdinal) {
.build();
}

@Override
protected Settings featureFlagSettings() {
Settings.Builder featureSettings = Settings.builder();
featureSettings.put(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG, true);
return featureSettings.build();
}

@Before
public void setup() {
internalCluster().startClusterManagerOnlyNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.index.merge.MergeStats;
import org.opensearch.index.merge.MergedSegmentWarmerStats;
Expand Down Expand Up @@ -65,13 +64,6 @@ public Settings indexSettings() {
.build();
}

@Override
protected Settings featureFlagSettings() {
Settings.Builder featureSettings = Settings.builder();
featureSettings.put(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG, true);
return featureSettings.build();
}

private void setup() {
internalCluster().startNodes(2);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ protected FeatureFlagSettings(
FeatureFlags.APPLICATION_BASED_CONFIGURATION_TEMPLATES_SETTING,
FeatureFlags.TERM_VERSION_PRECOMMIT_ENABLE_SETTING,
FeatureFlags.ARROW_STREAMS_SETTING,
FeatureFlags.STREAM_TRANSPORT_SETTING,
FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING
FeatureFlags.STREAM_TRANSPORT_SETTING
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,6 @@ public class FeatureFlags {
*/
public static final String BACKGROUND_TASK_EXECUTION_EXPERIMENTAL = FEATURE_FLAG_PREFIX + "task.background.enabled";

/**
* Gates the functionality of merged segment warmer in local/remote segment replication.
* Once the feature is ready for release, this feature flag can be removed.
*/
public static final String MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG = "opensearch.experimental.feature.merged_segment_warmer.enabled";

public static final Setting<Boolean> REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING = Setting.boolSetting(
REMOTE_STORE_MIGRATION_EXPERIMENTAL,
false,
Expand All @@ -91,12 +85,6 @@ public class FeatureFlags {
Property.NodeScope
);

public static final Setting<Boolean> MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING = Setting.boolSetting(
MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG,
false,
Property.NodeScope
);

/**
* Gates the functionality of application based configuration templates.
*/
Expand Down Expand Up @@ -145,7 +133,6 @@ static class FeatureFlagsImpl {
put(TERM_VERSION_PRECOMMIT_ENABLE_SETTING, TERM_VERSION_PRECOMMIT_ENABLE_SETTING.getDefault(Settings.EMPTY));
put(ARROW_STREAMS_SETTING, ARROW_STREAMS_SETTING.getDefault(Settings.EMPTY));
put(STREAM_TRANSPORT_SETTING, STREAM_TRANSPORT_SETTING.getDefault(Settings.EMPTY));
put(MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING, MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING.getDefault(Settings.EMPTY));
}
};

Expand Down
9 changes: 3 additions & 6 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -343,9 +343,8 @@ public IndexService(
this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this);
}
this.asyncReplicationTask = new AsyncReplicationTask(this);
if (FeatureFlags.isEnabled(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING)) {
this.asyncPublishReferencedSegmentsTask = new AsyncPublishReferencedSegmentsTask(this);
}
this.asyncPublishReferencedSegmentsTask = new AsyncPublishReferencedSegmentsTask(this);

this.translogFactorySupplier = translogFactorySupplier;
this.recoverySettings = recoverySettings;
this.remoteStoreSettings = remoteStoreSettings;
Expand Down Expand Up @@ -1210,9 +1209,7 @@ public synchronized void updateMetadata(final IndexMetadata currentIndexMetadata
onRefreshIntervalChange();
updateFsyncTaskIfNecessary();
updateReplicationTask();
if (FeatureFlags.isEnabled(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING)) {
updatePublishReferencedSegmentsTask();
}
updatePublishReferencedSegmentsTask();
}

metadataListeners.forEach(c -> c.accept(newIndexMetadata));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@
import org.opensearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.common.util.concurrent.KeyedLock;
import org.opensearch.common.util.concurrent.ReleasableLock;
Expand Down Expand Up @@ -2398,8 +2397,9 @@ private IndexWriterConfig getIndexWriterConfig() {
if (config().getLeafSorter() != null) {
iwc.setLeafSorter(config().getLeafSorter()); // The default segment search order
}
if (FeatureFlags.isEnabled(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING)
&& config().getIndexSettings().isSegRepEnabledOrRemoteNode()) {
IndexSettings indexSettings = config().getIndexSettings();
if (indexSettings.isDocumentReplication() == false
&& (indexSettings.isSegRepLocalEnabled() || indexSettings.isRemoteStoreEnabled())) {
assert null != config().getIndexReaderWarmer();
iwc.setMergedSegmentWarmer(config().getIndexReaderWarmer());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentReader;
import org.opensearch.Version;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.logging.Loggers;
import org.opensearch.index.merge.MergedSegmentTransferTracker;
Expand Down Expand Up @@ -113,6 +114,12 @@ SegmentCommitInfo segmentCommitInfo(LeafReader leafReader) {

// package-private for tests
boolean shouldWarm(SegmentCommitInfo segmentCommitInfo) throws IOException {
// Min node version check ensures that we only warm, when all nodes expect it
Version minNodeVersion = clusterService.state().nodes().getMinNodeVersion();
if (Version.V_3_4_0.compareTo(minNodeVersion) > 0) {
return false;
}

if (indexShard.getRecoverySettings().isMergedSegmentReplicationWarmerEnabled() == false) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.apache.lucene.index.IndexWriter;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.transport.TransportService;
Expand All @@ -35,12 +34,10 @@ public MergedSegmentWarmerFactory(TransportService transportService, RecoverySet
}

public IndexWriter.IndexReaderWarmer get(IndexShard shard) {
if (FeatureFlags.isEnabled(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG) == false
|| shard.indexSettings().isDocumentReplication()) {
if (shard.indexSettings().isDocumentReplication()) {
// MergedSegmentWarmerFactory#get is called by IndexShard#newEngineConfig on the initialization of a new indexShard and
// in cases of updates to shard state.
// 1. IndexWriter.IndexReaderWarmer should be null when IndexMetadata.INDEX_REPLICATION_TYPE_SETTING == ReplicationType.DOCUMENT
// 2. IndexWriter.IndexReaderWarmer should be null when the FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG == false
// in case of updates to shard state.
// - IndexWriter.IndexReaderWarmer should be null when IndexMetadata.INDEX_REPLICATION_TYPE_SETTING == ReplicationType.DOCUMENT
return null;
} else if (shard.indexSettings().isSegRepLocalEnabled() || shard.indexSettings().isRemoteStoreEnabled()) {
return new MergedSegmentWarmer(transportService, recoverySettings, clusterService, shard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
Expand Down Expand Up @@ -93,13 +92,6 @@ public class RecoverySettings {
public static final Setting<Boolean> INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING = Setting.boolSetting(
"indices.replication.merges.warmer.enabled",
false,
value -> {
if (FeatureFlags.isEnabled(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG) == false && value == true) {
throw new IllegalArgumentException(
"FeatureFlag " + FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG + " must be enabled to set this property to true."
);
}
},
Property.Dynamic,
Property.NodeScope
);
Expand Down
12 changes: 3 additions & 9 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -1682,16 +1682,10 @@ protected Node(final Environment initialEnvironment, Collection<PluginInfo> clas
b.bind(MergedSegmentWarmerFactory.class).toInstance(mergedSegmentWarmerFactory);
b.bind(MappingTransformerRegistry.class).toInstance(mappingTransformerRegistry);
b.bind(AutoForceMergeManager.class).toInstance(autoForceMergeManager);
if (FeatureFlags.isEnabled(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG)) {
if (isRemoteDataAttributePresent(settings)) {
b.bind(MergedSegmentPublisher.PublishAction.class)
.to(RemoteStorePublishMergedSegmentAction.class)
.asEagerSingleton();
} else {
b.bind(MergedSegmentPublisher.PublishAction.class).to(PublishMergedSegmentAction.class).asEagerSingleton();
}
if (isRemoteDataAttributePresent(settings)) {
b.bind(MergedSegmentPublisher.PublishAction.class).to(RemoteStorePublishMergedSegmentAction.class).asEagerSingleton();
} else {
b.bind(MergedSegmentPublisher.PublishAction.class).toInstance((shard, checkpoint) -> {});
b.bind(MergedSegmentPublisher.PublishAction.class).to(PublishMergedSegmentAction.class).asEagerSingleton();
}
b.bind(MergedSegmentPublisher.class).asEagerSingleton();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static org.opensearch.common.util.FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG;
import static org.opensearch.index.shard.IndexShardTestCase.getEngine;
import static org.opensearch.test.InternalSettingsPlugin.TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand Down Expand Up @@ -619,7 +618,6 @@ public void testReplicationTask() throws Exception {
assertEquals(1000, updatedTask.getInterval().millis());
}

@LockFeatureFlag(MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG)
public void testPublishReferencedSegmentsTask() throws Exception {
// create with docrep - task should not schedule
IndexService indexService = createIndex(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ public void tearDown() throws Exception {
super.tearDown();
}

public void testGetWithSegmentReplicationAndExperimentalFeatureFlagEnabled() {
FeatureFlags.initializeFeatureFlags(Settings.builder().put(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG, true).build());
public void testGetWithSegmentReplication() {
IndexSettings indexSettings = createIndexSettings(
false,
Settings.builder().put(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT).build()
Expand All @@ -67,19 +66,7 @@ public void testGetWithSegmentReplicationAndExperimentalFeatureFlagEnabled() {
assertTrue(warmer instanceof MergedSegmentWarmer);
}

public void testGetWithSegmentReplicationAndExperimentalFeatureFlagDisabled() {
IndexSettings indexSettings = createIndexSettings(
false,
Settings.builder().put(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT).build()
);
when(indexShard.indexSettings()).thenReturn(indexSettings);
IndexWriter.IndexReaderWarmer warmer = factory.get(indexShard);

assertNull(warmer);
}

public void testGetWithRemoteStoreEnabledAndExperimentalFeatureFlagEnabled() {
FeatureFlags.initializeFeatureFlags(Settings.builder().put(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG, true).build());
public void testGetWithRemoteStoreEnabled() {
IndexSettings indexSettings = createIndexSettings(
true,
Settings.builder().put(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT).build()
Expand All @@ -92,40 +79,14 @@ public void testGetWithRemoteStoreEnabledAndExperimentalFeatureFlagEnabled() {
assertTrue(warmer instanceof MergedSegmentWarmer);
}

public void testGetWithRemoteStoreEnabledAndExperimentalFeatureFlagDisabled() {
IndexSettings indexSettings = createIndexSettings(
true,
Settings.builder().put(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT).build()
);
when(indexShard.indexSettings()).thenReturn(indexSettings);

IndexWriter.IndexReaderWarmer warmer = factory.get(indexShard);

assertNull(warmer);
}

public void testGetWithDocumentReplicationAndExperimentalFeatureFlagEnabled() {
FeatureFlags.initializeFeatureFlags(Settings.builder().put(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG, true).build());
IndexSettings indexSettings = createIndexSettings(
false,
Settings.builder().put(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.DOCUMENT).build()
);

when(indexShard.indexSettings()).thenReturn(indexSettings);
IndexWriter.IndexReaderWarmer warmer = factory.get(indexShard);

assertNull(warmer);
}

public void testGetWithDocumentReplicationAndExperimentalFeatureFlagDisabled() {
public void testGetWithDocumentReplication() {
IndexSettings indexSettings = createIndexSettings(
false,
Settings.builder().put(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.DOCUMENT).build()
);

when(indexShard.indexSettings()).thenReturn(indexSettings);
IndexWriter.IndexReaderWarmer warmer = factory.get(indexShard);

assertNull(warmer);
}

Expand Down
Loading
Loading