@@ -799,6 +799,10 @@ public void apply(Settings value, Settings current, Settings previous) {
SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING, // deprecated
SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING,
SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE,
// Intra-segment search settings
SearchService.INTRA_SEGMENT_SEARCH_ENABLED,
SearchService.INTRA_SEGMENT_SEARCH_MIN_SEGMENT_SIZE,
SearchService.INTRA_SEGMENT_SEARCH_PARTITIONS_PER_SEGMENT,

RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
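
Aside (not part of the diff): the hunk above appears to register the three new settings in the cluster-level settings registry, which is what makes them resolvable and dynamically updatable. A minimal sketch of reading their defaults, assuming the list being extended is ClusterSettings.BUILT_IN_CLUSTER_SETTINGS and using a hypothetical example class:

    import org.opensearch.common.settings.ClusterSettings;
    import org.opensearch.common.settings.Settings;
    import org.opensearch.search.SearchService;

    // Sketch: once registered, each setting resolves to its declared default until updated.
    class IntraSegmentSettingsDefaultsExample {
        static void printDefaults() {
            ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
            System.out.println(clusterSettings.get(SearchService.INTRA_SEGMENT_SEARCH_ENABLED));                // true
            System.out.println(clusterSettings.get(SearchService.INTRA_SEGMENT_SEARCH_PARTITIONS_PER_SEGMENT)); // 2
            System.out.println(clusterSettings.get(SearchService.INTRA_SEGMENT_SEARCH_MIN_SEGMENT_SIZE));       // 1000000
        }
    }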

@@ -129,6 +129,9 @@
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_MODE_ALL;
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_MODE_AUTO;
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_MODE_NONE;
import static org.opensearch.search.SearchService.INTRA_SEGMENT_SEARCH_ENABLED;
import static org.opensearch.search.SearchService.INTRA_SEGMENT_SEARCH_MIN_SEGMENT_SIZE;
import static org.opensearch.search.SearchService.INTRA_SEGMENT_SEARCH_PARTITIONS_PER_SEGMENT;
import static org.opensearch.search.SearchService.KEYWORD_INDEX_OR_DOC_VALUES_ENABLED;
import static org.opensearch.search.SearchService.MAX_AGGREGATION_REWRITE_FILTERS;
import static org.opensearch.search.streaming.FlushModeResolver.STREAMING_MAX_ESTIMATED_BUCKET_COUNT;

@@ -1311,4 +1314,32 @@ public double getStreamingMinCardinalityRatio() {
public long getStreamingMinEstimatedBucketCount() {
return clusterService.getClusterSettings().get(STREAMING_MIN_ESTIMATED_BUCKET_COUNT);
}

/**
* Returns whether intra-segment search is enabled for this search context.
* This checks cluster-level settings to determine if segments should be partitioned for concurrent search.
*/
@Override
public boolean getIntraSegmentSearchEnabled() {
return clusterService.getClusterSettings().get(INTRA_SEGMENT_SEARCH_ENABLED);
}

/**
* Returns the number of partitions to create per segment when intra-segment search is enabled.
* This value is retrieved from cluster-level settings.
*/
@Override
public int getIntraSegmentPartitionsPerSegment() {
return clusterService.getClusterSettings().get(INTRA_SEGMENT_SEARCH_PARTITIONS_PER_SEGMENT);
}

/**
* Returns the minimum segment size required to enable intra-segment partitioning.
* Segments smaller than this threshold will not be partitioned.
* This value is retrieved from cluster-level settings.
*/
@Override
public int getIntraSegmentMinSegmentSize() {
return clusterService.getClusterSettings().get(INTRA_SEGMENT_SEARCH_MIN_SEGMENT_SIZE);
}
}
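
Aside (not part of the change): the getters above go to ClusterSettings on every call, so they should always reflect the latest dynamically applied values. A component that prefers cached fields could instead register update consumers; a minimal sketch under that assumption, with a hypothetical class name:

    import org.opensearch.common.settings.ClusterSettings;

    import static org.opensearch.search.SearchService.INTRA_SEGMENT_SEARCH_ENABLED;
    import static org.opensearch.search.SearchService.INTRA_SEGMENT_SEARCH_MIN_SEGMENT_SIZE;
    import static org.opensearch.search.SearchService.INTRA_SEGMENT_SEARCH_PARTITIONS_PER_SEGMENT;

    // Hypothetical consumer: read the current values once, then track dynamic updates
    // instead of hitting ClusterSettings on every call.
    class IntraSegmentSearchConfig {
        private volatile boolean enabled;
        private volatile int partitionsPerSegment;
        private volatile int minSegmentSize;

        IntraSegmentSearchConfig(ClusterSettings clusterSettings) {
            this.enabled = clusterSettings.get(INTRA_SEGMENT_SEARCH_ENABLED);
            this.partitionsPerSegment = clusterSettings.get(INTRA_SEGMENT_SEARCH_PARTITIONS_PER_SEGMENT);
            this.minSegmentSize = clusterSettings.get(INTRA_SEGMENT_SEARCH_MIN_SEGMENT_SIZE);
            // All three settings are declared Dynamic, so updates can be observed at runtime.
            clusterSettings.addSettingsUpdateConsumer(INTRA_SEGMENT_SEARCH_ENABLED, v -> this.enabled = v);
            clusterSettings.addSettingsUpdateConsumer(INTRA_SEGMENT_SEARCH_PARTITIONS_PER_SEGMENT, v -> this.partitionsPerSegment = v);
            clusterSettings.addSettingsUpdateConsumer(INTRA_SEGMENT_SEARCH_MIN_SEGMENT_SIZE, v -> this.minSegmentSize = v);
        }

        boolean enabled() { return enabled; }
        int partitionsPerSegment() { return partitionsPerSegment; }
        int minSegmentSize() { return minSegmentSize; }
    }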

server/src/main/java/org/opensearch/search/SearchService.java (31 additions, 0 deletions)
@@ -341,6 +341,34 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
Property.Dynamic,
Property.NodeScope
);

// Intra-segment search settings
public static final Setting<Boolean> INTRA_SEGMENT_SEARCH_ENABLED = Setting.boolSetting(
"search.intra_segment_search.enabled",
true,
Property.Dynamic,
Property.NodeScope
);

// Setting to control the minimum segment size (in docs) required for intra-segment partitioning
public static final Setting<Integer> INTRA_SEGMENT_SEARCH_MIN_SEGMENT_SIZE = Setting.intSetting(
"search.intra_segment_search.min_segment_size",
1000000, // default: only partition segments with at least 1M docs
1000, // minimum allowed value
Property.Dynamic,
Property.NodeScope
);

// Setting to control number of partitions per segment
public static final Setting<Integer> INTRA_SEGMENT_SEARCH_PARTITIONS_PER_SEGMENT = Setting.intSetting(
"search.intra_segment_search.partitions_per_segment",
2, // Default 2 partitions per segment
1, // minimum allowed value
Property.Dynamic,
Property.NodeScope
);

// value 0 means rewrite filters optimization in aggregations will be disabled
@ExperimentalApi
public static final Setting<Integer> MAX_AGGREGATION_REWRITE_FILTERS = Setting.intSetting(

@@ -2049,6 +2077,9 @@ public MinAndMax<?> estimatedMinAndMax() {
* @return the computed default slice count
*/
private static int computeDefaultSliceCount() {
return Math.max(1, Math.min(Runtime.getRuntime().availableProcessors() / 2, 4));
}

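Aside (not part of the diff): all three settings declared earlier in this file are marked Property.Dynamic, so they can be changed on a running cluster. A minimal sketch of building such an update, with a hypothetical class name and example values; executing the request through an admin client is not shown:

    import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
    import org.opensearch.common.settings.Settings;
    import org.opensearch.search.SearchService;

    // Hypothetical helper: build a cluster-settings update that tweaks the new intra-segment knobs.
    class IntraSegmentSettingsUpdateExample {
        static ClusterUpdateSettingsRequest buildUpdate() {
            Settings update = Settings.builder()
                .put(SearchService.INTRA_SEGMENT_SEARCH_ENABLED.getKey(), true)
                .put(SearchService.INTRA_SEGMENT_SEARCH_PARTITIONS_PER_SEGMENT.getKey(), 4)
                .put(SearchService.INTRA_SEGMENT_SEARCH_MIN_SEGMENT_SIZE.getKey(), 500_000)
                .build();
            return new ClusterUpdateSettingsRequest().transientSettings(update);
        }
    }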

@@ -301,18 +301,26 @@ public void search(

@Override
protected void search(LeafReaderContextPartition[] partitions, Weight weight, Collector collector) throws IOException {
searchContext.indexShard().getSearchOperationListener().onPreSliceExecution(searchContext);
try {
// Time series based workload by default traverses segments in desc order i.e. latest to the oldest order.
// This is actually beneficial for search queries to start search on latest segments first for time series workload.
// That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf
// reader order here.
if (searchContext.shouldUseTimeSeriesDescSortOptimization()) {
for (int i = partitions.length - 1; i >= 0; i--) {
searchLeaf(partitions[i].ctx, partitions[i].minDocId, partitions[i].maxDocId, weight, collector);
}
} else {
for (LeafReaderContextPartition partition : partitions) {
searchLeaf(partition.ctx, partition.minDocId, partition.maxDocId, weight, collector);
}
}

@@ -570,14 +578,68 @@ public CollectionStatistics collectionStatistics(String field) throws IOExceptio
return collectionStatistics;
}

/**
* Determines whether intra-segment search should be used for the current search context
*/
private boolean shouldUseIntraSegmentSearch() {
// Check if intra-segment search is enabled globally and for this search context
// Note: Intra-segment search requires concurrent execution since partitions from the same
// segment must be processed by different threads (cannot be in the same slice)
return getIntraSegmentSearchEnabled() && searchContext.shouldUseConcurrentSearch() && !hasComplexAggregations();
}

/**
* Check if the search has complex aggregations that might not benefit from intra-segment search
*/
private boolean hasComplexAggregations() {
// For now, be conservative and disable intra-segment search when aggregations are present
// This can be enhanced later to support specific types of aggregations
return searchContext.aggregations() != null && searchContext.aggregations().factories() != null;
}

/**
* Get intra-segment search enabled setting from search context
*/
private boolean getIntraSegmentSearchEnabled() {
return searchContext.getIntraSegmentSearchEnabled();
}

/**
* Get number of partitions per segment from search context
*/
private int getIntraSegmentPartitionsPerSegment() {
return searchContext.getIntraSegmentPartitionsPerSegment();
}

/**
* Get minimum segment size for intra-segment partitioning from search context
*/
private int getIntraSegmentMinSegmentSize() {
return searchContext.getIntraSegmentMinSegmentSize();
}

/**
* Compute the leaf slices that will be used by concurrent segment search to spread work across threads
* @param leaves all the segments
* @return leafSlice group to be executed by different threads
*/
@Override
protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
if (leaves == null || leaves.isEmpty()) {
return new LeafSlice[0];
}
int targetMaxSlice = searchContext.getTargetMaxSliceCount();
if (!shouldUseIntraSegmentSearch()) {
return MaxTargetSliceSupplier.getSlices(leaves, targetMaxSlice);
}
return MaxTargetSliceSupplier.getSlicesWithAutoPartitioning(leaves, targetMaxSlice);
}

public DirectoryReader getDirectoryReader() {

@@ -647,6 +709,7 @@ private boolean canMatchSearchAfter(LeafReaderContext ctx) throws IOException {
}

// package-private for testing
// Temporarily unused: the slices() override above currently bypasses slicesInternal.
LeafSlice[] slicesInternal(List<LeafReaderContext> leaves, int targetMaxSlice) {
LeafSlice[] leafSlices;
if (targetMaxSlice == 0) {
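
Aside (not part of the diff): the auto-partitioning path above delegates to MaxTargetSliceSupplier.getSlicesWithAutoPartitioning, which is not shown in this hunk. A rough sketch of the core idea, splitting a large segment's doc-ID range into fixed-size partitions, assuming Lucene 10's IndexSearcher.LeafReaderContextPartition factory methods createForEntireSegment and createFromAndTo and a hypothetical helper name; the real implementation may differ:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.lucene.index.LeafReaderContext;
    import org.apache.lucene.search.IndexSearcher.LeafReaderContextPartition;

    class SegmentPartitioningSketch {
        // Hypothetical helper: split one segment into doc-range partitions, honoring the
        // min-segment-size and partitions-per-segment settings.
        static List<LeafReaderContextPartition> partitionSegment(LeafReaderContext ctx, int partitionsPerSegment, int minSegmentSize) {
            int maxDoc = ctx.reader().maxDoc();
            if (maxDoc < minSegmentSize || partitionsPerSegment <= 1) {
                // Small segment: keep it as a single unit of work.
                return List.of(LeafReaderContextPartition.createForEntireSegment(ctx));
            }
            List<LeafReaderContextPartition> partitions = new ArrayList<>();
            int step = (maxDoc + partitionsPerSegment - 1) / partitionsPerSegment; // ceiling division
            for (int from = 0; from < maxDoc; from += step) {
                int to = Math.min(from + step, maxDoc); // upper bound treated as exclusive
                partitions.add(LeafReaderContextPartition.createFromAndTo(ctx, from, to));
            }
            return partitions;
        }
    }

As the comment in shouldUseIntraSegmentSearch() notes, partitions of the same segment must land in different slices, which is why this path is only taken when concurrent search is in effect.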

@@ -587,4 +587,19 @@ public boolean shouldUseTimeSeriesDescSortOptimization() {
public boolean getStarTreeIndexEnabled() {
return in.getStarTreeIndexEnabled();
}

@Override
public boolean getIntraSegmentSearchEnabled() {
return in.getIntraSegmentSearchEnabled();
}

@Override
public int getIntraSegmentPartitionsPerSegment() {
return in.getIntraSegmentPartitionsPerSegment();
}

@Override
public int getIntraSegmentMinSegmentSize() {
return in.getIntraSegmentMinSegmentSize();
}
}