Skip to content

Commit

Permalink
Use of LogByteSizeMergePolicy for data stream use cases (#9992) (#10312)
Browse files Browse the repository at this point in the history
* Configurable merge policy for index

* additional setting to configure merge policy for timestamp based index
* introduction of logbytesize merge policy as an option

Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com>

* remove the trace log not required anymore

Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com>

* Refactor the merge policy extraction logic

Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com>

* Rename constant DEFAULT to DEFAULT_POLICY

Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com>

* Simplify merge policy extraction and selection logic

Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com>

* missing javadoc error

Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com>

* Renaming log byte size policy setting with mb

Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com>

* Move validation exception to enum from setting defn

Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com>

* rename time_index to time_series_index

Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com>

---------

Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com>
(cherry picked from commit fa66beb)
  • Loading branch information
rishabhmaurya authored Oct 3, 2023
1 parent a0f9736 commit 81cd1f2
Show file tree
Hide file tree
Showing 19 changed files with 776 additions and 171 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add capability to restrict async durability mode for remote indexes ([#10189](https://github.com/opensearch-project/OpenSearch/pull/10189))
- Add Doc Status Counter for Indexing Engine ([#4562](https://github.com/opensearch-project/OpenSearch/issues/4562))
- Add unreferenced file cleanup count to merge stats ([#10204](https://github.com/opensearch-project/OpenSearch/pull/10204))
- Configurable merge policy for index with an option to choose from LogByteSize and Tiered merge policy ([#9992](https://github.com/opensearch-project/OpenSearch/pull/9992))

### Dependencies
- Bump JNA version from 5.5 to 5.13 ([#9963](https://github.com/opensearch-project/OpenSearch/pull/9963))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.MergePolicyConfig;
import org.opensearch.index.MergePolicyProvider;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.shard.ShardPath;
Expand Down Expand Up @@ -519,7 +519,7 @@ public void testReuseInFileBasedPeerRecovery() throws Exception {
.put("number_of_replicas", 1)

// disable merges to keep segments the same
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(MergePolicyProvider.INDEX_MERGE_ENABLED, false)

// expire retention leases quickly
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
import org.opensearch.env.TestEnvironment;
import org.opensearch.gateway.GatewayMetaState;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.MergePolicyConfig;
import org.opensearch.index.MergePolicyProvider;
import org.opensearch.index.MockEngineFactoryPlugin;
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.index.translog.TestTranslog;
Expand Down Expand Up @@ -135,7 +135,7 @@ public void testCorruptIndex() throws Exception {
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(MergePolicyProvider.INDEX_MERGE_ENABLED, false)
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "-1")
.put(MockEngineSupport.DISABLE_FLUSH_ON_CLOSE.getKey(), true)
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), "checksum")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.MergePolicyConfig;
import org.opensearch.index.MergePolicyProvider;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardState;
Expand Down Expand Up @@ -167,7 +167,7 @@ public void testCorruptFileAndRecover() throws ExecutionException, InterruptedEx
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1")
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "1")
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(MergePolicyProvider.INDEX_MERGE_ENABLED, false)
// no checkindex - we corrupt shards on purpose
.put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false)
// no translog based flush - it might change the .liv / segments.N files
Expand Down Expand Up @@ -286,7 +286,7 @@ public void testCorruptPrimaryNoReplica() throws ExecutionException, Interrupted
prepareCreate("test").setSettings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0")
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(MergePolicyProvider.INDEX_MERGE_ENABLED, false)
.put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false) // no checkindex - we corrupt shards on
// purpose
// no translog based flush - it might change the .liv / segments.N files
Expand Down Expand Up @@ -552,7 +552,7 @@ public void testCorruptFileThenSnapshotAndRestore() throws ExecutionException, I
prepareCreate("test").setSettings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0") // no replicas for this test
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(MergePolicyProvider.INDEX_MERGE_ENABLED, false)
// no checkindex - we corrupt shards on purpose
.put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false)
// no translog based flush - it might change the .liv / segments.N files
Expand Down Expand Up @@ -624,7 +624,7 @@ public void testReplicaCorruption() throws Exception {
prepareCreate("test").setSettings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, cluster().numDataNodes() - 1)
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(MergePolicyProvider.INDEX_MERGE_ENABLED, false)
.put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false) // no checkindex - we corrupt shards on
// purpose
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(1, ByteSizeUnit.PB)) // no
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.MergePolicyConfig;
import org.opensearch.index.MergeSchedulerConfig;
import org.opensearch.index.TieredMergePolicyProvider;
import org.opensearch.index.VersionType;
import org.opensearch.index.cache.query.QueryCacheStats;
import org.opensearch.index.engine.VersionConflictEngineException;
Expand Down Expand Up @@ -589,8 +589,8 @@ public void testNonThrottleStats() throws Exception {
prepareCreate("test").setSettings(
settingsBuilder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1")
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0")
.put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2")
.put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2")
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2")
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2")
.put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), "1")
.put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), "10000")
)
Expand Down Expand Up @@ -621,8 +621,8 @@ public void testThrottleStats() throws Exception {
prepareCreate("test").setSettings(
settingsBuilder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1")
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0")
.put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2")
.put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2")
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2")
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2")
.put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), "1")
.put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), "1")
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC.name())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.index.MergePolicyConfig;
import org.opensearch.index.MergePolicyProvider;
import org.opensearch.index.engine.DocumentMissingException;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.plugins.Plugin;
Expand Down Expand Up @@ -669,7 +669,7 @@ public void run() {

public void testStressUpdateDeleteConcurrency() throws Exception {
// We create an index with merging disabled so that deletes don't get merged away
assertAcked(prepareCreate("test").setSettings(Settings.builder().put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)));
assertAcked(prepareCreate("test").setSettings(Settings.builder().put(MergePolicyProvider.INDEX_MERGE_ENABLED, false)));
ensureGreen();

Script fieldIncScript = new Script(ScriptType.INLINE, UPDATE_SCRIPTS, FIELD_INC_SCRIPT, Collections.singletonMap("field", "field"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ public void apply(Settings value, Settings current, Settings previous) {
NetworkService.TCP_CONNECT_TIMEOUT,
IndexSettings.QUERY_STRING_ANALYZE_WILDCARD,
IndexSettings.QUERY_STRING_ALLOW_LEADING_WILDCARD,
IndexSettings.TIME_SERIES_INDEX_MERGE_POLICY,
ScriptService.SCRIPT_GENERAL_CACHE_SIZE_SETTING,
ScriptService.SCRIPT_GENERAL_CACHE_EXPIRE_SETTING,
ScriptService.SCRIPT_GENERAL_MAX_COMPILATIONS_RATE_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,11 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.IndexSortConfig;
import org.opensearch.index.IndexingSlowLog;
import org.opensearch.index.MergePolicyConfig;
import org.opensearch.index.LogByteSizeMergePolicyProvider;
import org.opensearch.index.MergePolicyProvider;
import org.opensearch.index.MergeSchedulerConfig;
import org.opensearch.index.SearchSlowLog;
import org.opensearch.index.TieredMergePolicyProvider;
import org.opensearch.index.cache.bitset.BitsetFilterCache;
import org.opensearch.index.engine.EngineConfig;
import org.opensearch.index.fielddata.IndexFieldDataService;
Expand Down Expand Up @@ -120,14 +122,14 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexingSlowLog.INDEX_INDEXING_SLOWLOG_LEVEL_SETTING,
IndexingSlowLog.INDEX_INDEXING_SLOWLOG_REFORMAT_SETTING,
IndexingSlowLog.INDEX_INDEXING_SLOWLOG_MAX_SOURCE_CHARS_TO_LOG_SETTING,
MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING,
MergePolicyConfig.INDEX_MERGE_POLICY_DELETES_PCT_ALLOWED_SETTING,
MergePolicyConfig.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING,
MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING,
MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING,
MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING,
MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING,
MergePolicyConfig.INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT_SETTING,
TieredMergePolicyProvider.INDEX_COMPOUND_FORMAT_SETTING,
TieredMergePolicyProvider.INDEX_MERGE_POLICY_DELETES_PCT_ALLOWED_SETTING,
TieredMergePolicyProvider.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING,
TieredMergePolicyProvider.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING,
TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING,
TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING,
TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING,
TieredMergePolicyProvider.INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT_SETTING,
IndexSortConfig.INDEX_SORT_FIELD_SETTING,
IndexSortConfig.INDEX_SORT_ORDER_SETTING,
IndexSortConfig.INDEX_SORT_MISSING_SETTING,
Expand Down Expand Up @@ -202,6 +204,13 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED,
IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME,
IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY,
IndexSettings.INDEX_MERGE_POLICY,
LogByteSizeMergePolicyProvider.INDEX_LBS_MERGE_POLICY_MERGE_FACTOR_SETTING,
LogByteSizeMergePolicyProvider.INDEX_LBS_MERGE_POLICY_MIN_MERGE_SETTING,
LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGE_SEGMENT_SETTING,
LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGE_SEGMENT_FOR_FORCED_MERGE_SETTING,
LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGED_DOCS_SETTING,
LogByteSizeMergePolicyProvider.INDEX_LBS_NO_CFS_RATIO_SETTING,
IndexSettings.DEFAULT_SEARCH_PIPELINE,

// Settings for Searchable Snapshots
Expand Down Expand Up @@ -275,7 +284,7 @@ public boolean isPrivateSetting(String key) {
case IndexMetadata.SETTING_HISTORY_UUID:
case IndexMetadata.SETTING_VERSION_UPGRADED:
case IndexMetadata.SETTING_INDEX_PROVIDED_NAME:
case MergePolicyConfig.INDEX_MERGE_ENABLED:
case MergePolicyProvider.INDEX_MERGE_ENABLED:
// we keep the shrink settings for BWC - this can be removed in 8.0
// we can't remove in 7 since this setting might be baked into an index coming in via a full cluster restart from 6.0
case "index.shrink.source.uuid":
Expand Down
Loading

0 comments on commit 81cd1f2

Please sign in to comment.