Skip to content

Enforce max_buckets limit only in the final reduction phase #36152

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1090,7 +1090,8 @@ public IndicesService getIndicesService() {
}

public InternalAggregation.ReduceContext createReduceContext(boolean finalReduce) {
return new InternalAggregation.ReduceContext(bigArrays, scriptService, multiBucketConsumerService.create(), finalReduce);
return new InternalAggregation.ReduceContext(bigArrays, scriptService,
finalReduce ? multiBucketConsumerService.create() : bucketCount -> {}, finalReduce);
}

public static final class CanMatchResponse extends SearchPhaseResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.elasticsearch.search;

import com.carrotsearch.hppc.IntArrayList;

import org.apache.lucene.search.Query;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.action.ActionListener;
Expand Down Expand Up @@ -59,6 +58,8 @@
import org.elasticsearch.script.MockScriptPlugin;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.support.ValueType;
Expand Down Expand Up @@ -155,7 +156,7 @@ protected Settings nodeSettings() {
return Settings.builder().put("search.default_search_timeout", "5s").build();
}

public void testClearOnClose() throws ExecutionException, InterruptedException {
public void testClearOnClose() {
createIndex("index");
client().prepareIndex("index", "type", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
SearchResponse searchResponse = client().prepareSearch("index").setSize(1).setScroll("1m").get();
Expand All @@ -167,7 +168,7 @@ public void testClearOnClose() throws ExecutionException, InterruptedException {
assertEquals(0, service.getActiveContexts());
}

public void testClearOnStop() throws ExecutionException, InterruptedException {
public void testClearOnStop() {
createIndex("index");
client().prepareIndex("index", "type", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
SearchResponse searchResponse = client().prepareSearch("index").setSize(1).setScroll("1m").get();
Expand All @@ -179,7 +180,7 @@ public void testClearOnStop() throws ExecutionException, InterruptedException {
assertEquals(0, service.getActiveContexts());
}

public void testClearIndexDelete() throws ExecutionException, InterruptedException {
public void testClearIndexDelete() {
createIndex("index");
client().prepareIndex("index", "type", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
SearchResponse searchResponse = client().prepareSearch("index").setSize(1).setScroll("1m").get();
Expand Down Expand Up @@ -208,7 +209,7 @@ public void testCloseSearchContextOnRewriteException() {
assertEquals(activeRefs, indexShard.store().refCount());
}

public void testSearchWhileIndexDeleted() throws IOException, InterruptedException {
public void testSearchWhileIndexDeleted() throws InterruptedException {
createIndex("index");
client().prepareIndex("index", "type", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();

Expand Down Expand Up @@ -443,15 +444,15 @@ protected QueryBuilder doRewrite(QueryRewriteContext queryRewriteContext) {
}

@Override
protected void doWriteTo(StreamOutput out) throws IOException {
protected void doWriteTo(StreamOutput out) {
}

@Override
protected void doXContent(XContentBuilder builder, Params params) throws IOException {
protected void doXContent(XContentBuilder builder, Params params) {
}

@Override
protected Query doToQuery(QueryShardContext context) throws IOException {
protected Query doToQuery(QueryShardContext context) {
return null;
}

Expand Down Expand Up @@ -501,7 +502,6 @@ public void testCanMatch() throws IOException {
assertFalse(service.canMatch(new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.QUERY_THEN_FETCH,
new SearchSourceBuilder().query(new MatchNoneQueryBuilder()), Strings.EMPTY_ARRAY, false,
new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, allowPartialSearchResults, null, null)));

}

public void testCanRewriteToMatchNone() {
Expand All @@ -519,7 +519,6 @@ public void testCanRewriteToMatchNone() {
.suggest(new SuggestBuilder())));
assertFalse(SearchService.canRewriteToMatchNone(new SearchSourceBuilder().query(new TermQueryBuilder("foo", "bar"))
.suggest(new SuggestBuilder())));

}

public void testSetSearchThrottled() {
Expand Down Expand Up @@ -568,4 +567,17 @@ public void testExpandSearchThrottled() {
assertHitCount(client().prepareSearch().get(), 0L);
assertHitCount(client().prepareSearch().setIndicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_FORBID_CLOSED).get(), 1L);
}

public void testCreateReduceContext() {
final SearchService service = getInstanceFromNode(SearchService.class);
{
InternalAggregation.ReduceContext reduceContext = service.createReduceContext(true);
expectThrows(MultiBucketConsumerService.TooManyBucketsException.class,
() -> reduceContext.consumeBucketsAndMaybeBreak(MultiBucketConsumerService.DEFAULT_MAX_BUCKETS + 1));
}
{
InternalAggregation.ReduceContext reduceContext = service.createReduceContext(false);
reduceContext.consumeBucketsAndMaybeBreak(MultiBucketConsumerService.DEFAULT_MAX_BUCKETS + 1);
}
}
}