diff --git a/CHANGELOG.md b/CHANGELOG.md index e1834df696769..7519e242c0acf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Implement on behalf of token passing for extensions ([#8679](https://github.com/opensearch-project/OpenSearch/pull/8679)) - Implement Visitor Design pattern in QueryBuilder to enable the capability to traverse through the complex QueryBuilder tree. ([#10110](https://github.com/opensearch-project/OpenSearch/pull/10110)) - Provide service accounts tokens to extensions ([#9618](https://github.com/opensearch-project/OpenSearch/pull/9618)) +- Configurable merge policy for index with an option to choose from LogByteSize and Tiered merge policy ([#9992](https://github.com/opensearch-project/OpenSearch/pull/9992)) ### Dependencies - Bump `log4j-core` from 2.18.0 to 2.19.0 diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java index 2bab61f3e1c4c..229cd7bffad2f 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java @@ -53,7 +53,7 @@ import org.opensearch.env.NodeEnvironment; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; -import org.opensearch.index.MergePolicyConfig; +import org.opensearch.index.MergePolicyProvider; import org.opensearch.index.engine.Engine; import org.opensearch.index.query.QueryBuilders; import org.opensearch.index.shard.ShardPath; @@ -519,7 +519,7 @@ public void testReuseInFileBasedPeerRecovery() throws Exception { .put("number_of_replicas", 1) // disable merges to keep segments the same - .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) + .put(MergePolicyProvider.INDEX_MERGE_ENABLED, false) // expire retention leases quickly .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms") diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/RemoveCorruptedShardDataCommandIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/RemoveCorruptedShardDataCommandIT.java index f8c2acbf99f70..b431079476624 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/RemoveCorruptedShardDataCommandIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/RemoveCorruptedShardDataCommandIT.java @@ -73,7 +73,7 @@ import org.opensearch.env.TestEnvironment; import org.opensearch.gateway.GatewayMetaState; import org.opensearch.index.IndexSettings; -import org.opensearch.index.MergePolicyConfig; +import org.opensearch.index.MergePolicyProvider; import org.opensearch.index.MockEngineFactoryPlugin; import org.opensearch.index.seqno.SeqNoStats; import org.opensearch.index.translog.TestTranslog; @@ -135,7 +135,7 @@ public void testCorruptIndex() throws Exception { Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) + .put(MergePolicyProvider.INDEX_MERGE_ENABLED, false) .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "-1") .put(MockEngineSupport.DISABLE_FLUSH_ON_CLOSE.getKey(), true) .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), "checksum") diff --git a/server/src/internalClusterTest/java/org/opensearch/index/store/CorruptedFileIT.java b/server/src/internalClusterTest/java/org/opensearch/index/store/CorruptedFileIT.java index 7e1d0792e3ddb..8291fef5d177b 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/store/CorruptedFileIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/store/CorruptedFileIT.java @@ -72,7 +72,7 @@ import org.opensearch.core.index.shard.ShardId; import org.opensearch.env.NodeEnvironment; import org.opensearch.index.IndexSettings; -import org.opensearch.index.MergePolicyConfig; +import org.opensearch.index.MergePolicyProvider; import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardState; @@ -167,7 +167,7 @@ public void testCorruptFileAndRecover() throws ExecutionException, InterruptedEx Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1") .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "1") - .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) + .put(MergePolicyProvider.INDEX_MERGE_ENABLED, false) // no checkindex - we corrupt shards on purpose .put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false) // no translog based flush - it might change the .liv / segments.N files @@ -286,7 +286,7 @@ public void testCorruptPrimaryNoReplica() throws ExecutionException, Interrupted prepareCreate("test").setSettings( Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0") - .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) + .put(MergePolicyProvider.INDEX_MERGE_ENABLED, false) .put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false) // no checkindex - we corrupt shards on // purpose // no translog based flush - it might change the .liv / segments.N files @@ -552,7 +552,7 @@ public void testCorruptFileThenSnapshotAndRestore() throws ExecutionException, I prepareCreate("test").setSettings( Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0") // no replicas for this test - .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) + .put(MergePolicyProvider.INDEX_MERGE_ENABLED, false) // no checkindex - we corrupt shards on purpose .put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false) // no translog based flush - it might change the .liv / segments.N files @@ -624,7 +624,7 @@ public void testReplicaCorruption() throws Exception { prepareCreate("test").setSettings( Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, cluster().numDataNodes() - 1) - .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) + .put(MergePolicyProvider.INDEX_MERGE_ENABLED, false) .put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false) // no checkindex - we corrupt shards on // purpose .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(1, ByteSizeUnit.PB)) // no diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java index a0f01acd1f8e9..0967acb37d3e8 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java @@ -66,8 +66,8 @@ import org.opensearch.index.IndexModule; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; -import org.opensearch.index.MergePolicyConfig; import org.opensearch.index.MergeSchedulerConfig; +import org.opensearch.index.TieredMergePolicyProvider; import org.opensearch.index.VersionType; import org.opensearch.index.cache.query.QueryCacheStats; import org.opensearch.index.engine.VersionConflictEngineException; @@ -589,8 +589,8 @@ public void testNonThrottleStats() throws Exception { prepareCreate("test").setSettings( settingsBuilder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1") .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0") - .put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2") - .put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2") + .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2") + .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2") .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), "1") .put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), "10000") ) @@ -621,8 +621,8 @@ public void testThrottleStats() throws Exception { prepareCreate("test").setSettings( settingsBuilder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1") .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0") - .put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2") - .put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2") + .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2") + .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2") .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), "1") .put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), "1") .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC.name()) diff --git a/server/src/internalClusterTest/java/org/opensearch/update/UpdateIT.java b/server/src/internalClusterTest/java/org/opensearch/update/UpdateIT.java index 442268d513fc3..b46d27bafb2a5 100644 --- a/server/src/internalClusterTest/java/org/opensearch/update/UpdateIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/update/UpdateIT.java @@ -50,7 +50,7 @@ import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.core.action.ActionListener; import org.opensearch.core.rest.RestStatus; -import org.opensearch.index.MergePolicyConfig; +import org.opensearch.index.MergePolicyProvider; import org.opensearch.index.engine.DocumentMissingException; import org.opensearch.index.engine.VersionConflictEngineException; import org.opensearch.plugins.Plugin; @@ -669,7 +669,7 @@ public void run() { public void testStressUpdateDeleteConcurrency() throws Exception { // We create an index with merging disabled so that deletes don't get merged away - assertAcked(prepareCreate("test").setSettings(Settings.builder().put(MergePolicyConfig.INDEX_MERGE_ENABLED, false))); + assertAcked(prepareCreate("test").setSettings(Settings.builder().put(MergePolicyProvider.INDEX_MERGE_ENABLED, false))); ensureGreen(); Script fieldIncScript = new Script(ScriptType.INLINE, UPDATE_SCRIPTS, FIELD_INC_SCRIPT, Collections.singletonMap("field", "field")); diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 032027384f106..5261d40387dc6 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -457,6 +457,7 @@ public void apply(Settings value, Settings current, Settings previous) { NetworkService.TCP_CONNECT_TIMEOUT, IndexSettings.QUERY_STRING_ANALYZE_WILDCARD, IndexSettings.QUERY_STRING_ALLOW_LEADING_WILDCARD, + IndexSettings.TIME_SERIES_INDEX_MERGE_POLICY, ScriptService.SCRIPT_GENERAL_CACHE_SIZE_SETTING, ScriptService.SCRIPT_GENERAL_CACHE_EXPIRE_SETTING, ScriptService.SCRIPT_GENERAL_MAX_COMPILATIONS_RATE_SETTING, diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 5b2afc44600bd..83bf8c82ee3dd 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -45,9 +45,11 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.IndexSortConfig; import org.opensearch.index.IndexingSlowLog; -import org.opensearch.index.MergePolicyConfig; +import org.opensearch.index.LogByteSizeMergePolicyProvider; +import org.opensearch.index.MergePolicyProvider; import org.opensearch.index.MergeSchedulerConfig; import org.opensearch.index.SearchSlowLog; +import org.opensearch.index.TieredMergePolicyProvider; import org.opensearch.index.cache.bitset.BitsetFilterCache; import org.opensearch.index.engine.EngineConfig; import org.opensearch.index.fielddata.IndexFieldDataService; @@ -120,14 +122,14 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexingSlowLog.INDEX_INDEXING_SLOWLOG_LEVEL_SETTING, IndexingSlowLog.INDEX_INDEXING_SLOWLOG_REFORMAT_SETTING, IndexingSlowLog.INDEX_INDEXING_SLOWLOG_MAX_SOURCE_CHARS_TO_LOG_SETTING, - MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING, - MergePolicyConfig.INDEX_MERGE_POLICY_DELETES_PCT_ALLOWED_SETTING, - MergePolicyConfig.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING, - MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING, - MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING, - MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING, - MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING, - MergePolicyConfig.INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT_SETTING, + TieredMergePolicyProvider.INDEX_COMPOUND_FORMAT_SETTING, + TieredMergePolicyProvider.INDEX_MERGE_POLICY_DELETES_PCT_ALLOWED_SETTING, + TieredMergePolicyProvider.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING, + TieredMergePolicyProvider.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING, + TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING, + TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING, + TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING, + TieredMergePolicyProvider.INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT_SETTING, IndexSortConfig.INDEX_SORT_FIELD_SETTING, IndexSortConfig.INDEX_SORT_ORDER_SETTING, IndexSortConfig.INDEX_SORT_MISSING_SETTING, @@ -202,6 +204,13 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED, IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME, IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY, + IndexSettings.INDEX_MERGE_POLICY, + LogByteSizeMergePolicyProvider.INDEX_LBS_MERGE_POLICY_MERGE_FACTOR_SETTING, + LogByteSizeMergePolicyProvider.INDEX_LBS_MERGE_POLICY_MIN_MERGE_SETTING, + LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGE_SEGMENT_SETTING, + LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGE_SEGMENT_FOR_FORCED_MERGE_SETTING, + LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGED_DOCS_SETTING, + LogByteSizeMergePolicyProvider.INDEX_LBS_NO_CFS_RATIO_SETTING, IndexSettings.DEFAULT_SEARCH_PIPELINE, // Settings for Searchable Snapshots @@ -275,7 +284,7 @@ public boolean isPrivateSetting(String key) { case IndexMetadata.SETTING_HISTORY_UUID: case IndexMetadata.SETTING_VERSION_UPGRADED: case IndexMetadata.SETTING_INDEX_PROVIDED_NAME: - case MergePolicyConfig.INDEX_MERGE_ENABLED: + case MergePolicyProvider.INDEX_MERGE_ENABLED: // we keep the shrink settings for BWC - this can be removed in 8.0 // we can't remove in 7 since this setting might be baked into an index coming in via a full cluster restart from 6.0 case "index.shrink.source.uuid": diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 1e4224c314f05..ce6c1a5ad6284 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -54,6 +54,7 @@ import org.opensearch.node.Node; import org.opensearch.search.pipeline.SearchPipelineService; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Locale; @@ -83,9 +84,42 @@ */ @PublicApi(since = "1.0.0") public final class IndexSettings { - private static final String MERGE_ON_FLUSH_DEFAULT_POLICY = "default"; + private static final String DEFAULT_POLICY = "default"; private static final String MERGE_ON_FLUSH_MERGE_POLICY = "merge-on-flush"; + /** + * Enum representing supported merge policies + */ + public enum IndexMergePolicy { + TIERED("tiered"), + LOG_BYTE_SIZE("log_byte_size"), + DEFAULT_POLICY(IndexSettings.DEFAULT_POLICY); + + private final String value; + + IndexMergePolicy(String value) { + this.value = value; + } + + public String getValue() { + return value; + } + + public static IndexMergePolicy fromString(String text) { + for (IndexMergePolicy policy : IndexMergePolicy.values()) { + if (policy.value.equals(text)) { + return policy; + } + } + throw new IllegalArgumentException( + "The setting has unsupported policy specified: " + + text + + ". Please use one of: " + + String.join(", ", Arrays.stream(IndexMergePolicy.values()).map(IndexMergePolicy::getValue).toArray(String[]::new)) + ); + } + } + public static final Setting> DEFAULT_FIELD_SETTING = Setting.listSetting( "index.query.default_field", Collections.singletonList("*"), @@ -566,11 +600,25 @@ public final class IndexSettings { public static final Setting INDEX_MERGE_ON_FLUSH_POLICY = Setting.simpleString( "index.merge_on_flush.policy", - MERGE_ON_FLUSH_DEFAULT_POLICY, + DEFAULT_POLICY, Property.IndexScope, Property.Dynamic ); + public static final Setting INDEX_MERGE_POLICY = Setting.simpleString( + "index.merge.policy", + DEFAULT_POLICY, + IndexMergePolicy::fromString, + Property.IndexScope + ); + + public static final Setting TIME_SERIES_INDEX_MERGE_POLICY = Setting.simpleString( + "indices.time_series_index.default_index_merge_policy", + DEFAULT_POLICY, + IndexMergePolicy::fromString, + Property.NodeScope + ); + public static final Setting SEARCHABLE_SNAPSHOT_REPOSITORY = Setting.simpleString( "index.searchable_snapshot.repository", Property.IndexScope, @@ -651,7 +699,8 @@ public final class IndexSettings { private volatile ByteSizeValue generationThresholdSize; private volatile ByteSizeValue flushAfterMergeThresholdSize; private final MergeSchedulerConfig mergeSchedulerConfig; - private final MergePolicyConfig mergePolicyConfig; + private final TieredMergePolicyProvider tieredMergePolicyProvider; + private final LogByteSizeMergePolicyProvider logByteSizeMergePolicyProvider; private final IndexSortConfig indexSortConfig; private final IndexScopedSettings scopedSettings; private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis(); @@ -844,7 +893,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti maxAnalyzedOffset = scopedSettings.get(MAX_ANALYZED_OFFSET_SETTING); maxTermsCount = scopedSettings.get(MAX_TERMS_COUNT_SETTING); maxRegexLength = scopedSettings.get(MAX_REGEX_LENGTH_SETTING); - this.mergePolicyConfig = new MergePolicyConfig(logger, this); + this.tieredMergePolicyProvider = new TieredMergePolicyProvider(logger, this); + this.logByteSizeMergePolicyProvider = new LogByteSizeMergePolicyProvider(logger, this); this.indexSortConfig = new IndexSortConfig(this); searchIdleAfter = scopedSettings.get(INDEX_SEARCH_IDLE_AFTER); defaultPipeline = scopedSettings.get(DEFAULT_PIPELINE); @@ -866,33 +916,59 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti * Now this sortField (IndexSort) is stored in SegmentInfo and we need to maintain backward compatibility for them. */ widenIndexSortType = IndexMetadata.SETTING_INDEX_VERSION_CREATED.get(settings).before(V_2_7_0); - - scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING, mergePolicyConfig::setNoCFSRatio); scopedSettings.addSettingsUpdateConsumer( - MergePolicyConfig.INDEX_MERGE_POLICY_DELETES_PCT_ALLOWED_SETTING, - mergePolicyConfig::setDeletesPctAllowed + TieredMergePolicyProvider.INDEX_COMPOUND_FORMAT_SETTING, + tieredMergePolicyProvider::setNoCFSRatio ); scopedSettings.addSettingsUpdateConsumer( - MergePolicyConfig.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING, - mergePolicyConfig::setExpungeDeletesAllowed + TieredMergePolicyProvider.INDEX_MERGE_POLICY_DELETES_PCT_ALLOWED_SETTING, + tieredMergePolicyProvider::setDeletesPctAllowed ); scopedSettings.addSettingsUpdateConsumer( - MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING, - mergePolicyConfig::setFloorSegmentSetting + TieredMergePolicyProvider.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING, + tieredMergePolicyProvider::setExpungeDeletesAllowed ); scopedSettings.addSettingsUpdateConsumer( - MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING, - mergePolicyConfig::setMaxMergesAtOnce + TieredMergePolicyProvider.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING, + tieredMergePolicyProvider::setFloorSegmentSetting ); scopedSettings.addSettingsUpdateConsumer( - MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING, - mergePolicyConfig::setMaxMergedSegment + TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING, + tieredMergePolicyProvider::setMaxMergesAtOnce ); scopedSettings.addSettingsUpdateConsumer( - MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING, - mergePolicyConfig::setSegmentsPerTier + TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING, + tieredMergePolicyProvider::setMaxMergedSegment + ); + scopedSettings.addSettingsUpdateConsumer( + TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING, + tieredMergePolicyProvider::setSegmentsPerTier ); + scopedSettings.addSettingsUpdateConsumer( + LogByteSizeMergePolicyProvider.INDEX_LBS_MERGE_POLICY_MERGE_FACTOR_SETTING, + logByteSizeMergePolicyProvider::setLBSMergeFactor + ); + scopedSettings.addSettingsUpdateConsumer( + LogByteSizeMergePolicyProvider.INDEX_LBS_MERGE_POLICY_MIN_MERGE_SETTING, + logByteSizeMergePolicyProvider::setLBSMinMergedMB + ); + scopedSettings.addSettingsUpdateConsumer( + LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGE_SEGMENT_SETTING, + logByteSizeMergePolicyProvider::setLBSMaxMergeSegment + ); + scopedSettings.addSettingsUpdateConsumer( + LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGE_SEGMENT_FOR_FORCED_MERGE_SETTING, + logByteSizeMergePolicyProvider::setLBSMaxMergeMBForForcedMerge + ); + scopedSettings.addSettingsUpdateConsumer( + LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGED_DOCS_SETTING, + logByteSizeMergePolicyProvider::setLBSMaxMergeDocs + ); + scopedSettings.addSettingsUpdateConsumer( + LogByteSizeMergePolicyProvider.INDEX_LBS_NO_CFS_RATIO_SETTING, + logByteSizeMergePolicyProvider::setLBSNoCFSRatio + ); scopedSettings.addSettingsUpdateConsumer( MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING, MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING, @@ -1439,9 +1515,43 @@ public long getGcDeletesInMillis() { /** * Returns the merge policy that should be used for this index. - */ - public MergePolicy getMergePolicy() { - return mergePolicyConfig.getMergePolicy(); + * @param isTimeSeriesIndex true if index contains @timestamp field + */ + public MergePolicy getMergePolicy(boolean isTimeSeriesIndex) { + String indexScopedPolicy = scopedSettings.get(INDEX_MERGE_POLICY); + MergePolicyProvider mergePolicyProvider = null; + IndexMergePolicy indexMergePolicy = IndexMergePolicy.fromString(indexScopedPolicy); + switch (indexMergePolicy) { + case TIERED: + mergePolicyProvider = tieredMergePolicyProvider; + break; + case LOG_BYTE_SIZE: + mergePolicyProvider = logByteSizeMergePolicyProvider; + break; + case DEFAULT_POLICY: + if (isTimeSeriesIndex) { + String nodeScopedTimeSeriesIndexPolicy = TIME_SERIES_INDEX_MERGE_POLICY.get(nodeSettings); + IndexMergePolicy nodeMergePolicy = IndexMergePolicy.fromString(nodeScopedTimeSeriesIndexPolicy); + switch (nodeMergePolicy) { + case TIERED: + case DEFAULT_POLICY: + mergePolicyProvider = tieredMergePolicyProvider; + break; + case LOG_BYTE_SIZE: + mergePolicyProvider = logByteSizeMergePolicyProvider; + break; + } + } else { + mergePolicyProvider = tieredMergePolicyProvider; + } + break; + } + assert mergePolicyProvider != null : "should not happen as validation for invalid merge policy values " + + "are part of setting definition"; + if (logger.isTraceEnabled()) { + logger.trace("Index: " + this.index.getName() + ", Merge policy used: " + mergePolicyProvider); + } + return mergePolicyProvider.getMergePolicy(); } public T getValue(Setting setting) { @@ -1632,7 +1742,7 @@ public boolean isMergeOnFlushEnabled() { } private void setMergeOnFlushPolicy(String policy) { - if (Strings.isEmpty(policy) || MERGE_ON_FLUSH_DEFAULT_POLICY.equalsIgnoreCase(policy)) { + if (Strings.isEmpty(policy) || DEFAULT_POLICY.equalsIgnoreCase(policy)) { mergeOnFlushPolicy = null; } else if (MERGE_ON_FLUSH_MERGE_POLICY.equalsIgnoreCase(policy)) { this.mergeOnFlushPolicy = MergeOnFlushMergePolicy::new; @@ -1643,7 +1753,7 @@ private void setMergeOnFlushPolicy(String policy) { + " has unsupported policy specified: " + policy + ". Please use one of: " - + MERGE_ON_FLUSH_DEFAULT_POLICY + + DEFAULT_POLICY + ", " + MERGE_ON_FLUSH_MERGE_POLICY ); diff --git a/server/src/main/java/org/opensearch/index/LogByteSizeMergePolicyProvider.java b/server/src/main/java/org/opensearch/index/LogByteSizeMergePolicyProvider.java new file mode 100644 index 0000000000000..0b762d781957c --- /dev/null +++ b/server/src/main/java/org/opensearch/index/LogByteSizeMergePolicyProvider.java @@ -0,0 +1,166 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index; + +import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.LogByteSizeMergePolicy; +import org.apache.lucene.index.MergePolicy; +import org.apache.lucene.index.NoMergePolicy; +import org.opensearch.common.settings.Setting; +import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; + +import static org.apache.lucene.index.LogMergePolicy.DEFAULT_MAX_MERGE_DOCS; +import static org.apache.lucene.index.LogMergePolicy.DEFAULT_NO_CFS_RATIO; + +/** + *

+ * The LogByteSizeMergePolicy is an alternative merge policy primarily used here to optimize the merging of segments in scenarios + * with index with timestamps. + * While the TieredMergePolicy is the default choice, the LogByteSizeMergePolicy can be configured + * as the default merge policy for time-index data using the index.datastream_merge.policy setting. + * + *

+ * Unlike the TieredMergePolicy, which prioritizes merging segments of equal sizes, the LogByteSizeMergePolicy + * specializes in merging adjacent segments efficiently. + * This characteristic makes it particularly well-suited for range queries on time-index data. + * Typically, adjacent segments in time-index data often contain documents with similar timestamps. + * When these segments are merged, the resulting segment covers a range of timestamps with reduced overlap compared + * to the adjacent segments. This reduced overlap remains even as segments grow older and larger, + * which can significantly benefit range queries on timestamps. + * + *

+ * In contrast, the TieredMergePolicy does not honor this timestamp range optimization. It focuses on merging segments + * of equal sizes and does not consider adjacency. Consequently, as segments grow older and larger, + * the overlap of timestamp ranges among adjacent segments managed by TieredMergePolicy can increase. + * This can lead to inefficiencies in range queries on timestamps, as the number of segments to be scanned + * within a given timestamp range could become high. + * + * @opensearch.internal + */ +public class LogByteSizeMergePolicyProvider implements MergePolicyProvider { + private final LogByteSizeMergePolicy logByteSizeMergePolicy = new LogByteSizeMergePolicy(); + + private final Logger logger; + private final boolean mergesEnabled; + + public static final ByteSizeValue DEFAULT_MIN_MERGE = new ByteSizeValue(2, ByteSizeUnit.MB); + public static final int DEFAULT_MERGE_FACTOR = 10; + + public static final ByteSizeValue DEFAULT_MAX_MERGED_SEGMENT = new ByteSizeValue(5, ByteSizeUnit.GB); + + public static final ByteSizeValue DEFAULT_MAX_MERGE_SEGMENT_FORCE_MERGE = new ByteSizeValue(Long.MAX_VALUE); + + public static final Setting INDEX_LBS_MERGE_POLICY_MERGE_FACTOR_SETTING = Setting.intSetting( + "index.merge.log_byte_size_policy.merge_factor", + DEFAULT_MERGE_FACTOR, // keeping it same as default max merge at once for tiered merge policy + 2, + Setting.Property.Dynamic, + Setting.Property.IndexScope + ); + + public static final Setting INDEX_LBS_MERGE_POLICY_MIN_MERGE_SETTING = Setting.byteSizeSetting( + "index.merge.log_byte_size_policy.min_merge", + DEFAULT_MIN_MERGE, // keeping it same as default floor segment for tiered merge policy + Setting.Property.Dynamic, + Setting.Property.IndexScope + ); + + public static final Setting INDEX_LBS_MAX_MERGE_SEGMENT_SETTING = Setting.byteSizeSetting( + "index.merge.log_byte_size_policy.max_merge_segment", + DEFAULT_MAX_MERGED_SEGMENT, // keeping default same as tiered merge policy + Setting.Property.Dynamic, + Setting.Property.IndexScope + ); + + public static final Setting INDEX_LBS_MAX_MERGE_SEGMENT_FOR_FORCED_MERGE_SETTING = Setting.byteSizeSetting( + "index.merge.log_byte_size_policy.max_merge_segment_forced_merge", + DEFAULT_MAX_MERGE_SEGMENT_FORCE_MERGE, + Setting.Property.Dynamic, + Setting.Property.IndexScope + ); + + public static final Setting INDEX_LBS_MAX_MERGED_DOCS_SETTING = Setting.intSetting( + "index.merge.log_byte_size_policy.max_merged_docs", + DEFAULT_MAX_MERGE_DOCS, + Setting.Property.Dynamic, + Setting.Property.IndexScope + ); + + public static final Setting INDEX_LBS_NO_CFS_RATIO_SETTING = new Setting<>( + "index.merge.log_byte_size_policy.no_cfs_ratio", + Double.toString(DEFAULT_NO_CFS_RATIO), + TieredMergePolicyProvider::parseNoCFSRatio, + Setting.Property.Dynamic, + Setting.Property.IndexScope + ); + + LogByteSizeMergePolicyProvider(Logger logger, IndexSettings indexSettings) { + this.logger = logger; + this.mergesEnabled = indexSettings.getSettings().getAsBoolean(INDEX_MERGE_ENABLED, true); + + // Undocumented settings, works great with defaults + logByteSizeMergePolicy.setMergeFactor(indexSettings.getValue(INDEX_LBS_MERGE_POLICY_MERGE_FACTOR_SETTING)); + logByteSizeMergePolicy.setMinMergeMB(indexSettings.getValue(INDEX_LBS_MERGE_POLICY_MIN_MERGE_SETTING).getMbFrac()); + logByteSizeMergePolicy.setMaxMergeMB(indexSettings.getValue(INDEX_LBS_MAX_MERGE_SEGMENT_SETTING).getMbFrac()); + logByteSizeMergePolicy.setMaxMergeMBForForcedMerge( + indexSettings.getValue(INDEX_LBS_MAX_MERGE_SEGMENT_FOR_FORCED_MERGE_SETTING).getMbFrac() + ); + logByteSizeMergePolicy.setMaxMergeDocs(indexSettings.getValue(INDEX_LBS_MAX_MERGED_DOCS_SETTING)); + logByteSizeMergePolicy.setNoCFSRatio(indexSettings.getValue(INDEX_LBS_NO_CFS_RATIO_SETTING)); + } + + @Override + public MergePolicy getMergePolicy() { + return mergesEnabled ? logByteSizeMergePolicy : NoMergePolicy.INSTANCE; + } + + void setLBSMergeFactor(int mergeFactor) { + logByteSizeMergePolicy.setMergeFactor(mergeFactor); + } + + void setLBSMaxMergeSegment(ByteSizeValue maxMergeSegment) { + logByteSizeMergePolicy.setMaxMergeMB(maxMergeSegment.getMbFrac()); + } + + void setLBSMinMergedMB(ByteSizeValue minMergedSize) { + logByteSizeMergePolicy.setMinMergeMB(minMergedSize.getMbFrac()); + } + + void setLBSMaxMergeMBForForcedMerge(ByteSizeValue maxMergeForcedMerge) { + logByteSizeMergePolicy.setMaxMergeMBForForcedMerge(maxMergeForcedMerge.getMbFrac()); + } + + void setLBSMaxMergeDocs(int maxMergeDocs) { + logByteSizeMergePolicy.setMaxMergeDocs(maxMergeDocs); + } + + void setLBSNoCFSRatio(Double noCFSRatio) { + logByteSizeMergePolicy.setNoCFSRatio(noCFSRatio); + } + + @Override + public String toString() { + return "LogByteSizeMergePolicyProvider{" + + "mergeFactor=" + + logByteSizeMergePolicy.getMergeFactor() + + ", minMergeMB=" + + logByteSizeMergePolicy.getMinMergeMB() + + ", maxMergeMB=" + + logByteSizeMergePolicy.getMaxMergeMB() + + ", maxMergeMBForForcedMerge=" + + logByteSizeMergePolicy.getMaxMergeMBForForcedMerge() + + ", maxMergedDocs=" + + logByteSizeMergePolicy.getMaxMergeDocs() + + ", noCFSRatio=" + + logByteSizeMergePolicy.getNoCFSRatio() + + '}'; + } + +} diff --git a/server/src/main/java/org/opensearch/index/MergePolicyProvider.java b/server/src/main/java/org/opensearch/index/MergePolicyProvider.java new file mode 100644 index 0000000000000..6f734314f758f --- /dev/null +++ b/server/src/main/java/org/opensearch/index/MergePolicyProvider.java @@ -0,0 +1,31 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index; + +import org.apache.lucene.index.MergePolicy; +import org.opensearch.common.annotation.InternalApi; + +/** + * A provider for obtaining merge policies used by OpenSearch indexes. + * + * @opensearch.internal + */ + +@InternalApi +public interface MergePolicyProvider { + // don't convert to Setting<> and register... we only set this in tests and register via a plugin + String INDEX_MERGE_ENABLED = "index.merge.enabled"; + + /** + * Gets the merge policy to be used for index. + * + * @return The merge policy instance. + */ + MergePolicy getMergePolicy(); +} diff --git a/server/src/main/java/org/opensearch/index/MergePolicyConfig.java b/server/src/main/java/org/opensearch/index/TieredMergePolicyProvider.java similarity index 82% rename from server/src/main/java/org/opensearch/index/MergePolicyConfig.java rename to server/src/main/java/org/opensearch/index/TieredMergePolicyProvider.java index fe2af21dfe039..d5d354c6c960a 100644 --- a/server/src/main/java/org/opensearch/index/MergePolicyConfig.java +++ b/server/src/main/java/org/opensearch/index/TieredMergePolicyProvider.java @@ -33,6 +33,7 @@ package org.opensearch.index; import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.LogByteSizeMergePolicy; import org.apache.lucene.index.MergePolicy; import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.TieredMergePolicy; @@ -47,9 +48,12 @@ * where the index data is stored, and are immutable up to delete markers. * Segments are, periodically, merged into larger segments to keep the * index size at bay and expunge deletes. + * This class customizes and exposes 2 merge policies from lucene - + * {@link LogByteSizeMergePolicy} and {@link TieredMergePolicy}. + * * *

- * Merges select segments of approximately equal size, subject to an allowed + * Tiered merge policy select segments of approximately equal size, subject to an allowed * number of segments per tier. The merge policy is able to merge * non-adjacent segments, and separates how many segments are merged at once from how many * segments are allowed per tier. It also does not over-merge (i.e., cascade merges). @@ -125,8 +129,9 @@ * @opensearch.internal */ -public final class MergePolicyConfig { - private final OpenSearchTieredMergePolicy mergePolicy = new OpenSearchTieredMergePolicy(); +public final class TieredMergePolicyProvider implements MergePolicyProvider { + private final OpenSearchTieredMergePolicy tieredMergePolicy = new OpenSearchTieredMergePolicy(); + private final Logger logger; private final boolean mergesEnabled; @@ -137,10 +142,11 @@ public final class MergePolicyConfig { public static final double DEFAULT_SEGMENTS_PER_TIER = 10.0d; public static final double DEFAULT_RECLAIM_DELETES_WEIGHT = 2.0d; public static final double DEFAULT_DELETES_PCT_ALLOWED = 20.0d; + public static final Setting INDEX_COMPOUND_FORMAT_SETTING = new Setting<>( "index.compound_format", Double.toString(TieredMergePolicy.DEFAULT_NO_CFS_RATIO), - MergePolicyConfig::parseNoCFSRatio, + TieredMergePolicyProvider::parseNoCFSRatio, Property.Dynamic, Property.IndexScope ); @@ -194,10 +200,8 @@ public final class MergePolicyConfig { Property.Dynamic, Property.IndexScope ); - // don't convert to Setting<> and register... we only set this in tests and register via a plugin - public static final String INDEX_MERGE_ENABLED = "index.merge.enabled"; - MergePolicyConfig(Logger logger, IndexSettings indexSettings) { + TieredMergePolicyProvider(Logger logger, IndexSettings indexSettings) { this.logger = logger; double forceMergeDeletesPctAllowed = indexSettings.getValue(INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING); // percentage ByteSizeValue floorSegment = indexSettings.getValue(INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING); @@ -216,54 +220,41 @@ public final class MergePolicyConfig { ); } maxMergeAtOnce = adjustMaxMergeAtOnceIfNeeded(maxMergeAtOnce, segmentsPerTier); - mergePolicy.setNoCFSRatio(indexSettings.getValue(INDEX_COMPOUND_FORMAT_SETTING)); - mergePolicy.setForceMergeDeletesPctAllowed(forceMergeDeletesPctAllowed); - mergePolicy.setFloorSegmentMB(floorSegment.getMbFrac()); - mergePolicy.setMaxMergeAtOnce(maxMergeAtOnce); - mergePolicy.setMaxMergedSegmentMB(maxMergedSegment.getMbFrac()); - mergePolicy.setSegmentsPerTier(segmentsPerTier); - mergePolicy.setDeletesPctAllowed(deletesPctAllowed); - if (logger.isTraceEnabled()) { - logger.trace( - "using [tiered] merge mergePolicy with expunge_deletes_allowed[{}], floor_segment[{}]," - + " max_merge_at_once[{}], max_merged_segment[{}], segments_per_tier[{}]," - + " deletes_pct_allowed[{}]", - forceMergeDeletesPctAllowed, - floorSegment, - maxMergeAtOnce, - maxMergedSegment, - segmentsPerTier, - deletesPctAllowed - ); - } + tieredMergePolicy.setNoCFSRatio(indexSettings.getValue(INDEX_COMPOUND_FORMAT_SETTING)); + tieredMergePolicy.setForceMergeDeletesPctAllowed(forceMergeDeletesPctAllowed); + tieredMergePolicy.setFloorSegmentMB(floorSegment.getMbFrac()); + tieredMergePolicy.setMaxMergeAtOnce(maxMergeAtOnce); + tieredMergePolicy.setMaxMergedSegmentMB(maxMergedSegment.getMbFrac()); + tieredMergePolicy.setSegmentsPerTier(segmentsPerTier); + tieredMergePolicy.setDeletesPctAllowed(deletesPctAllowed); } void setSegmentsPerTier(Double segmentsPerTier) { - mergePolicy.setSegmentsPerTier(segmentsPerTier); + tieredMergePolicy.setSegmentsPerTier(segmentsPerTier); } void setMaxMergedSegment(ByteSizeValue maxMergedSegment) { - mergePolicy.setMaxMergedSegmentMB(maxMergedSegment.getMbFrac()); + tieredMergePolicy.setMaxMergedSegmentMB(maxMergedSegment.getMbFrac()); } void setMaxMergesAtOnce(Integer maxMergeAtOnce) { - mergePolicy.setMaxMergeAtOnce(maxMergeAtOnce); + tieredMergePolicy.setMaxMergeAtOnce(maxMergeAtOnce); } void setFloorSegmentSetting(ByteSizeValue floorSegementSetting) { - mergePolicy.setFloorSegmentMB(floorSegementSetting.getMbFrac()); + tieredMergePolicy.setFloorSegmentMB(floorSegementSetting.getMbFrac()); } void setExpungeDeletesAllowed(Double value) { - mergePolicy.setForceMergeDeletesPctAllowed(value); + tieredMergePolicy.setForceMergeDeletesPctAllowed(value); } void setNoCFSRatio(Double noCFSRatio) { - mergePolicy.setNoCFSRatio(noCFSRatio); + tieredMergePolicy.setNoCFSRatio(noCFSRatio); } void setDeletesPctAllowed(Double deletesPctAllowed) { - mergePolicy.setDeletesPctAllowed(deletesPctAllowed); + tieredMergePolicy.setDeletesPctAllowed(deletesPctAllowed); } private int adjustMaxMergeAtOnceIfNeeded(int maxMergeAtOnce, double segmentsPerTier) { @@ -285,11 +276,11 @@ private int adjustMaxMergeAtOnceIfNeeded(int maxMergeAtOnce, double segmentsPerT return maxMergeAtOnce; } - MergePolicy getMergePolicy() { - return mergesEnabled ? mergePolicy : NoMergePolicy.INSTANCE; + public MergePolicy getMergePolicy() { + return mergesEnabled ? tieredMergePolicy : NoMergePolicy.INSTANCE; } - private static double parseNoCFSRatio(String noCFSRatio) { + public static double parseNoCFSRatio(String noCFSRatio) { noCFSRatio = noCFSRatio.trim(); if (noCFSRatio.equalsIgnoreCase("true")) { return 1.0d; @@ -310,4 +301,23 @@ private static double parseNoCFSRatio(String noCFSRatio) { } } } + + @Override + public String toString() { + return "TieredMergePolicyProvider{" + + "expungeDeletesAllowed=" + + tieredMergePolicy.getForceMergeDeletesPctAllowed() + + ", floorSegment=" + + tieredMergePolicy.getFloorSegmentMB() + + ", maxMergeAtOnce=" + + tieredMergePolicy.getMaxMergeAtOnce() + + ", maxMergedSegment=" + + tieredMergePolicy.getMaxMergedSegmentMB() + + ", segmentsPerTier=" + + tieredMergePolicy.getSegmentsPerTier() + + ", deletesPctAllowed=" + + tieredMergePolicy.getDeletesPctAllowed() + + '}'; + } + } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index d476e8b7c9288..5ce066b156775 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -3772,7 +3772,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro indexSettings, warmer, store, - indexSettings.getMergePolicy(), + indexSettings.getMergePolicy(isTimeSeriesIndex), mapperService != null ? mapperService.indexAnalyzer() : null, similarityService.similarity(mapperService), engineConfigFactory.newCodecServiceOrDefault(indexSettings, mapperService, logger, codecService), diff --git a/server/src/test/java/org/opensearch/action/admin/indices/segments/IndicesSegmentsRequestTests.java b/server/src/test/java/org/opensearch/action/admin/indices/segments/IndicesSegmentsRequestTests.java index 67846efab2af8..d35c821b41aa0 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/segments/IndicesSegmentsRequestTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/segments/IndicesSegmentsRequestTests.java @@ -34,7 +34,7 @@ import org.opensearch.action.support.IndicesOptions; import org.opensearch.common.settings.Settings; -import org.opensearch.index.MergePolicyConfig; +import org.opensearch.index.MergePolicyProvider; import org.opensearch.indices.IndexClosedException; import org.opensearch.plugins.Plugin; import org.opensearch.test.InternalSettingsPlugin; @@ -56,7 +56,7 @@ protected Collection> getPlugins() { public void setupIndex() { Settings settings = Settings.builder() // don't allow any merges so that the num docs is the expected segments - .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) + .put(MergePolicyProvider.INDEX_MERGE_ENABLED, false) .build(); createIndex("test", settings); diff --git a/server/src/test/java/org/opensearch/index/MergePolicySettingsTests.java b/server/src/test/java/org/opensearch/index/MergePolicySettingsTests.java index 387997892ee30..32c4c048d77ba 100644 --- a/server/src/test/java/org/opensearch/index/MergePolicySettingsTests.java +++ b/server/src/test/java/org/opensearch/index/MergePolicySettingsTests.java @@ -31,6 +31,7 @@ package org.opensearch.index; +import org.apache.lucene.index.LogByteSizeMergePolicy; import org.apache.lucene.index.NoMergePolicy; import org.opensearch.common.settings.Settings; import org.opensearch.core.common.unit.ByteSizeUnit; @@ -49,17 +50,17 @@ public class MergePolicySettingsTests extends OpenSearchTestCase { protected final ShardId shardId = new ShardId("index", "_na_", 1); public void testCompoundFileSettings() throws IOException { - assertThat(new MergePolicyConfig(logger, indexSettings(Settings.EMPTY)).getMergePolicy().getNoCFSRatio(), equalTo(0.1)); - assertThat(new MergePolicyConfig(logger, indexSettings(build(true))).getMergePolicy().getNoCFSRatio(), equalTo(1.0)); - assertThat(new MergePolicyConfig(logger, indexSettings(build(0.5))).getMergePolicy().getNoCFSRatio(), equalTo(0.5)); - assertThat(new MergePolicyConfig(logger, indexSettings(build(1.0))).getMergePolicy().getNoCFSRatio(), equalTo(1.0)); - assertThat(new MergePolicyConfig(logger, indexSettings(build("true"))).getMergePolicy().getNoCFSRatio(), equalTo(1.0)); - assertThat(new MergePolicyConfig(logger, indexSettings(build("True"))).getMergePolicy().getNoCFSRatio(), equalTo(1.0)); - assertThat(new MergePolicyConfig(logger, indexSettings(build("False"))).getMergePolicy().getNoCFSRatio(), equalTo(0.0)); - assertThat(new MergePolicyConfig(logger, indexSettings(build("false"))).getMergePolicy().getNoCFSRatio(), equalTo(0.0)); - assertThat(new MergePolicyConfig(logger, indexSettings(build(false))).getMergePolicy().getNoCFSRatio(), equalTo(0.0)); - assertThat(new MergePolicyConfig(logger, indexSettings(build(0))).getMergePolicy().getNoCFSRatio(), equalTo(0.0)); - assertThat(new MergePolicyConfig(logger, indexSettings(build(0.0))).getMergePolicy().getNoCFSRatio(), equalTo(0.0)); + assertThat(new TieredMergePolicyProvider(logger, indexSettings(Settings.EMPTY)).getMergePolicy().getNoCFSRatio(), equalTo(0.1)); + assertThat(new TieredMergePolicyProvider(logger, indexSettings(build(true))).getMergePolicy().getNoCFSRatio(), equalTo(1.0)); + assertThat(new TieredMergePolicyProvider(logger, indexSettings(build(0.5))).getMergePolicy().getNoCFSRatio(), equalTo(0.5)); + assertThat(new TieredMergePolicyProvider(logger, indexSettings(build(1.0))).getMergePolicy().getNoCFSRatio(), equalTo(1.0)); + assertThat(new TieredMergePolicyProvider(logger, indexSettings(build("true"))).getMergePolicy().getNoCFSRatio(), equalTo(1.0)); + assertThat(new TieredMergePolicyProvider(logger, indexSettings(build("True"))).getMergePolicy().getNoCFSRatio(), equalTo(1.0)); + assertThat(new TieredMergePolicyProvider(logger, indexSettings(build("False"))).getMergePolicy().getNoCFSRatio(), equalTo(0.0)); + assertThat(new TieredMergePolicyProvider(logger, indexSettings(build("false"))).getMergePolicy().getNoCFSRatio(), equalTo(0.0)); + assertThat(new TieredMergePolicyProvider(logger, indexSettings(build(false))).getMergePolicy().getNoCFSRatio(), equalTo(0.0)); + assertThat(new TieredMergePolicyProvider(logger, indexSettings(build(0))).getMergePolicy().getNoCFSRatio(), equalTo(0.0)); + assertThat(new TieredMergePolicyProvider(logger, indexSettings(build(0.0))).getMergePolicy().getNoCFSRatio(), equalTo(0.0)); } private static IndexSettings indexSettings(Settings settings) { @@ -67,33 +68,197 @@ private static IndexSettings indexSettings(Settings settings) { } public void testNoMerges() { - MergePolicyConfig mp = new MergePolicyConfig( + TieredMergePolicyProvider tmp = new TieredMergePolicyProvider( logger, - indexSettings(Settings.builder().put(MergePolicyConfig.INDEX_MERGE_ENABLED, false).build()) + indexSettings(Settings.builder().put(MergePolicyProvider.INDEX_MERGE_ENABLED, false).build()) ); - assertTrue(mp.getMergePolicy() instanceof NoMergePolicy); + LogByteSizeMergePolicyProvider lbsmp = new LogByteSizeMergePolicyProvider( + logger, + indexSettings(Settings.builder().put(MergePolicyProvider.INDEX_MERGE_ENABLED, false).build()) + ); + assertTrue(tmp.getMergePolicy() instanceof NoMergePolicy); + assertTrue(lbsmp.getMergePolicy() instanceof NoMergePolicy); } public void testUpdateSettings() throws IOException { - IndexSettings indexSettings = indexSettings(EMPTY_SETTINGS); - assertThat(indexSettings.getMergePolicy().getNoCFSRatio(), equalTo(0.1)); + Settings settings = Settings.builder() + .put(IndexSettings.INDEX_MERGE_POLICY.getKey(), IndexSettings.IndexMergePolicy.TIERED.getValue()) + .build(); + IndexSettings indexSettings = indexSettings(settings); + assertThat(indexSettings.getMergePolicy(false).getNoCFSRatio(), equalTo(0.1)); indexSettings = indexSettings(build(0.9)); - assertThat((indexSettings.getMergePolicy()).getNoCFSRatio(), equalTo(0.9)); + assertThat((indexSettings.getMergePolicy(false)).getNoCFSRatio(), equalTo(0.9)); indexSettings.updateIndexMetadata(newIndexMeta("index", build(0.1))); - assertThat((indexSettings.getMergePolicy()).getNoCFSRatio(), equalTo(0.1)); + assertThat((indexSettings.getMergePolicy(false)).getNoCFSRatio(), equalTo(0.1)); indexSettings.updateIndexMetadata(newIndexMeta("index", build(0.0))); - assertThat((indexSettings.getMergePolicy()).getNoCFSRatio(), equalTo(0.0)); + assertThat((indexSettings.getMergePolicy(false)).getNoCFSRatio(), equalTo(0.0)); indexSettings.updateIndexMetadata(newIndexMeta("index", build("true"))); - assertThat((indexSettings.getMergePolicy()).getNoCFSRatio(), equalTo(1.0)); + assertThat((indexSettings.getMergePolicy(false)).getNoCFSRatio(), equalTo(1.0)); indexSettings.updateIndexMetadata(newIndexMeta("index", build("false"))); - assertThat((indexSettings.getMergePolicy()).getNoCFSRatio(), equalTo(0.0)); + assertThat((indexSettings.getMergePolicy(false)).getNoCFSRatio(), equalTo(0.0)); + } + + public void testDefaultMergePolicy() throws IOException { + IndexSettings indexSettings = indexSettings(EMPTY_SETTINGS); + assertTrue(indexSettings.getMergePolicy(false) instanceof OpenSearchTieredMergePolicy); + assertTrue(indexSettings.getMergePolicy(true) instanceof OpenSearchTieredMergePolicy); + } + + public void testMergePolicyPrecedence() throws IOException { + // 1. INDEX_MERGE_POLICY is not set + // assert defaults + IndexSettings indexSettings = indexSettings(EMPTY_SETTINGS); + assertTrue(indexSettings.getMergePolicy(false) instanceof OpenSearchTieredMergePolicy); + assertTrue(indexSettings.getMergePolicy(true) instanceof OpenSearchTieredMergePolicy); + + // 1.1 node setting TIME_SERIES_INDEX_MERGE_POLICY is set as log_byte_size + // assert index policy is tiered whereas time series index policy is log_byte_size + Settings nodeSettings = Settings.builder() + .put(IndexSettings.TIME_SERIES_INDEX_MERGE_POLICY.getKey(), IndexSettings.IndexMergePolicy.LOG_BYTE_SIZE.getValue()) + .build(); + indexSettings = new IndexSettings(newIndexMeta("test", Settings.EMPTY), nodeSettings); + assertTrue(indexSettings.getMergePolicy(false) instanceof OpenSearchTieredMergePolicy); + assertTrue(indexSettings.getMergePolicy(true) instanceof LogByteSizeMergePolicy); + + // 1.2 node setting TIME_SERIES_INDEX_MERGE_POLICY is set as tiered + // assert both index and time series index policy is tiered + nodeSettings = Settings.builder() + .put(IndexSettings.TIME_SERIES_INDEX_MERGE_POLICY.getKey(), IndexSettings.IndexMergePolicy.TIERED.getValue()) + .build(); + indexSettings = new IndexSettings(newIndexMeta("test", Settings.EMPTY), nodeSettings); + assertTrue(indexSettings.getMergePolicy(false) instanceof OpenSearchTieredMergePolicy); + assertTrue(indexSettings.getMergePolicy(true) instanceof OpenSearchTieredMergePolicy); + + // 2. INDEX_MERGE_POLICY set as tiered + // assert both index and time-series-index merge policy is set as tiered + indexSettings = indexSettings( + Settings.builder().put(IndexSettings.INDEX_MERGE_POLICY.getKey(), IndexSettings.IndexMergePolicy.TIERED.getValue()).build() + ); + assertTrue(indexSettings.getMergePolicy(false) instanceof OpenSearchTieredMergePolicy); + assertTrue(indexSettings.getMergePolicy(true) instanceof OpenSearchTieredMergePolicy); + + // 2.1 node setting TIME_SERIES_INDEX_MERGE_POLICY is set as log_byte_size + // assert both index and time-series-index merge policy is set as tiered + nodeSettings = Settings.builder() + .put(IndexSettings.TIME_SERIES_INDEX_MERGE_POLICY.getKey(), IndexSettings.IndexMergePolicy.LOG_BYTE_SIZE.getValue()) + .build(); + indexSettings = new IndexSettings( + newIndexMeta( + "test", + Settings.builder().put(IndexSettings.INDEX_MERGE_POLICY.getKey(), IndexSettings.IndexMergePolicy.TIERED.getValue()).build() + ), + nodeSettings + ); + assertTrue(indexSettings.getMergePolicy(false) instanceof OpenSearchTieredMergePolicy); + assertTrue(indexSettings.getMergePolicy(true) instanceof OpenSearchTieredMergePolicy); + + // 3. INDEX_MERGE_POLICY set as log_byte_size + // assert both index and time-series-index merge policy is set as log_byte_size + indexSettings = indexSettings( + Settings.builder() + .put(IndexSettings.INDEX_MERGE_POLICY.getKey(), IndexSettings.IndexMergePolicy.LOG_BYTE_SIZE.getValue()) + .build() + ); + assertTrue(indexSettings.getMergePolicy(false) instanceof LogByteSizeMergePolicy); + assertTrue(indexSettings.getMergePolicy(true) instanceof LogByteSizeMergePolicy); + + // 3.1 node setting TIME_SERIES_INDEX_MERGE_POLICY is set as tiered + // assert both index and time-series-index merge policy is set as log_byte_size + nodeSettings = Settings.builder() + .put(IndexSettings.TIME_SERIES_INDEX_MERGE_POLICY.getKey(), IndexSettings.IndexMergePolicy.TIERED.getValue()) + .build(); + indexSettings = new IndexSettings( + newIndexMeta( + "test", + Settings.builder() + .put(IndexSettings.INDEX_MERGE_POLICY.getKey(), IndexSettings.IndexMergePolicy.LOG_BYTE_SIZE.getValue()) + .build() + ), + nodeSettings + ); + assertTrue(indexSettings.getMergePolicy(false) instanceof LogByteSizeMergePolicy); + assertTrue(indexSettings.getMergePolicy(true) instanceof LogByteSizeMergePolicy); + + } + + public void testInvalidMergePolicy() throws IOException { + + final Settings invalidSettings = Settings.builder().put(IndexSettings.INDEX_MERGE_POLICY.getKey(), "invalid").build(); + IllegalArgumentException exc1 = expectThrows( + IllegalArgumentException.class, + () -> IndexSettings.INDEX_MERGE_POLICY.get(invalidSettings) + ); + assertThat(exc1.getMessage(), containsString(" has unsupported policy specified: ")); + IllegalArgumentException exc2 = expectThrows( + IllegalArgumentException.class, + () -> indexSettings(invalidSettings).getMergePolicy(false) + ); + assertThat(exc2.getMessage(), containsString(" has unsupported policy specified: ")); + + final Settings invalidSettings2 = Settings.builder().put(IndexSettings.TIME_SERIES_INDEX_MERGE_POLICY.getKey(), "invalid").build(); + IllegalArgumentException exc3 = expectThrows( + IllegalArgumentException.class, + () -> IndexSettings.TIME_SERIES_INDEX_MERGE_POLICY.get(invalidSettings2) + ); + assertThat(exc3.getMessage(), containsString(" has unsupported policy specified: ")); + + IllegalArgumentException exc4 = expectThrows( + IllegalArgumentException.class, + () -> new IndexSettings(newIndexMeta("test", Settings.EMPTY), invalidSettings2).getMergePolicy(true) + ); + assertThat(exc4.getMessage(), containsString(" has unsupported policy specified: ")); + } + + public void testUpdateSettingsForLogByteSizeMergePolicy() throws IOException { + IndexSettings indexSettings = indexSettings( + Settings.builder() + .put(IndexSettings.INDEX_MERGE_POLICY.getKey(), IndexSettings.IndexMergePolicy.LOG_BYTE_SIZE.getValue()) + .build() + ); + assertTrue(indexSettings.getMergePolicy(true) instanceof LogByteSizeMergePolicy); + assertThat(indexSettings.getMergePolicy(true).getNoCFSRatio(), equalTo(0.1)); + indexSettings = indexSettings( + Settings.builder() + .put(IndexSettings.INDEX_MERGE_POLICY.getKey(), "log_byte_size") + .put(LogByteSizeMergePolicyProvider.INDEX_LBS_NO_CFS_RATIO_SETTING.getKey(), 0.9) + .build() + ); + assertThat((indexSettings.getMergePolicy(true)).getNoCFSRatio(), equalTo(0.9)); + indexSettings = indexSettings( + Settings.builder() + .put(IndexSettings.INDEX_MERGE_POLICY.getKey(), "log_byte_size") + .put(LogByteSizeMergePolicyProvider.INDEX_LBS_NO_CFS_RATIO_SETTING.getKey(), 0.1) + .build() + ); + assertThat((indexSettings.getMergePolicy(true)).getNoCFSRatio(), equalTo(0.1)); + indexSettings = indexSettings( + Settings.builder() + .put(IndexSettings.INDEX_MERGE_POLICY.getKey(), "log_byte_size") + .put(LogByteSizeMergePolicyProvider.INDEX_LBS_NO_CFS_RATIO_SETTING.getKey(), 0.0) + .build() + ); + assertThat((indexSettings.getMergePolicy(true)).getNoCFSRatio(), equalTo(0.0)); + indexSettings = indexSettings( + Settings.builder() + .put(IndexSettings.INDEX_MERGE_POLICY.getKey(), "log_byte_size") + .put(LogByteSizeMergePolicyProvider.INDEX_LBS_NO_CFS_RATIO_SETTING.getKey(), "true") + .build() + ); + assertThat((indexSettings.getMergePolicy(true)).getNoCFSRatio(), equalTo(1.0)); + indexSettings = indexSettings( + Settings.builder() + .put(IndexSettings.INDEX_MERGE_POLICY.getKey(), "log_byte_size") + .put(LogByteSizeMergePolicyProvider.INDEX_LBS_NO_CFS_RATIO_SETTING.getKey(), "false") + .build() + ); + assertThat((indexSettings.getMergePolicy(true)).getNoCFSRatio(), equalTo(0.0)); } public void testTieredMergePolicySettingsUpdate() throws IOException { IndexSettings indexSettings = indexSettings(Settings.EMPTY); assertEquals( - ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy()).getForceMergeDeletesPctAllowed(), - MergePolicyConfig.DEFAULT_EXPUNGE_DELETES_ALLOWED, + ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy(false)).getForceMergeDeletesPctAllowed(), + TieredMergePolicyProvider.DEFAULT_EXPUNGE_DELETES_ALLOWED, 0.0d ); @@ -102,21 +267,21 @@ public void testTieredMergePolicySettingsUpdate() throws IOException { "index", Settings.builder() .put( - MergePolicyConfig.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING.getKey(), - MergePolicyConfig.DEFAULT_EXPUNGE_DELETES_ALLOWED + 1.0d + TieredMergePolicyProvider.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING.getKey(), + TieredMergePolicyProvider.DEFAULT_EXPUNGE_DELETES_ALLOWED + 1.0d ) .build() ) ); assertEquals( - ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy()).getForceMergeDeletesPctAllowed(), - MergePolicyConfig.DEFAULT_EXPUNGE_DELETES_ALLOWED + 1.0d, + ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy(false)).getForceMergeDeletesPctAllowed(), + TieredMergePolicyProvider.DEFAULT_EXPUNGE_DELETES_ALLOWED + 1.0d, 0.0d ); assertEquals( - ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy()).getFloorSegmentMB(), - MergePolicyConfig.DEFAULT_FLOOR_SEGMENT.getMbFrac(), + ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy(false)).getFloorSegmentMB(), + TieredMergePolicyProvider.DEFAULT_FLOOR_SEGMENT.getMbFrac(), 0 ); indexSettings.updateIndexMetadata( @@ -124,41 +289,41 @@ public void testTieredMergePolicySettingsUpdate() throws IOException { "index", Settings.builder() .put( - MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING.getKey(), - new ByteSizeValue(MergePolicyConfig.DEFAULT_FLOOR_SEGMENT.getMb() + 1, ByteSizeUnit.MB) + TieredMergePolicyProvider.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING.getKey(), + new ByteSizeValue(TieredMergePolicyProvider.DEFAULT_FLOOR_SEGMENT.getMb() + 1, ByteSizeUnit.MB) ) .build() ) ); assertEquals( - ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy()).getFloorSegmentMB(), - new ByteSizeValue(MergePolicyConfig.DEFAULT_FLOOR_SEGMENT.getMb() + 1, ByteSizeUnit.MB).getMbFrac(), + ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy(false)).getFloorSegmentMB(), + new ByteSizeValue(TieredMergePolicyProvider.DEFAULT_FLOOR_SEGMENT.getMb() + 1, ByteSizeUnit.MB).getMbFrac(), 0.001 ); assertEquals( - ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy()).getMaxMergeAtOnce(), - MergePolicyConfig.DEFAULT_MAX_MERGE_AT_ONCE + ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy(false)).getMaxMergeAtOnce(), + TieredMergePolicyProvider.DEFAULT_MAX_MERGE_AT_ONCE ); indexSettings.updateIndexMetadata( newIndexMeta( "index", Settings.builder() .put( - MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), - MergePolicyConfig.DEFAULT_MAX_MERGE_AT_ONCE - 1 + TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), + TieredMergePolicyProvider.DEFAULT_MAX_MERGE_AT_ONCE - 1 ) .build() ) ); assertEquals( - ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy()).getMaxMergeAtOnce(), - MergePolicyConfig.DEFAULT_MAX_MERGE_AT_ONCE - 1 + ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy(false)).getMaxMergeAtOnce(), + TieredMergePolicyProvider.DEFAULT_MAX_MERGE_AT_ONCE - 1 ); assertEquals( - ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy()).getMaxMergedSegmentMB(), - MergePolicyConfig.DEFAULT_MAX_MERGED_SEGMENT.getMbFrac(), + ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy(false)).getMaxMergedSegmentMB(), + TieredMergePolicyProvider.DEFAULT_MAX_MERGED_SEGMENT.getMbFrac(), 0.0001 ); indexSettings.updateIndexMetadata( @@ -166,21 +331,21 @@ public void testTieredMergePolicySettingsUpdate() throws IOException { "index", Settings.builder() .put( - MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING.getKey(), - new ByteSizeValue(MergePolicyConfig.DEFAULT_MAX_MERGED_SEGMENT.getBytes() + 1) + TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING.getKey(), + new ByteSizeValue(TieredMergePolicyProvider.DEFAULT_MAX_MERGED_SEGMENT.getBytes() + 1) ) .build() ) ); assertEquals( - ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy()).getMaxMergedSegmentMB(), - new ByteSizeValue(MergePolicyConfig.DEFAULT_MAX_MERGED_SEGMENT.getBytes() + 1).getMbFrac(), + ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy(false)).getMaxMergedSegmentMB(), + new ByteSizeValue(TieredMergePolicyProvider.DEFAULT_MAX_MERGED_SEGMENT.getBytes() + 1).getMbFrac(), 0.0001 ); assertEquals( - ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy()).getSegmentsPerTier(), - MergePolicyConfig.DEFAULT_SEGMENTS_PER_TIER, + ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy(false)).getSegmentsPerTier(), + TieredMergePolicyProvider.DEFAULT_SEGMENTS_PER_TIER, 0 ); indexSettings.updateIndexMetadata( @@ -188,37 +353,37 @@ public void testTieredMergePolicySettingsUpdate() throws IOException { "index", Settings.builder() .put( - MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), - MergePolicyConfig.DEFAULT_SEGMENTS_PER_TIER + 1 + TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), + TieredMergePolicyProvider.DEFAULT_SEGMENTS_PER_TIER + 1 ) .build() ) ); assertEquals( - ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy()).getSegmentsPerTier(), - MergePolicyConfig.DEFAULT_SEGMENTS_PER_TIER + 1, + ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy(false)).getSegmentsPerTier(), + TieredMergePolicyProvider.DEFAULT_SEGMENTS_PER_TIER + 1, 0 ); assertEquals( - ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy()).getDeletesPctAllowed(), - MergePolicyConfig.DEFAULT_DELETES_PCT_ALLOWED, + ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy(false)).getDeletesPctAllowed(), + TieredMergePolicyProvider.DEFAULT_DELETES_PCT_ALLOWED, 0 ); indexSettings.updateIndexMetadata( newIndexMeta( "index", - Settings.builder().put(MergePolicyConfig.INDEX_MERGE_POLICY_DELETES_PCT_ALLOWED_SETTING.getKey(), 22).build() + Settings.builder().put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_DELETES_PCT_ALLOWED_SETTING.getKey(), 22).build() ) ); - assertEquals(((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy()).getDeletesPctAllowed(), 22, 0); + assertEquals(((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy(false)).getDeletesPctAllowed(), 22, 0); IllegalArgumentException exc = expectThrows( IllegalArgumentException.class, () -> indexSettings.updateIndexMetadata( newIndexMeta( "index", - Settings.builder().put(MergePolicyConfig.INDEX_MERGE_POLICY_DELETES_PCT_ALLOWED_SETTING.getKey(), 53).build() + Settings.builder().put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_DELETES_PCT_ALLOWED_SETTING.getKey(), 53).build() ) ) ); @@ -226,50 +391,162 @@ public void testTieredMergePolicySettingsUpdate() throws IOException { assertThat(cause.getMessage(), containsString("must be <= 50.0")); indexSettings.updateIndexMetadata(newIndexMeta("index", EMPTY_SETTINGS)); // see if defaults are restored assertEquals( - ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy()).getForceMergeDeletesPctAllowed(), - MergePolicyConfig.DEFAULT_EXPUNGE_DELETES_ALLOWED, + ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy(false)).getForceMergeDeletesPctAllowed(), + TieredMergePolicyProvider.DEFAULT_EXPUNGE_DELETES_ALLOWED, 0.0d ); assertEquals( - ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy()).getFloorSegmentMB(), - new ByteSizeValue(MergePolicyConfig.DEFAULT_FLOOR_SEGMENT.getMb(), ByteSizeUnit.MB).getMbFrac(), + ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy(false)).getFloorSegmentMB(), + new ByteSizeValue(TieredMergePolicyProvider.DEFAULT_FLOOR_SEGMENT.getMb(), ByteSizeUnit.MB).getMbFrac(), 0.00 ); assertEquals( - ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy()).getMaxMergeAtOnce(), - MergePolicyConfig.DEFAULT_MAX_MERGE_AT_ONCE + ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy(false)).getMaxMergeAtOnce(), + TieredMergePolicyProvider.DEFAULT_MAX_MERGE_AT_ONCE ); assertEquals( - ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy()).getMaxMergedSegmentMB(), - new ByteSizeValue(MergePolicyConfig.DEFAULT_MAX_MERGED_SEGMENT.getBytes() + 1).getMbFrac(), + ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy(false)).getMaxMergedSegmentMB(), + new ByteSizeValue(TieredMergePolicyProvider.DEFAULT_MAX_MERGED_SEGMENT.getBytes() + 1).getMbFrac(), 0.0001 ); assertEquals( - ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy()).getSegmentsPerTier(), - MergePolicyConfig.DEFAULT_SEGMENTS_PER_TIER, + ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy(false)).getSegmentsPerTier(), + TieredMergePolicyProvider.DEFAULT_SEGMENTS_PER_TIER, 0 ); assertEquals( - ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy()).getDeletesPctAllowed(), - MergePolicyConfig.DEFAULT_DELETES_PCT_ALLOWED, + ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy(false)).getDeletesPctAllowed(), + TieredMergePolicyProvider.DEFAULT_DELETES_PCT_ALLOWED, 0 ); } + public void testLogByteSizeMergePolicySettingsUpdate() throws IOException { + + IndexSettings indexSettings = indexSettings( + Settings.builder() + .put(IndexSettings.INDEX_MERGE_POLICY.getKey(), IndexSettings.IndexMergePolicy.LOG_BYTE_SIZE.getValue()) + .build() + ); + assertEquals( + ((LogByteSizeMergePolicy) indexSettings.getMergePolicy(true)).getMergeFactor(), + LogByteSizeMergePolicyProvider.DEFAULT_MERGE_FACTOR + ); + + indexSettings.updateIndexMetadata( + newIndexMeta( + "index", + Settings.builder() + .put(IndexSettings.INDEX_MERGE_POLICY.getKey(), "log_byte_size") + .put( + LogByteSizeMergePolicyProvider.INDEX_LBS_MERGE_POLICY_MERGE_FACTOR_SETTING.getKey(), + LogByteSizeMergePolicyProvider.DEFAULT_MERGE_FACTOR + 1 + ) + .build() + ) + ); + assertEquals( + ((LogByteSizeMergePolicy) indexSettings.getMergePolicy(true)).getMergeFactor(), + LogByteSizeMergePolicyProvider.DEFAULT_MERGE_FACTOR + 1 + ); + + indexSettings.updateIndexMetadata( + newIndexMeta( + "index", + Settings.builder() + .put(IndexSettings.INDEX_MERGE_POLICY.getKey(), "log_byte_size") + .put( + LogByteSizeMergePolicyProvider.INDEX_LBS_MERGE_POLICY_MIN_MERGE_SETTING.getKey(), + new ByteSizeValue(LogByteSizeMergePolicyProvider.DEFAULT_MIN_MERGE.getMb() + 1, ByteSizeUnit.MB) + ) + .build() + ) + ); + + assertEquals( + ((LogByteSizeMergePolicy) indexSettings.getMergePolicy(true)).getMinMergeMB(), + new ByteSizeValue(LogByteSizeMergePolicyProvider.DEFAULT_MIN_MERGE.getMb() + 1, ByteSizeUnit.MB).getMbFrac(), + 0.001 + ); + + indexSettings.updateIndexMetadata( + newIndexMeta( + "index", + Settings.builder() + .put(IndexSettings.INDEX_MERGE_POLICY.getKey(), "log_byte_size") + .put( + LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGE_SEGMENT_SETTING.getKey(), + new ByteSizeValue(LogByteSizeMergePolicyProvider.DEFAULT_MAX_MERGED_SEGMENT.getMb() + 100, ByteSizeUnit.MB) + ) + .build() + ) + ); + + assertEquals( + ((LogByteSizeMergePolicy) indexSettings.getMergePolicy(true)).getMaxMergeMB(), + new ByteSizeValue(TieredMergePolicyProvider.DEFAULT_MAX_MERGED_SEGMENT.getMb() + 100, ByteSizeUnit.MB).getMbFrac(), + 0.001 + ); + + indexSettings.updateIndexMetadata( + newIndexMeta( + "index", + Settings.builder() + .put(IndexSettings.INDEX_MERGE_POLICY.getKey(), "log_byte_size") + .put( + LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGE_SEGMENT_FOR_FORCED_MERGE_SETTING.getKey(), + new ByteSizeValue( + LogByteSizeMergePolicyProvider.DEFAULT_MAX_MERGE_SEGMENT_FORCE_MERGE.getMb() - 100, + ByteSizeUnit.MB + ) + ) + .build() + ) + ); + assertEquals( + ((LogByteSizeMergePolicy) indexSettings.getMergePolicy(true)).getMaxMergeMBForForcedMerge(), + new ByteSizeValue(LogByteSizeMergePolicyProvider.DEFAULT_MAX_MERGE_SEGMENT_FORCE_MERGE.getMb() - 100, ByteSizeUnit.MB) + .getMbFrac(), + 0.001 + ); + + indexSettings.updateIndexMetadata( + newIndexMeta( + "index", + Settings.builder() + .put(IndexSettings.INDEX_MERGE_POLICY.getKey(), "log_byte_size") + .put(LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGED_DOCS_SETTING.getKey(), 10000000) + .build() + ) + ); + assertEquals(((LogByteSizeMergePolicy) indexSettings.getMergePolicy(true)).getMaxMergeDocs(), 10000000); + + indexSettings.updateIndexMetadata( + newIndexMeta( + "index", + Settings.builder() + .put(IndexSettings.INDEX_MERGE_POLICY.getKey(), "log_byte_size") + .put(LogByteSizeMergePolicyProvider.INDEX_LBS_NO_CFS_RATIO_SETTING.getKey(), 0.1) + .build() + ) + ); + assertEquals(indexSettings.getMergePolicy(true).getNoCFSRatio(), 0.1, 0.0); + } + public Settings build(String value) { - return Settings.builder().put(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING.getKey(), value).build(); + return Settings.builder().put(TieredMergePolicyProvider.INDEX_COMPOUND_FORMAT_SETTING.getKey(), value).build(); } public Settings build(double value) { - return Settings.builder().put(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING.getKey(), value).build(); + return Settings.builder().put(TieredMergePolicyProvider.INDEX_COMPOUND_FORMAT_SETTING.getKey(), value).build(); } public Settings build(int value) { - return Settings.builder().put(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING.getKey(), value).build(); + return Settings.builder().put(TieredMergePolicyProvider.INDEX_COMPOUND_FORMAT_SETTING.getKey(), value).build(); } public Settings build(boolean value) { - return Settings.builder().put(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING.getKey(), value).build(); + return Settings.builder().put(TieredMergePolicyProvider.INDEX_COMPOUND_FORMAT_SETTING.getKey(), value).build(); } } diff --git a/server/src/test/java/org/opensearch/index/MergeSchedulerSettingsTests.java b/server/src/test/java/org/opensearch/index/MergeSchedulerSettingsTests.java index 2443ee1ab40be..baaf584702f78 100644 --- a/server/src/test/java/org/opensearch/index/MergeSchedulerSettingsTests.java +++ b/server/src/test/java/org/opensearch/index/MergeSchedulerSettingsTests.java @@ -92,8 +92,8 @@ public void testUpdateAutoThrottleSettings() throws Exception { .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1") .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0") - .put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2") - .put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2") + .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2") + .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2") .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), "1") .put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), "2") .put(MergeSchedulerConfig.AUTO_THROTTLE_SETTING.getKey(), "true"); @@ -123,8 +123,8 @@ public void testUpdateMergeMaxThreadCount() throws Exception { .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1") .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0") - .put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2") - .put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2") + .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2") + .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2") .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), "10000") .put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), "10000"); IndexSettings settings = new IndexSettings(newIndexMeta("index", builder.build()), Settings.EMPTY); diff --git a/server/src/test/java/org/opensearch/index/shard/RemoveCorruptedShardDataCommandTests.java b/server/src/test/java/org/opensearch/index/shard/RemoveCorruptedShardDataCommandTests.java index 9c8f9896850c6..c88c86d51be08 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoveCorruptedShardDataCommandTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoveCorruptedShardDataCommandTests.java @@ -58,7 +58,7 @@ import org.opensearch.env.TestEnvironment; import org.opensearch.gateway.PersistedClusterStateService; import org.opensearch.index.IndexSettings; -import org.opensearch.index.MergePolicyConfig; +import org.opensearch.index.MergePolicyProvider; import org.opensearch.index.engine.EngineConfigFactory; import org.opensearch.index.engine.EngineCreationFailureException; import org.opensearch.index.engine.InternalEngineFactory; @@ -134,7 +134,7 @@ public void setup() throws IOException { final Settings settings = Settings.builder() .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) + .put(MergePolicyProvider.INDEX_MERGE_ENABLED, false) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .put(IndexMetadata.SETTING_INDEX_UUID, shardId.getIndex().getUUID()) .build(); diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java index 11d916616578d..ad90255a3cc3f 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java @@ -54,7 +54,7 @@ import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.index.IndexSettings; -import org.opensearch.index.MergePolicyConfig; +import org.opensearch.index.MergePolicyProvider; import org.opensearch.index.VersionType; import org.opensearch.index.engine.DocIdSeqNoAndSource; import org.opensearch.index.engine.Engine; @@ -168,7 +168,7 @@ public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception { .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 10) // If soft-deletes is enabled, delete#1 will be reclaimed because its segment (segment_1) is fully deleted // index#0 will be retained if merge is disabled; otherwise it will be reclaimed because gcp=3 and retained_ops=0 - .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) + .put(MergePolicyProvider.INDEX_MERGE_ENABLED, false) .build(); try (ReplicationGroup shards = createGroup(1, settings)) { shards.startAll(); diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index 6e064f943ca07..0b80c6e577f95 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -130,9 +130,9 @@ import org.opensearch.http.HttpInfo; import org.opensearch.index.IndexModule; import org.opensearch.index.IndexSettings; -import org.opensearch.index.MergePolicyConfig; import org.opensearch.index.MergeSchedulerConfig; import org.opensearch.index.MockEngineFactoryPlugin; +import org.opensearch.index.TieredMergePolicyProvider; import org.opensearch.index.codec.CodecService; import org.opensearch.index.engine.Segment; import org.opensearch.index.mapper.CompletionFieldMapper; @@ -500,7 +500,7 @@ protected Settings.Builder setRandomIndexSettings(Random random, Settings.Builde private static Settings.Builder setRandomIndexMergeSettings(Random random, Settings.Builder builder) { if (random.nextBoolean()) { builder.put( - MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING.getKey(), + TieredMergePolicyProvider.INDEX_COMPOUND_FORMAT_SETTING.getKey(), (random.nextBoolean() ? random.nextDouble() : random.nextBoolean()).toString() ); }