Skip to content

Commit

Permalink
Add terminate_after threshold for concurrent vs non-concurrent segmen…
Browse files Browse the repository at this point in the history
…t search

Signed-off-by: Jay Deng <jayd0104@gmail.com>
  • Loading branch information
jed326 authored and Jay Deng committed Sep 27, 2023
1 parent e617da3 commit 1dd3eed
Show file tree
Hide file tree
Showing 15 changed files with 713 additions and 636 deletions.

This file was deleted.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,8 @@ public void apply(Settings value, Settings current, Settings previous) {
List.of(FeatureFlags.CONCURRENT_SEGMENT_SEARCH),
List.of(
SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING,
SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING
SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING,
SearchService.TERMINATE_AFTER_CONCURRENT_SEARCH_THRESHOLD
),
List.of(FeatureFlags.TELEMETRY),
List.of(TelemetrySettings.TRACER_ENABLED_SETTING, TelemetrySettings.TRACER_SAMPLER_PROBABILITY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ final class DefaultSearchContext extends SearchContext {
private final Function<SearchSourceBuilder, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder;
private final boolean concurrentSearchSettingsEnabled;
private final SetOnce<Boolean> requestShouldUseConcurrentSearch = new SetOnce<>();
private final int terminateAfterThreshold;

DefaultSearchContext(
ReaderContext readerContext,
Expand All @@ -200,7 +201,8 @@ final class DefaultSearchContext extends SearchContext {
Version minNodeVersion,
boolean validate,
Executor executor,
Function<SearchSourceBuilder, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder
Function<SearchSourceBuilder, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder,
int terminateAfterThreshold
) throws IOException {
this.readerContext = readerContext;
this.request = request;
Expand Down Expand Up @@ -239,6 +241,7 @@ final class DefaultSearchContext extends SearchContext {
queryBoost = request.indexBoost();
this.lowLevelCancellation = lowLevelCancellation;
this.requestToAggReduceContextBuilder = requestToAggReduceContextBuilder;
this.terminateAfterThreshold = terminateAfterThreshold;
}

@Override
Expand Down Expand Up @@ -896,6 +899,8 @@ public void evaluateRequestShouldUseConcurrentSearch() {
&& aggregations().factories() != null
&& !aggregations().factories().allFactoriesSupportConcurrentSearch()) {
requestShouldUseConcurrentSearch.set(false);
} else if (terminateAfter < terminateAfterThreshold) {
requestShouldUseConcurrentSearch.set(false);
} else {
requestShouldUseConcurrentSearch.set(true);
}
Expand Down
23 changes: 22 additions & 1 deletion server/src/main/java/org/opensearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,17 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
Property.Dynamic
);

/**
* Threshold for terminate_after search parameter below which non-concurrent search will always be used.
*/
public static final Setting<Integer> TERMINATE_AFTER_CONCURRENT_SEARCH_THRESHOLD = Setting.intSetting(
"search.terminate_after_concurrent_search_threshold",
100000,
0,
Property.NodeScope,
Property.Dynamic
);

/**
* Enables low-level, frequent search cancellation checks. Enabling low-level checks will make long running searches to react
* to the cancellation request faster. It will produce more cancellation checks but benchmarking has shown these did not
Expand Down Expand Up @@ -318,6 +329,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
private final AtomicInteger openPitContexts = new AtomicInteger();
private final String sessionId = UUIDs.randomBase64UUID();
private final Executor indexSearcherExecutor;
private volatile int terminateAfterThreshold;

public SearchService(
ClusterService clusterService,
Expand Down Expand Up @@ -377,6 +389,10 @@ public SearchService(

lowLevelCancellation = LOW_LEVEL_CANCELLATION_SETTING.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(LOW_LEVEL_CANCELLATION_SETTING, this::setLowLevelCancellation);

terminateAfterThreshold = TERMINATE_AFTER_CONCURRENT_SEARCH_THRESHOLD.get(settings);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(TERMINATE_AFTER_CONCURRENT_SEARCH_THRESHOLD, this::setTerminateAfterConcurrentSearchThreshold);
}

private void validateKeepAlives(TimeValue defaultKeepAlive, TimeValue maxKeepAlive) {
Expand Down Expand Up @@ -453,6 +469,10 @@ private void setLowLevelCancellation(Boolean lowLevelCancellation) {
this.lowLevelCancellation = lowLevelCancellation;
}

private void setTerminateAfterConcurrentSearchThreshold(int terminateAfterConcurrentSearchThreshold) {
this.terminateAfterThreshold = terminateAfterConcurrentSearchThreshold;
}

@Override
public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRemovalReason reason) {
// once an index is removed due to deletion or closing, we can just clean up all the pending search context information
Expand Down Expand Up @@ -1060,7 +1080,8 @@ private DefaultSearchContext createSearchContext(ReaderContext reader, ShardSear
clusterService.state().nodes().getMinNodeVersion(),
validate,
indexSearcherExecutor,
this::aggReduceContextBuilder
this::aggReduceContextBuilder,
terminateAfterThreshold
);
// we clone the query shard context here just for rewriting otherwise we
// might end up with incorrect state since we are using now() or script services
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.opensearch.search.profile.query.ProfileWeight;
import org.opensearch.search.profile.query.QueryProfiler;
import org.opensearch.search.profile.query.QueryTimingType;
import org.opensearch.search.query.EarlyTerminatingCollector;
import org.opensearch.search.query.QueryPhase;
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.search.sort.FieldSortBuilder;
Expand Down Expand Up @@ -292,7 +293,7 @@ protected void search(List<LeafReaderContext> leaves, Weight weight, Collector c
private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collector) throws IOException {

// Check if at all we need to call this leaf for collecting results.
if (canMatch(ctx) == false) {
if (canMatch(ctx) == false || searchContext.isTerminatedEarly()) {
return;
}

Expand All @@ -310,6 +311,9 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
// there is no doc of interest in this reader context
// continue with the following leaf
return;
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
searchContext.setTerminatedEarly(true);
return;
} catch (QueryPhase.TimeExceededException e) {
searchContext.setSearchTimedOut(true);
return;
Expand All @@ -325,6 +329,9 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
} catch (CollectionTerminatedException e) {
// collection was terminated prematurely
// continue with the following leaf
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
searchContext.setTerminatedEarly(true);
return;
} catch (QueryPhase.TimeExceededException e) {
searchContext.setSearchTimedOut(true);
return;
Expand All @@ -344,6 +351,9 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
} catch (CollectionTerminatedException e) {
// collection was terminated prematurely
// continue with the following leaf
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
searchContext.setTerminatedEarly(true);
return;
} catch (QueryPhase.TimeExceededException e) {
searchContext.setSearchTimedOut(true);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ public List<Aggregator> toAggregators(Collection<Collector> collectors) {
private InnerHitsContext innerHitsContext;

private volatile boolean searchTimedOut;
private volatile boolean terminatedEarly;

protected SearchContext() {}

Expand All @@ -136,6 +137,14 @@ public void setSearchTimedOut(boolean searchTimedOut) {
this.searchTimedOut = searchTimedOut;
}

public boolean isTerminatedEarly() {
return this.terminatedEarly;
}

public void setTerminatedEarly(boolean terminatedEarly) {
this.terminatedEarly = terminatedEarly;
}

@Override
public final void close() {
if (closed.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ private static boolean searchWithCollectorManager(
if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER && queryResult.terminatedEarly() == null) {
queryResult.terminatedEarly(false);
}
if (searchContext.isTerminatedEarly()) {
queryResult.terminatedEarly(true);
}

return topDocsFactory.shouldRescore();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,27 @@
import org.apache.lucene.search.LeafCollector;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

/**
* A {@link Collector} that early terminates collection after <code>maxCountHits</code> docs have been collected.
*
* @opensearch.internal
*/
public class EarlyTerminatingCollector extends FilterCollector {
static final class EarlyTerminationException extends RuntimeException {

/**
* Force termination exception
*/
public static final class EarlyTerminationException extends RuntimeException {
EarlyTerminationException(String msg) {
super(msg);
}
}

private final int maxCountHits;
private int numCollected;
private boolean forceTermination;
private final AtomicLong numCollected;
private final boolean forceTermination;
private boolean earlyTerminated;

/**
Expand All @@ -69,11 +74,19 @@ static final class EarlyTerminationException extends RuntimeException {
super(delegate);
this.maxCountHits = maxCountHits;
this.forceTermination = forceTermination;
this.numCollected = new AtomicLong();
}

EarlyTerminatingCollector(final Collector delegate, int maxCountHits, boolean forceTermination, AtomicLong numCollected) {
super(delegate);
this.maxCountHits = maxCountHits;
this.forceTermination = forceTermination;
this.numCollected = numCollected;
}

@Override
public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
if (numCollected >= maxCountHits) {
if (numCollected.get() >= maxCountHits) {
earlyTerminated = true;
if (forceTermination) {
throw new EarlyTerminationException("early termination [CountBased]");
Expand All @@ -84,7 +97,7 @@ public LeafCollector getLeafCollector(LeafReaderContext context) throws IOExcept
return new FilterLeafCollector(super.getLeafCollector(context)) {
@Override
public void collect(int doc) throws IOException {
if (++numCollected > maxCountHits) {
if (numCollected.incrementAndGet() > maxCountHits) {
earlyTerminated = true;
if (forceTermination) {
throw new EarlyTerminationException("early termination [CountBased]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/**
* Manager for the EarlyTerminatingCollector
Expand All @@ -29,16 +30,23 @@ public class EarlyTerminatingCollectorManager<C extends Collector>
private final CollectorManager<C, ReduceableSearchResult> manager;
private final int maxCountHits;
private boolean forceTermination;
private final AtomicLong numCollected;

EarlyTerminatingCollectorManager(CollectorManager<C, ReduceableSearchResult> manager, int maxCountHits, boolean forceTermination) {
this.manager = manager;
this.maxCountHits = maxCountHits;
this.forceTermination = forceTermination;
this.numCollected = new AtomicLong();
}

@Override
public EarlyTerminatingCollector newCollector() throws IOException {
return new EarlyTerminatingCollector(manager.newCollector(), maxCountHits, false /* forced termination is not supported */);
return new EarlyTerminatingCollector(
manager.newCollector(),
maxCountHits,
forceTermination /* forced termination is not supported */,
numCollected
);
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,9 +350,8 @@ private static boolean searchWithCollector(
queryCollector = QueryCollectorContext.createQueryCollector(collectors);
}
QuerySearchResult queryResult = searchContext.queryResult();
try {
searcher.search(query, queryCollector);
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
searcher.search(query, queryCollector);
if (searchContext.isTerminatedEarly()) {
queryResult.terminatedEarly(true);
}
if (searchContext.isSearchTimedOut()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
Version.CURRENT,
false,
executor,
null
null,
100000
);
contextWithoutScroll.from(300);
contextWithoutScroll.close();
Expand Down Expand Up @@ -263,7 +264,8 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
Version.CURRENT,
false,
executor,
null
null,
100000
);
context1.from(300);
exception = expectThrows(IllegalArgumentException.class, () -> context1.preProcess(false));
Expand Down Expand Up @@ -334,7 +336,8 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
Version.CURRENT,
false,
executor,
null
null,
100000
);

SliceBuilder sliceBuilder = mock(SliceBuilder.class);
Expand Down Expand Up @@ -374,7 +377,8 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
Version.CURRENT,
false,
executor,
null
null,
100000
);
ParsedQuery parsedQuery = ParsedQuery.parsedMatchAllQuery();
context3.sliceBuilder(null).parsedQuery(parsedQuery).preProcess(false);
Expand Down Expand Up @@ -410,7 +414,8 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
Version.CURRENT,
false,
executor,
null
null,
100000
);
context4.sliceBuilder(new SliceBuilder(1, 2)).parsedQuery(parsedQuery).preProcess(false);
Query query1 = context4.query();
Expand Down Expand Up @@ -441,7 +446,8 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
Version.CURRENT,
false,
executor,
null
null,
100000
);
int numSlicesForPit = maxSlicesPerPit + randomIntBetween(1, 100);
when(sliceBuilder.getMax()).thenReturn(numSlicesForPit);
Expand Down Expand Up @@ -539,7 +545,8 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
Version.CURRENT,
false,
executor,
null
null,
100000
);
assertThat(context.searcher().hasCancellations(), is(false));
context.searcher().addQueryCancellation(() -> {});
Expand Down Expand Up @@ -639,7 +646,8 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
Version.CURRENT,
false,
executor,
null
null,
100000
);

// Case1: if sort is on timestamp field, non-concurrent path is used
Expand All @@ -664,7 +672,8 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
Version.CURRENT,
false,
executor,
null
null,
100000
);
context.sort(
new SortAndFormats(new Sort(new SortField("test2", SortField.Type.INT)), new DocValueFormat[] { DocValueFormat.RAW })
Expand All @@ -691,7 +700,8 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
Version.CURRENT,
false,
executor,
null
null,
100000
);
context.evaluateRequestShouldUseConcurrentSearch();
if (executor == null) {
Expand Down
Loading

0 comments on commit 1dd3eed

Please sign in to comment.