Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configurable merge policy and option to choose between LogByteSize and Tiered MergePolicy #9992

Merged
merged 9 commits into from
Oct 2, 2023
Next Next commit
Configurable merge policy for index
* additional setting to configure merge policy for timestamp based index
* introduction of logbytesize merge policy as an option

Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com>
  • Loading branch information
rishabhmaurya committed Oct 2, 2023
commit e5b8b2d02fafb9d001712214106b4480600938bc
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Implement on behalf of token passing for extensions ([#8679](https://github.com/opensearch-project/OpenSearch/pull/8679))
- Implement Visitor Design pattern in QueryBuilder to enable the capability to traverse through the complex QueryBuilder tree. ([#10110](https://github.com/opensearch-project/OpenSearch/pull/10110))
- Provide service accounts tokens to extensions ([#9618](https://github.com/opensearch-project/OpenSearch/pull/9618))
- Configurable merge policy for index with an option to choose from LogByteSize and Tiered merge policy ([#9992](https://github.com/opensearch-project/OpenSearch/pull/9992))

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.MergePolicyConfig;
import org.opensearch.index.MergePolicyProvider;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.shard.ShardPath;
Expand Down Expand Up @@ -519,7 +519,7 @@ public void testReuseInFileBasedPeerRecovery() throws Exception {
.put("number_of_replicas", 1)

// disable merges to keep segments the same
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(MergePolicyProvider.INDEX_MERGE_ENABLED, false)

// expire retention leases quickly
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
import org.opensearch.env.TestEnvironment;
import org.opensearch.gateway.GatewayMetaState;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.MergePolicyConfig;
import org.opensearch.index.MergePolicyProvider;
import org.opensearch.index.MockEngineFactoryPlugin;
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.index.translog.TestTranslog;
Expand Down Expand Up @@ -135,7 +135,7 @@ public void testCorruptIndex() throws Exception {
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(MergePolicyProvider.INDEX_MERGE_ENABLED, false)
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "-1")
.put(MockEngineSupport.DISABLE_FLUSH_ON_CLOSE.getKey(), true)
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), "checksum")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.MergePolicyConfig;
import org.opensearch.index.MergePolicyProvider;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardState;
Expand Down Expand Up @@ -167,7 +167,7 @@ public void testCorruptFileAndRecover() throws ExecutionException, InterruptedEx
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1")
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "1")
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(MergePolicyProvider.INDEX_MERGE_ENABLED, false)
// no checkindex - we corrupt shards on purpose
.put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false)
// no translog based flush - it might change the .liv / segments.N files
Expand Down Expand Up @@ -286,7 +286,7 @@ public void testCorruptPrimaryNoReplica() throws ExecutionException, Interrupted
prepareCreate("test").setSettings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0")
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(MergePolicyProvider.INDEX_MERGE_ENABLED, false)
.put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false) // no checkindex - we corrupt shards on
// purpose
// no translog based flush - it might change the .liv / segments.N files
Expand Down Expand Up @@ -552,7 +552,7 @@ public void testCorruptFileThenSnapshotAndRestore() throws ExecutionException, I
prepareCreate("test").setSettings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0") // no replicas for this test
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(MergePolicyProvider.INDEX_MERGE_ENABLED, false)
// no checkindex - we corrupt shards on purpose
.put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false)
// no translog based flush - it might change the .liv / segments.N files
Expand Down Expand Up @@ -624,7 +624,7 @@ public void testReplicaCorruption() throws Exception {
prepareCreate("test").setSettings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, cluster().numDataNodes() - 1)
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(MergePolicyProvider.INDEX_MERGE_ENABLED, false)
.put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false) // no checkindex - we corrupt shards on
// purpose
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(1, ByteSizeUnit.PB)) // no
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.MergePolicyConfig;
import org.opensearch.index.MergeSchedulerConfig;
import org.opensearch.index.TieredMergePolicyProvider;
import org.opensearch.index.VersionType;
import org.opensearch.index.cache.query.QueryCacheStats;
import org.opensearch.index.engine.VersionConflictEngineException;
Expand Down Expand Up @@ -589,8 +589,8 @@ public void testNonThrottleStats() throws Exception {
prepareCreate("test").setSettings(
settingsBuilder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1")
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0")
.put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2")
.put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2")
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2")
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2")
.put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), "1")
.put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), "10000")
)
Expand Down Expand Up @@ -621,8 +621,8 @@ public void testThrottleStats() throws Exception {
prepareCreate("test").setSettings(
settingsBuilder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1")
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0")
.put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2")
.put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2")
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2")
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2")
.put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), "1")
.put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), "1")
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC.name())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.index.MergePolicyConfig;
import org.opensearch.index.MergePolicyProvider;
import org.opensearch.index.engine.DocumentMissingException;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.plugins.Plugin;
Expand Down Expand Up @@ -669,7 +669,7 @@ public void run() {

public void testStressUpdateDeleteConcurrency() throws Exception {
// We create an index with merging disabled so that deletes don't get merged away
assertAcked(prepareCreate("test").setSettings(Settings.builder().put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)));
assertAcked(prepareCreate("test").setSettings(Settings.builder().put(MergePolicyProvider.INDEX_MERGE_ENABLED, false)));
ensureGreen();

Script fieldIncScript = new Script(ScriptType.INLINE, UPDATE_SCRIPTS, FIELD_INC_SCRIPT, Collections.singletonMap("field", "field"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,7 @@ public void apply(Settings value, Settings current, Settings previous) {
NetworkService.TCP_CONNECT_TIMEOUT,
IndexSettings.QUERY_STRING_ANALYZE_WILDCARD,
IndexSettings.QUERY_STRING_ALLOW_LEADING_WILDCARD,
IndexSettings.TIME_INDEX_MERGE_POLICY,
rishabhmaurya marked this conversation as resolved.
Show resolved Hide resolved
ScriptService.SCRIPT_GENERAL_CACHE_SIZE_SETTING,
ScriptService.SCRIPT_GENERAL_CACHE_EXPIRE_SETTING,
ScriptService.SCRIPT_GENERAL_MAX_COMPILATIONS_RATE_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,11 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.IndexSortConfig;
import org.opensearch.index.IndexingSlowLog;
import org.opensearch.index.MergePolicyConfig;
import org.opensearch.index.LogByteSizeMergePolicyProvider;
import org.opensearch.index.MergePolicyProvider;
import org.opensearch.index.MergeSchedulerConfig;
import org.opensearch.index.SearchSlowLog;
import org.opensearch.index.TieredMergePolicyProvider;
import org.opensearch.index.cache.bitset.BitsetFilterCache;
import org.opensearch.index.engine.EngineConfig;
import org.opensearch.index.fielddata.IndexFieldDataService;
Expand Down Expand Up @@ -120,14 +122,14 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexingSlowLog.INDEX_INDEXING_SLOWLOG_LEVEL_SETTING,
IndexingSlowLog.INDEX_INDEXING_SLOWLOG_REFORMAT_SETTING,
IndexingSlowLog.INDEX_INDEXING_SLOWLOG_MAX_SOURCE_CHARS_TO_LOG_SETTING,
MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING,
MergePolicyConfig.INDEX_MERGE_POLICY_DELETES_PCT_ALLOWED_SETTING,
MergePolicyConfig.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING,
MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING,
MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING,
MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING,
MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING,
MergePolicyConfig.INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT_SETTING,
TieredMergePolicyProvider.INDEX_COMPOUND_FORMAT_SETTING,
TieredMergePolicyProvider.INDEX_MERGE_POLICY_DELETES_PCT_ALLOWED_SETTING,
TieredMergePolicyProvider.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING,
TieredMergePolicyProvider.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING,
TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING,
TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING,
TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING,
TieredMergePolicyProvider.INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT_SETTING,
IndexSortConfig.INDEX_SORT_FIELD_SETTING,
IndexSortConfig.INDEX_SORT_ORDER_SETTING,
IndexSortConfig.INDEX_SORT_MISSING_SETTING,
Expand Down Expand Up @@ -202,6 +204,13 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED,
IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME,
IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY,
IndexSettings.INDEX_MERGE_POLICY,
LogByteSizeMergePolicyProvider.INDEX_LBS_MERGE_POLICY_MERGE_FACTOR_SETTING,
LogByteSizeMergePolicyProvider.INDEX_LBS_MERGE_POLICY_MIN_MERGE_MB_SETTING,
LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGE_SEGMENT_MB_SETTING,
LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGE_SEGMENT_MB_FOR_FORCED_MERGE_SETTING,
LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGED_DOCS_SETTING,
LogByteSizeMergePolicyProvider.INDEX_LBS_NO_CFS_RATIO_SETTING,
IndexSettings.DEFAULT_SEARCH_PIPELINE,

// Settings for Searchable Snapshots
Expand Down Expand Up @@ -275,7 +284,7 @@ public boolean isPrivateSetting(String key) {
case IndexMetadata.SETTING_HISTORY_UUID:
case IndexMetadata.SETTING_VERSION_UPGRADED:
case IndexMetadata.SETTING_INDEX_PROVIDED_NAME:
case MergePolicyConfig.INDEX_MERGE_ENABLED:
case MergePolicyProvider.INDEX_MERGE_ENABLED:
// we keep the shrink settings for BWC - this can be removed in 8.0
// we can't remove in 7 since this setting might be baked into an index coming in via a full cluster restart from 6.0
case "index.shrink.source.uuid":
Expand Down
Loading