Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,8 @@ public void apply(Settings value, Settings current, Settings previous) {
SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING, // deprecated
SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING,
SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE,
SearchService.CLUSTER_CONCURRENT_INTRA_SEGMENT_SEARCH_MODE,
SearchService.CONCURRENT_INTRA_SEGMENT_SEARCH_PARTITION_SIZE_SETTING,

RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING, // deprecated
IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_MODE,
IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_MAX_SLICE_COUNT,
IndexSettings.INDEX_CONCURRENT_INTRA_SEGMENT_SEARCH_MODE,
IndexSettings.INDEX_CONCURRENT_INTRA_SEGMENT_PARTITION_SIZE,
IndexSettings.ALLOW_DERIVED_FIELDS,

// Settings for star tree index
Expand Down
29 changes: 29 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@
import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_NESTED_DOCS_LIMIT_SETTING;
import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_NESTED_FIELDS_LIMIT_SETTING;
import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING;
import static org.opensearch.search.SearchService.CONCURRENT_INTRA_SEGMENT_DEFAULT_PARTITION_SIZE_VALUE;
import static org.opensearch.search.SearchService.CONCURRENT_INTRA_SEGMENT_MINIMUM_PARTITION_SIZE_VALUE;
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_DEFAULT_SLICE_COUNT_VALUE;
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_MIN_SLICE_COUNT_VALUE;
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_MODE_ALL;
Expand Down Expand Up @@ -713,6 +715,7 @@ public static IndexMergePolicy fromString(String text) {
Property.Deprecated
);

// TODO : Should use enum
public static final Setting<String> INDEX_CONCURRENT_SEGMENT_SEARCH_MODE = Setting.simpleString(
"index.search.concurrent_segment_search.mode",
CONCURRENT_SEGMENT_SEARCH_MODE_NONE,
Expand All @@ -739,6 +742,32 @@ public static IndexMergePolicy fromString(String text) {
Property.IndexScope
);

/**
 * Index-level setting controlling concurrent intra-segment search for this index.
 * Accepted values: {@code all} (partition segments across slices for every request) or
 * {@code none} (disabled, the default). When unset on the index, callers fall back to the
 * cluster-level {@code search.concurrent_intra_segment_search.mode} setting.
 */
public static final Setting<String> INDEX_CONCURRENT_INTRA_SEGMENT_SEARCH_MODE = Setting.simpleString(
    "index.search.concurrent_intra_segment_search.mode",
    CONCURRENT_SEGMENT_SEARCH_MODE_NONE,
    value -> {
        // TODO : Add support for Auto mode with Intra Segment Search
        switch (value) {
            case CONCURRENT_SEGMENT_SEARCH_MODE_ALL:
            case CONCURRENT_SEGMENT_SEARCH_MODE_NONE:
                // valid setting
                break;
            default:
                // Include the rejected value so a failed settings update is easy to diagnose.
                throw new IllegalArgumentException("Setting value must be one of [all, none] but was [" + value + "]");
        }
    },
    Property.Dynamic,
    Property.IndexScope
);

// Index-level override for the intra-segment partition size (documents per partition).
// Falls back to SearchService.CONCURRENT_INTRA_SEGMENT_SEARCH_PARTITION_SIZE_SETTING when
// not set on the index; values below the shared minimum are rejected by intSetting.
public static final Setting<Integer> INDEX_CONCURRENT_INTRA_SEGMENT_PARTITION_SIZE = Setting.intSetting(
"index.search.concurrent_intra_segment_search.partition_size",
CONCURRENT_INTRA_SEGMENT_DEFAULT_PARTITION_SIZE_VALUE,
CONCURRENT_INTRA_SEGMENT_MINIMUM_PARTITION_SIZE_VALUE,
Property.Dynamic,
Property.IndexScope
);

public static final Setting<Boolean> INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING = Setting.boolSetting(
"index.optimize_doc_id_lookup.fuzzy_set.enabled",
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@

import static org.opensearch.search.SearchService.AGGREGATION_REWRITE_FILTER_SEGMENT_THRESHOLD;
import static org.opensearch.search.SearchService.CARDINALITY_AGGREGATION_PRUNING_THRESHOLD;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_INTRA_SEGMENT_SEARCH_MODE;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_MODE_ALL;
Expand Down Expand Up @@ -210,7 +211,9 @@ final class DefaultSearchContext extends SearchContext {
private final FetchPhase fetchPhase;
private final Function<SearchSourceBuilder, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder;
private final String concurrentSearchMode;
private final String concurrentIntraSegmentSearchMode;
private final SetOnce<Boolean> requestShouldUseConcurrentSearch = new SetOnce<>();
private final SetOnce<Boolean> requestShouldUseConcurrentIntraSegmentSearch = new SetOnce<>();
private final int maxAggRewriteFilters;
private final int filterRewriteSegmentThreshold;
private final int cardinalityAggregationPruningThreshold;
Expand Down Expand Up @@ -247,6 +250,7 @@ final class DefaultSearchContext extends SearchContext {
this.clusterService = clusterService;
this.engineSearcher = readerContext.acquireSearcher("search");
this.concurrentSearchMode = evaluateConcurrentSearchMode(executor);
this.concurrentIntraSegmentSearchMode = evaluateConcurrentIntraSegmentSearchMode(executor);
this.searcher = new ContextIndexSearcher(
engineSearcher.getIndexReader(),
engineSearcher.getSimilarity(),
Expand Down Expand Up @@ -1018,6 +1022,31 @@ && aggregations().factories() != null
}
}

@Override
public boolean shouldUseIntraSegmentConcurrentSearch() {
    assert requestShouldUseConcurrentIntraSegmentSearch.get() != null : "requestShouldUseConcurrentIntraSegmentSearch must be set";
    assert concurrentIntraSegmentSearchMode != null : "concurrentIntraSegmentSearchMode must be set";
    // TODO : Handle auto mode here
    // BUG FIX: this previously tested concurrentSearchMode (the inter-segment mode) even though
    // the assert above guards the intra-segment mode — intra-segment concurrency was being
    // gated by the wrong setting. Use the intra-segment mode that this class evaluated.
    return CONCURRENT_SEGMENT_SEARCH_MODE_ALL.equals(concurrentIntraSegmentSearchMode)
        && Boolean.TRUE.equals(requestShouldUseConcurrentIntraSegmentSearch.get());
}

public void evaluateRequestShouldUseIntraSegmentConcurrentSearch() {
Copy link
Member

@sandeshkr419 sandeshkr419 Aug 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this method be private?

Also, can we refactor it better to avoid multiple if-else:

private void evaluateRequestShouldUseIntraSegmentConcurrentSearch() {
    // Check for any condition that forces concurrency to be disabled.
    final boolean isUnsupported = (sort != null && sort.isSortOnTimeSeriesField())
                               || (aggregations() != null)
                               || (terminateAfter != DEFAULT_TERMINATE_AFTER)
                               || (trackTotalHitsUpTo != TRACK_TOTAL_HITS_DISABLED); // TODO: Need to handle TotalHitCountCollectorManager

    // Check for explicitly enables concurrency.
    final boolean isEnabledByMode = concurrentIntraSegmentSearchMode.equals(CONCURRENT_SEGMENT_SEARCH_MODE_ALL); // TODO: Handle auto mode here

    requestShouldUseConcurrentIntraSegmentSearch.set(isEnabledByMode && !isUnsupported);
}

if (sort != null && sort.isSortOnTimeSeriesField()) {
requestShouldUseConcurrentIntraSegmentSearch.set(false);
} else if (aggregations() != null) {
requestShouldUseConcurrentIntraSegmentSearch.set(false);
} else if (terminateAfter != DEFAULT_TERMINATE_AFTER) {
requestShouldUseConcurrentIntraSegmentSearch.set(false);
} else if (trackTotalHitsUpTo != TRACK_TOTAL_HITS_DISABLED) { // TODO : Need to handle TotalHitCountCollectorManager
requestShouldUseConcurrentIntraSegmentSearch.set(false);
} else if (concurrentIntraSegmentSearchMode.equals(CONCURRENT_SEGMENT_SEARCH_MODE_ALL)) { // TODO : Handle auto mode here
requestShouldUseConcurrentIntraSegmentSearch.set(true);
} else {
requestShouldUseConcurrentIntraSegmentSearch.set(false);
}
}

public void setProfilers(Profilers profilers) {
this.profilers = profilers;
}
Expand Down Expand Up @@ -1115,6 +1144,23 @@ private String evaluateConcurrentSearchMode(Executor concurrentSearchExecutor) {
);
}

/**
 * Resolves the effective intra-segment concurrent search mode for this context.
 * Returns {@code none} outright for system indices, throttled searches, or when the
 * cluster service / executor needed for concurrency is unavailable; otherwise the
 * index-level setting wins, falling back to the cluster-level default.
 */
private String evaluateConcurrentIntraSegmentSearchMode(Executor concurrentSearchExecutor) {
    final boolean concurrencyUnavailable = indexShard.isSystem()
        || indexShard.indexSettings().isSearchThrottled()
        || clusterService == null
        || concurrentSearchExecutor == null;
    if (concurrencyUnavailable) {
        return CONCURRENT_SEGMENT_SEARCH_MODE_NONE;
    }

    final ClusterSettings clusterSettings = clusterService.getClusterSettings();
    final String clusterDefault = clusterSettings.get(CLUSTER_CONCURRENT_INTRA_SEGMENT_SEARCH_MODE);
    return indexService.getIndexSettings().getSettings().get(IndexSettings.INDEX_CONCURRENT_INTRA_SEGMENT_SEARCH_MODE.getKey(), clusterDefault);
}

/**
* Returns the target maximum slice count to use for concurrent segment search.
*
Expand Down Expand Up @@ -1142,6 +1188,16 @@ public int getTargetMaxSliceCount() {

}

@Override
public int getSegmentPartitionSize() {
    // Index-level partition size takes precedence; the cluster-wide setting is the fallback.
    final int clusterDefault = clusterService.getClusterSettings().get(SearchService.CONCURRENT_INTRA_SEGMENT_SEARCH_PARTITION_SIZE_SETTING);
    final Settings perIndexSettings = indexService.getIndexSettings().getSettings();
    return perIndexSettings.getAsInt(IndexSettings.INDEX_CONCURRENT_INTRA_SEGMENT_PARTITION_SIZE.getKey(), clusterDefault);
}

@Override
public boolean shouldUseTimeSeriesDescSortOptimization() {
return indexShard.isTimeSeriesDescSortOptimizationEnabled()
Expand Down
36 changes: 36 additions & 0 deletions server/src/main/java/org/opensearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
Property.Deprecated
);

// TODO : Put these three into an enum
// Allow concurrent segment search for all requests
public static final String CONCURRENT_SEGMENT_SEARCH_MODE_ALL = "all";

Expand All @@ -285,6 +286,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
public static final Setting<String> CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE = Setting.simpleString(
"search.concurrent_segment_search.mode",
CONCURRENT_SEGMENT_SEARCH_MODE_AUTO,
// TODO : This should go inside the enum
value -> {
switch (value) {
case CONCURRENT_SEGMENT_SEARCH_MODE_ALL:
Expand Down Expand Up @@ -314,6 +316,38 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
Property.Dynamic,
Property.NodeScope
);

// Cluster-wide settings key for the intra-segment partition size (documents per partition).
public static final String CONCURRENT_INTRA_SEGMENT_PARTITION_SIZE_KEY = "search.concurrent_intra_segment_search.partition_size";
// Default documents per intra-segment partition when the setting is unset.
public static final int CONCURRENT_INTRA_SEGMENT_DEFAULT_PARTITION_SIZE_VALUE = 10_000;
// Minimum accepted partition size, enforced by the intSetting lower bound below.
public static final int CONCURRENT_INTRA_SEGMENT_MINIMUM_PARTITION_SIZE_VALUE = 2;

/**
 * Cluster-level setting controlling concurrent intra-segment search. Accepted values:
 * {@code all} or {@code none} (default). Overridable per index via
 * {@code index.search.concurrent_intra_segment_search.mode}.
 */
public static final Setting<String> CLUSTER_CONCURRENT_INTRA_SEGMENT_SEARCH_MODE = Setting.simpleString(
    "search.concurrent_intra_segment_search.mode",
    CONCURRENT_SEGMENT_SEARCH_MODE_NONE,
    // TODO : This should go inside the enum
    value -> {
        switch (value) {
            // TODO : Handle auto mode.
            case CONCURRENT_SEGMENT_SEARCH_MODE_ALL:
            case CONCURRENT_SEGMENT_SEARCH_MODE_NONE:
                // valid setting
                break;
            default:
                // Include the rejected value so a failed settings update is easy to diagnose.
                throw new IllegalArgumentException("Setting value must be one of [all, none] but was [" + value + "]");
        }
    },
    Property.Dynamic,
    Property.NodeScope
);

// Cluster-wide, dynamically updatable partition size for intra-segment search; bounded
// below by CONCURRENT_INTRA_SEGMENT_MINIMUM_PARTITION_SIZE_VALUE. Per-index override:
// index.search.concurrent_intra_segment_search.partition_size.
public static final Setting<Integer> CONCURRENT_INTRA_SEGMENT_SEARCH_PARTITION_SIZE_SETTING = Setting.intSetting(
CONCURRENT_INTRA_SEGMENT_PARTITION_SIZE_KEY,
CONCURRENT_INTRA_SEGMENT_DEFAULT_PARTITION_SIZE_VALUE,
CONCURRENT_INTRA_SEGMENT_MINIMUM_PARTITION_SIZE_VALUE,
Property.Dynamic,
Property.NodeScope
);

// value 0 means rewrite filters optimization in aggregations will be disabled
@ExperimentalApi
public static final Setting<Integer> MAX_AGGREGATION_REWRITE_FILTERS = Setting.intSetting(
Expand Down Expand Up @@ -1383,6 +1417,7 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc
// nothing to parse...
if (source == null) {
context.evaluateRequestShouldUseConcurrentSearch();
context.evaluateRequestShouldUseIntraSegmentConcurrentSearch();
return;
}

Expand Down Expand Up @@ -1563,6 +1598,7 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc
context.collapse(collapseContext);
}
context.evaluateRequestShouldUseConcurrentSearch();
context.evaluateRequestShouldUseIntraSegmentConcurrentSearch();
if (source.profile()) {
final Function<Query, Collection<Supplier<ProfileMetric>>> pluginProfileMetricsSupplier = (query) -> pluginProfilers.stream()
.flatMap(p -> p.getQueryProfileMetrics(context, query).stream())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ public CollectionStatistics collectionStatistics(String field) throws IOExceptio
*/
@Override
protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
// Delegates to slicesInternal (package-private for testing); SliceInputConfig is
// built from the search context — presumably bundling target max slice count and
// partition size, TODO confirm against SliceInputConfig's definition.
return slicesInternal(leaves, new MaxTargetSliceSupplier.SliceInputConfig(searchContext));
}
}

public DirectoryReader getDirectoryReader() {
Expand Down Expand Up @@ -607,17 +607,53 @@ private boolean canMatchSearchAfter(LeafReaderContext ctx) throws IOException {
}

// package-private for testing
LeafSlice[] slicesInternal(List<LeafReaderContext> leaves, int targetMaxSlice) {
LeafSlice[] slicesInternal(List<LeafReaderContext> leaves, MaxTargetSliceSupplier.SliceInputConfig sliceInputConfig) {
LeafSlice[] leafSlices;
if (targetMaxSlice == 0) {
if (sliceInputConfig.targetMaxSliceCount == 0) {
// use the default lucene slice calculation
leafSlices = super.slices(leaves);
logger.debug("Slice count using lucene default [{}]", leafSlices.length);
} else {
// use the custom slice calculation based on targetMaxSlice
leafSlices = MaxTargetSliceSupplier.getSlices(leaves, targetMaxSlice);
leafSlices = MaxTargetSliceSupplier.getSlices(leaves, sliceInputConfig);
logger.debug("Slice count using max target slice supplier [{}]", leafSlices.length);
}
// FIXME: Remove before merging
printDistributionLogs(leaves, leafSlices);
return leafSlices;
}

// FIXME: Remove before merging — temporary instrumentation for inspecting how documents
// are distributed across leaves and slices.
private static void printDistributionLogs(List<LeafReaderContext> leaves, LeafSlice[] leafSlices) {
    // Guard + debug level: the original logged at INFO unconditionally, building large
    // strings on every search request — pure log spam in production while this survives.
    if (logger.isDebugEnabled() == false) {
        return;
    }
    StringBuilder res = new StringBuilder();
    long total = 0;
    for (LeafReaderContext leaf : leaves) {
        res.append(" Leaf [").append(leaf.ord).append(", ").append(leaf.reader().maxDoc()).append(']');
        total += leaf.reader().maxDoc();
    }
    res.append(" Total Docs = ").append(total).append(" ");
    logger.debug("Input leaves {}", res.toString());

    res.setLength(0);
    for (LeafSlice slice : leafSlices) {
        res.append(" LeafSlice[ ").append(" numParts = ").append(slice.partitions.length).append(" ");
        total = 0;
        for (LeafReaderContextPartition partition : slice.partitions) {
            // Partition doc count is the [minDocId, maxDocId) span within the segment.
            res.append("Part [ docs = ").append(partition.maxDocId - partition.minDocId);
            total += partition.maxDocId - partition.minDocId;
            res.append("]");
        }
        res.append(", Total Docs = ").append(total).append(" ");
        res.append(" ]");
    }
    logger.debug("Output leaf slices {}", res.toString());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,11 @@ public int getTargetMaxSliceCount() {
return in.getTargetMaxSliceCount();
}

@Override
public int getSegmentPartitionSize() {
// Straight delegation to the wrapped SearchContext.
return in.getSegmentPartitionSize();
}

@Override
public boolean shouldUseTimeSeriesDescSortOptimization() {
return in.shouldUseTimeSeriesDescSortOptimization();
Expand Down
Loading
Loading