Skip to content

Commit 76f81c6

Browse files
committed
Initial working commit for enabling Intra Segment Search without UTs/ITs
Signed-off-by: expani <anijainc@amazon.com>
1 parent 5fd0a36 commit 76f81c6

File tree

12 files changed

+332
-25
lines changed

12 files changed

+332
-25
lines changed

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -778,6 +778,8 @@ public void apply(Settings value, Settings current, Settings previous) {
778778
SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING, // deprecated
779779
SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING,
780780
SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE,
781+
SearchService.CLUSTER_CONCURRENT_INTRA_SEGMENT_SEARCH_MODE,
782+
SearchService.CONCURRENT_INTRA_SEGMENT_SEARCH_PARTITION_SIZE_SETTING,
781783

782784
RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING,
783785
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,

server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
250250
IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING, // deprecated
251251
IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_MODE,
252252
IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_MAX_SLICE_COUNT,
253+
IndexSettings.INDEX_CONCURRENT_INTRA_SEGMENT_SEARCH_MODE,
254+
IndexSettings.INDEX_CONCURRENT_INTRA_SEGMENT_PARTITION_SIZE,
253255
IndexSettings.ALLOW_DERIVED_FIELDS,
254256

255257
// Settings for star tree index

server/src/main/java/org/opensearch/index/IndexSettings.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@
7575
import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_NESTED_DOCS_LIMIT_SETTING;
7676
import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_NESTED_FIELDS_LIMIT_SETTING;
7777
import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING;
78+
import static org.opensearch.search.SearchService.CONCURRENT_INTRA_SEGMENT_DEFAULT_PARTITION_SIZE_VALUE;
79+
import static org.opensearch.search.SearchService.CONCURRENT_INTRA_SEGMENT_MINIMUM_PARTITION_SIZE_VALUE;
7880
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_DEFAULT_SLICE_COUNT_VALUE;
7981
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_MIN_SLICE_COUNT_VALUE;
8082
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_MODE_ALL;
@@ -713,6 +715,7 @@ public static IndexMergePolicy fromString(String text) {
713715
Property.Deprecated
714716
);
715717

718+
// TODO : Should use enum
716719
public static final Setting<String> INDEX_CONCURRENT_SEGMENT_SEARCH_MODE = Setting.simpleString(
717720
"index.search.concurrent_segment_search.mode",
718721
CONCURRENT_SEGMENT_SEARCH_MODE_NONE,
@@ -739,6 +742,32 @@ public static IndexMergePolicy fromString(String text) {
739742
Property.IndexScope
740743
);
741744

745+
public static final Setting<String> INDEX_CONCURRENT_INTRA_SEGMENT_SEARCH_MODE = Setting.simpleString(
746+
"index.search.concurrent_intra_segment_search.mode",
747+
CONCURRENT_SEGMENT_SEARCH_MODE_NONE,
748+
value -> {
749+
// TODO : Add support for Auto mode with Intra Segment Search
750+
switch (value) {
751+
case CONCURRENT_SEGMENT_SEARCH_MODE_ALL:
752+
case CONCURRENT_SEGMENT_SEARCH_MODE_NONE:
753+
// valid setting
754+
break;
755+
default:
756+
throw new IllegalArgumentException("Setting value must be one of [all, none]");
757+
}
758+
},
759+
Property.Dynamic,
760+
Property.IndexScope
761+
);
762+
763+
/**
 * Index-scoped, dynamically updatable integer setting for the intra segment search partition size.
 * <p>
 * Uses {@code CONCURRENT_INTRA_SEGMENT_DEFAULT_PARTITION_SIZE_VALUE} as its default and
 * {@code CONCURRENT_INTRA_SEGMENT_MINIMUM_PARTITION_SIZE_VALUE} as the second numeric argument
 * (presumably the lower bound enforced by {@code Setting.intSetting} - TODO confirm).
 */
public static final Setting<Integer> INDEX_CONCURRENT_INTRA_SEGMENT_PARTITION_SIZE = Setting.intSetting(
764+
"index.search.concurrent_intra_segment_search.partition_size",
765+
CONCURRENT_INTRA_SEGMENT_DEFAULT_PARTITION_SIZE_VALUE,
766+
CONCURRENT_INTRA_SEGMENT_MINIMUM_PARTITION_SIZE_VALUE,
767+
Property.Dynamic,
768+
Property.IndexScope
769+
);
770+
742771
public static final Setting<Boolean> INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING = Setting.boolSetting(
743772
"index.optimize_doc_id_lookup.fuzzy_set.enabled",
744773
false,

server/src/main/java/org/opensearch/search/DefaultSearchContext.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@
121121

122122
import static org.opensearch.search.SearchService.AGGREGATION_REWRITE_FILTER_SEGMENT_THRESHOLD;
123123
import static org.opensearch.search.SearchService.CARDINALITY_AGGREGATION_PRUNING_THRESHOLD;
124+
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_INTRA_SEGMENT_SEARCH_MODE;
124125
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE;
125126
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
126127
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_MODE_ALL;
@@ -210,7 +211,9 @@ final class DefaultSearchContext extends SearchContext {
210211
private final FetchPhase fetchPhase;
211212
private final Function<SearchSourceBuilder, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder;
212213
private final String concurrentSearchMode;
214+
private final String concurrentIntraSegmentSearchMode;
213215
private final SetOnce<Boolean> requestShouldUseConcurrentSearch = new SetOnce<>();
216+
private final SetOnce<Boolean> requestShouldUseConcurrentIntraSegmentSearch = new SetOnce<>();
214217
private final int maxAggRewriteFilters;
215218
private final int filterRewriteSegmentThreshold;
216219
private final int cardinalityAggregationPruningThreshold;
@@ -247,6 +250,7 @@ final class DefaultSearchContext extends SearchContext {
247250
this.clusterService = clusterService;
248251
this.engineSearcher = readerContext.acquireSearcher("search");
249252
this.concurrentSearchMode = evaluateConcurrentSearchMode(executor);
253+
this.concurrentIntraSegmentSearchMode = evaluateConcurrentIntraSegmentSearchMode(executor);
250254
this.searcher = new ContextIndexSearcher(
251255
engineSearcher.getIndexReader(),
252256
engineSearcher.getSimilarity(),
@@ -1018,6 +1022,29 @@ && aggregations().factories() != null
10181022
}
10191023
}
10201024

1025+
@Override
1026+
public boolean shouldUseIntraSegmentConcurrentSearch() {
1027+
assert requestShouldUseConcurrentIntraSegmentSearch.get() != null : "requestShouldUseConcurrentIntraSegmentSearch must be set";
1028+
assert concurrentIntraSegmentSearchMode != null : "concurrentIntraSegmentSearchMode must be set";
1029+
// TODO : Handle auto mode here
1030+
return (concurrentSearchMode.equals(CONCURRENT_SEGMENT_SEARCH_MODE_ALL))
1031+
&& Boolean.TRUE.equals(requestShouldUseConcurrentIntraSegmentSearch.get());
1032+
}
1033+
1034+
public void evaluateRequestShouldUseIntraSegmentConcurrentSearch() {
1035+
if (sort != null && sort.isSortOnTimeSeriesField()) {
1036+
requestShouldUseConcurrentIntraSegmentSearch.set(false);
1037+
} else if (aggregations() != null) {
1038+
requestShouldUseConcurrentIntraSegmentSearch.set(false);
1039+
} else if (terminateAfter != DEFAULT_TERMINATE_AFTER) {
1040+
requestShouldUseConcurrentIntraSegmentSearch.set(false);
1041+
} else if (concurrentIntraSegmentSearchMode.equals(CONCURRENT_SEGMENT_SEARCH_MODE_ALL)){ // TODO : Handle auto mode here
1042+
requestShouldUseConcurrentIntraSegmentSearch.set(true);
1043+
} else {
1044+
requestShouldUseConcurrentIntraSegmentSearch.set(false);
1045+
}
1046+
}
1047+
10211048
public void setProfilers(Profilers profilers) {
10221049
this.profilers = profilers;
10231050
}
@@ -1115,6 +1142,23 @@ private String evaluateConcurrentSearchMode(Executor concurrentSearchExecutor) {
11151142
);
11161143
}
11171144

1145+
private String evaluateConcurrentIntraSegmentSearchMode(Executor concurrentSearchExecutor) {
1146+
// Skip concurrent search for system indices, throttled requests, or if dependencies are missing
1147+
if (indexShard.isSystem()
1148+
|| indexShard.indexSettings().isSearchThrottled()
1149+
|| clusterService == null
1150+
|| concurrentSearchExecutor == null) {
1151+
return CONCURRENT_SEGMENT_SEARCH_MODE_NONE;
1152+
}
1153+
1154+
Settings indexSettings = indexService.getIndexSettings().getSettings();
1155+
ClusterSettings clusterSettings = clusterService.getClusterSettings();
1156+
return indexSettings.get(
1157+
IndexSettings.INDEX_CONCURRENT_INTRA_SEGMENT_SEARCH_MODE.getKey(),
1158+
clusterSettings.get(CLUSTER_CONCURRENT_INTRA_SEGMENT_SEARCH_MODE)
1159+
);
1160+
}
1161+
11181162
/**
11191163
* Returns the target maximum slice count to use for concurrent segment search.
11201164
*
@@ -1142,6 +1186,16 @@ public int getTargetMaxSliceCount() {
11421186

11431187
}
11441188

1189+
@Override
1190+
public int getSegmentPartitionSize() {
1191+
return indexService.getIndexSettings()
1192+
.getSettings()
1193+
.getAsInt(
1194+
IndexSettings.INDEX_CONCURRENT_INTRA_SEGMENT_PARTITION_SIZE.getKey(),
1195+
clusterService.getClusterSettings().get(SearchService.CONCURRENT_INTRA_SEGMENT_SEARCH_PARTITION_SIZE_SETTING)
1196+
);
1197+
}
1198+
11451199
@Override
11461200
public boolean shouldUseTimeSeriesDescSortOptimization() {
11471201
return indexShard.isTimeSeriesDescSortOptimizationEnabled()

server/src/main/java/org/opensearch/search/SearchService.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
273273
Property.Deprecated
274274
);
275275

276+
// TODO : Put these three into an enum
276277
// Allow concurrent segment search for all requests
277278
public static final String CONCURRENT_SEGMENT_SEARCH_MODE_ALL = "all";
278279

@@ -285,6 +286,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
285286
public static final Setting<String> CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE = Setting.simpleString(
286287
"search.concurrent_segment_search.mode",
287288
CONCURRENT_SEGMENT_SEARCH_MODE_AUTO,
289+
// TODO : This should go inside the enum
288290
value -> {
289291
switch (value) {
290292
case CONCURRENT_SEGMENT_SEARCH_MODE_ALL:
@@ -314,6 +316,39 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
314316
Property.Dynamic,
315317
Property.NodeScope
316318
);
319+
320+
public static final String CONCURRENT_INTRA_SEGMENT_PARTITION_SIZE_KEY = "search.concurrent_intra_segment_search.partition_size";
321+
public static final int CONCURRENT_INTRA_SEGMENT_DEFAULT_PARTITION_SIZE_VALUE = 10_000;
322+
public static final int CONCURRENT_INTRA_SEGMENT_MINIMUM_PARTITION_SIZE_VALUE = 2;
323+
324+
public static final Setting<String> CLUSTER_CONCURRENT_INTRA_SEGMENT_SEARCH_MODE = Setting.simpleString(
325+
"search.concurrent_intra_segment_search.mode",
326+
CONCURRENT_SEGMENT_SEARCH_MODE_NONE,
327+
// TODO : This should go inside the enum
328+
value -> {
329+
switch (value) {
330+
// TODO : Handle auto mode.
331+
case CONCURRENT_SEGMENT_SEARCH_MODE_ALL:
332+
case CONCURRENT_SEGMENT_SEARCH_MODE_NONE:
333+
// valid setting
334+
break;
335+
default:
336+
throw new IllegalArgumentException("Setting value must be one of [all, none]");
337+
}
338+
},
339+
Property.Dynamic,
340+
Property.NodeScope
341+
);
342+
343+
/**
 * Node-scoped, dynamically updatable integer setting for the intra segment search partition
 * size, registered under {@code CONCURRENT_INTRA_SEGMENT_PARTITION_SIZE_KEY}.
 * <p>
 * Uses {@code CONCURRENT_INTRA_SEGMENT_DEFAULT_PARTITION_SIZE_VALUE} as its default and
 * {@code CONCURRENT_INTRA_SEGMENT_MINIMUM_PARTITION_SIZE_VALUE} as the second numeric argument
 * (presumably the lower bound enforced by {@code Setting.intSetting} - TODO confirm).
 */
public static final Setting<Integer> CONCURRENT_INTRA_SEGMENT_SEARCH_PARTITION_SIZE_SETTING = Setting.intSetting(
344+
CONCURRENT_INTRA_SEGMENT_PARTITION_SIZE_KEY,
345+
CONCURRENT_INTRA_SEGMENT_DEFAULT_PARTITION_SIZE_VALUE,
346+
CONCURRENT_INTRA_SEGMENT_MINIMUM_PARTITION_SIZE_VALUE,
347+
Property.Dynamic,
348+
Property.NodeScope
349+
);
350+
351+
317352
// value 0 means rewrite filters optimization in aggregations will be disabled
318353
@ExperimentalApi
319354
public static final Setting<Integer> MAX_AGGREGATION_REWRITE_FILTERS = Setting.intSetting(
@@ -1383,6 +1418,7 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc
13831418
// nothing to parse...
13841419
if (source == null) {
13851420
context.evaluateRequestShouldUseConcurrentSearch();
1421+
context.evaluateRequestShouldUseIntraSegmentConcurrentSearch();
13861422
return;
13871423
}
13881424

@@ -1563,6 +1599,7 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc
15631599
context.collapse(collapseContext);
15641600
}
15651601
context.evaluateRequestShouldUseConcurrentSearch();
1602+
context.evaluateRequestShouldUseIntraSegmentConcurrentSearch();
15661603
if (source.profile()) {
15671604
final Function<Query, Collection<Supplier<ProfileMetric>>> pluginProfileMetricsSupplier = (query) -> pluginProfilers.stream()
15681605
.flatMap(p -> p.getQueryProfileMetrics(context, query).stream())

server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -537,7 +537,7 @@ public CollectionStatistics collectionStatistics(String field) throws IOExceptio
537537
*/
538538
@Override
539539
protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
540-
return slicesInternal(leaves, searchContext.getTargetMaxSliceCount());
540+
return slicesInternal(leaves, new MaxTargetSliceSupplier.SliceInputConfig(searchContext));
541541
}
542542

543543
public DirectoryReader getDirectoryReader() {
@@ -607,17 +607,52 @@ private boolean canMatchSearchAfter(LeafReaderContext ctx) throws IOException {
607607
}
608608

609609
// package-private for testing
610-
LeafSlice[] slicesInternal(List<LeafReaderContext> leaves, int targetMaxSlice) {
610+
LeafSlice[] slicesInternal(List<LeafReaderContext> leaves, MaxTargetSliceSupplier.SliceInputConfig sliceInputConfig) {
611611
LeafSlice[] leafSlices;
612-
if (targetMaxSlice == 0) {
612+
if (sliceInputConfig.targetMaxSliceCount == 0) {
613613
// use the default lucene slice calculation
614614
leafSlices = super.slices(leaves);
615615
logger.debug("Slice count using lucene default [{}]", leafSlices.length);
616616
} else {
617617
// use the custom slice calculation based on targetMaxSlice
618-
leafSlices = MaxTargetSliceSupplier.getSlices(leaves, targetMaxSlice);
618+
leafSlices = MaxTargetSliceSupplier.getSlices(leaves, sliceInputConfig);
619619
logger.debug("Slice count using max target slice supplier [{}]", leafSlices.length);
620620
}
621+
// FIXME: Remove before merging
622+
printDistributionLogs(leaves, leafSlices);
621623
return leafSlices;
622624
}
625+
626+
private static void printDistributionLogs(List<LeafReaderContext> leaves, LeafSlice[] leafSlices) {
627+
StringBuilder res = new StringBuilder();
628+
long total = 0;
629+
for (LeafReaderContext leaf : leaves) {
630+
res.append(" Leaf [");
631+
res.append(leaf.ord);
632+
res.append(", ");
633+
res.append(leaf.reader().maxDoc());
634+
res.append(']');
635+
total += leaf.reader().maxDoc();
636+
}
637+
res.append(" Total Docs = ").append(total).append(" ");
638+
logger.info("Input leaves {}", res.toString());
639+
res.setLength(0);
640+
for (LeafSlice slice : leafSlices) {
641+
res.append(" LeafSlice[ ");
642+
res.append(" numParts = ");
643+
res.append(slice.partitions.length);
644+
res.append(" ");
645+
total = 0;
646+
for (LeafReaderContextPartition partition : slice.partitions) {
647+
res.append("Part [ docs = ");
648+
res.append(partition.maxDocId - partition.minDocId);
649+
total += partition.maxDocId - partition.minDocId;
650+
res.append("]");
651+
}
652+
res.append(", Total Docs = ").append(total).append(" ");
653+
res.append(" ]");
654+
}
655+
logger.info("Output leaf slices {}", res.toString());
656+
}
657+
623658
}

server/src/main/java/org/opensearch/search/internal/FilteredSearchContext.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -578,6 +578,11 @@ public int getTargetMaxSliceCount() {
578578
return in.getTargetMaxSliceCount();
579579
}
580580

581+
/**
 * Returns the intra segment search partition size by forwarding the call to {@code in}.
 *
 * @return whatever {@code in.getSegmentPartitionSize()} returns
 */
@Override
582+
public int getSegmentPartitionSize() {
583+
return in.getSegmentPartitionSize();
584+
}
585+
581586
@Override
582587
public boolean shouldUseTimeSeriesDescSortOptimization() {
583588
return in.shouldUseTimeSeriesDescSortOptimization();

0 commit comments

Comments
 (0)