Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not evaluate shard_size and shard_min_doc_count at segment slice level #9085

Merged
merged 4 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for aggregation profiler with concurrent aggregation ([#8801](https://github.com/opensearch-project/OpenSearch/pull/8801))
- [Remove] Deprecated Fractional ByteSizeValue support #9005 ([#9005](https://github.com/opensearch-project/OpenSearch/pull/9005))
- Make MultiBucketConsumerService thread safe to use across slices during search ([#9047](https://github.com/opensearch-project/OpenSearch/pull/9047))
- Change shard_size and shard_min_doc_count evaluation to happen in shard level reduce phase ([#9085](https://github.com/opensearch-project/OpenSearch/pull/9085))

### Deprecated

Expand All @@ -129,4 +130,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.10...2.x
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.10...2.x
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.opensearch.search.aggregations.InternalAggregations;
import org.opensearch.search.aggregations.MultiBucketConsumerService;
import org.opensearch.search.aggregations.bucket.terms.StringTerms;
import org.opensearch.search.aggregations.bucket.terms.TermsAggregator;
import org.opensearch.search.aggregations.pipeline.PipelineAggregator;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.query.QuerySearchResult;
Expand Down Expand Up @@ -170,15 +171,14 @@ private StringTerms newTerms(Random rand, BytesRef[] dict, boolean withNested) {
"terms",
BucketOrder.key(true),
BucketOrder.count(false),
topNSize,
1,
Collections.emptyMap(),
DocValueFormat.RAW,
numShards,
true,
0,
buckets,
0
0,
new TermsAggregator.BucketCountThresholds(1, 0, topNSize, numShards)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.opensearch.search.aggregations.bucket.terms.TermsAggregator;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -86,15 +87,14 @@ private StringTerms newTerms(boolean withNested) {
"test",
BucketOrder.key(true),
BucketOrder.key(true),
buckets,
1,
null,
DocValueFormat.RAW,
buckets,
false,
100000,
resultBuckets,
0
0,
new TermsAggregator.BucketCountThresholds(1, 0, buckets, buckets)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
* aggregation operators
*/
class AggregationCollectorManager implements CollectorManager<Collector, ReduceableSearchResult> {
private final SearchContext context;
protected final SearchContext context;
private final CheckedFunction<SearchContext, List<Aggregator>, IOException> aggProvider;
private final String collectorReason;

Expand Down Expand Up @@ -63,18 +63,11 @@
}

final InternalAggregations internalAggregations = InternalAggregations.from(internals);
// Reduce the aggregations across slices before sending to the coordinator. We will perform shard level reduce iff multiple slices
// were created to execute this request and it used concurrent segment search path
// TODO: Add the check for flag that the request was executed using concurrent search
if (collectors.size() > 1) {
// using reduce is fine here instead of topLevelReduce as pipeline aggregation is evaluated on the coordinator after all
// documents are collected across shards for an aggregation
return new AggregationReduceableSearchResult(
InternalAggregations.reduce(Collections.singletonList(internalAggregations), context.partialOnShard())
);
} else {
return new AggregationReduceableSearchResult(internalAggregations);
}
return buildAggregationResult(internalAggregations);
}

public AggregationReduceableSearchResult buildAggregationResult(InternalAggregations internalAggregations) {
jed326 marked this conversation as resolved.
Show resolved Hide resolved
return new AggregationReduceableSearchResult(internalAggregations);

Check warning on line 70 in server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java#L70

Added line #L70 was not covered by tests
}

static Collector createCollector(SearchContext context, List<Aggregator> collectors, String reason) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.search.profile.query.CollectorResult;

import java.io.IOException;
import java.util.Collections;
import java.util.Objects;

/**
Expand All @@ -38,4 +39,13 @@ public Collector newCollector() throws IOException {
return super.newCollector();
}
}

@Override
public AggregationReduceableSearchResult buildAggregationResult(InternalAggregations internalAggregations) {
// Reduce the aggregations across slices before sending to the coordinator. We will perform shard level reduce as long as any slices
// were created so that we can apply shard level bucket count thresholds in the reduce phase.
return new AggregationReduceableSearchResult(
InternalAggregations.reduce(Collections.singletonList(internalAggregations), context.partialOnShard())
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.rest.action.search.RestSearchAction;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.bucket.terms.TermsAggregator;
import org.opensearch.search.aggregations.pipeline.PipelineAggregator;
import org.opensearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree;
import org.opensearch.search.aggregations.support.AggregationPath;
Expand Down Expand Up @@ -160,6 +161,16 @@ public boolean isSliceLevel() {
return this.isSliceLevel;
}

// For slice level partial reduce we will apply shard level `shard_size` and `shard_min_doc_count` limits whereas for coordinator
// level partial reduce it will use top level `size` and `min_doc_count`
public int getRequiredSizeLocal(TermsAggregator.BucketCountThresholds bucketCountThresholds) {
jed326 marked this conversation as resolved.
Show resolved Hide resolved
return isSliceLevel() ? bucketCountThresholds.getShardSize() : bucketCountThresholds.getRequiredSize();
}

public long getMinDocCountLocal(TermsAggregator.BucketCountThresholds bucketCountThresholds) {
return isSliceLevel() ? bucketCountThresholds.getShardMinDocCount() : bucketCountThresholds.getMinDocCount();
jed326 marked this conversation as resolved.
Show resolved Hide resolved
}

public BigArrays bigArrays() {
return bigArrays;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.search.profile.query.CollectorResult;

import java.io.IOException;
import java.util.Collections;
import java.util.Objects;

/**
Expand All @@ -38,4 +39,13 @@ public Collector newCollector() throws IOException {
return super.newCollector();
}
}

@Override
public AggregationReduceableSearchResult buildAggregationResult(InternalAggregations internalAggregations) {
// Reduce the aggregations across slices before sending to the coordinator. We will perform shard level reduce as long as any slices
// were created so that we can apply shard level bucket count thresholds in the reduce phase.
return new AggregationReduceableSearchResult(
InternalAggregations.reduce(Collections.singletonList(internalAggregations), context.partialOnShard())
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,14 @@ protected StringTerms buildEmptyTermsAggregation() {
name,
order,
order,
bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(),
metadata(),
format,
bucketCountThresholds.getShardSize(),
showTermDocCountError,
0,
emptyList(),
0
0,
bucketCountThresholds
);
}

Expand All @@ -95,14 +94,13 @@ protected SignificantStringTerms buildEmptySignificantTermsAggregation(long subs
int supersetSize = topReader.numDocs();
return new SignificantStringTerms(
name,
bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(),
metadata(),
format,
subsetSize,
supersetSize,
significanceHeuristic,
emptyList()
emptyList(),
bucketCountThresholds
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,29 +130,27 @@ public DoubleTerms(
String name,
BucketOrder reduceOrder,
BucketOrder order,
int requiredSize,
long minDocCount,
Map<String, Object> metadata,
DocValueFormat format,
int shardSize,
boolean showTermDocCountError,
long otherDocCount,
List<Bucket> buckets,
long docCountError
long docCountError,
TermsAggregator.BucketCountThresholds bucketCountThresholds
) {
super(
name,
reduceOrder,
order,
requiredSize,
minDocCount,
metadata,
format,
shardSize,
showTermDocCountError,
otherDocCount,
buckets,
docCountError
docCountError,
bucketCountThresholds
);
}

Expand All @@ -174,15 +172,14 @@ public DoubleTerms create(List<Bucket> buckets) {
name,
reduceOrder,
order,
requiredSize,
minDocCount,
metadata,
format,
shardSize,
showTermDocCountError,
otherDocCount,
buckets,
docCountError
docCountError,
bucketCountThresholds
);
}

Expand All @@ -204,15 +201,14 @@ protected DoubleTerms create(String name, List<Bucket> buckets, BucketOrder redu
name,
reduceOrder,
order,
requiredSize,
minDocCount,
getMetadata(),
format,
shardSize,
showTermDocCountError,
otherDocCount,
buckets,
docCountError
docCountError,
bucketCountThresholds
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,11 +615,11 @@
long[] otherDocCount = new long[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
final int size;
if (bucketCountThresholds.getMinDocCount() == 0) {
if (context.getMinDocCountLocal(bucketCountThresholds) == 0) {
// if minDocCount == 0 then we can end up with more buckets then maxBucketOrd() returns
size = (int) Math.min(valueCount, bucketCountThresholds.getShardSize());
size = (int) Math.min(valueCount, context.getRequiredSizeLocal(bucketCountThresholds));
} else {
size = (int) Math.min(maxBucketOrd(), bucketCountThresholds.getShardSize());
size = (int) Math.min(maxBucketOrd(), context.getRequiredSizeLocal(bucketCountThresholds));

Check warning on line 622 in server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java#L622

Added line #L622 was not covered by tests
}
PriorityQueue<TB> ordered = buildPriorityQueue(size);
final int finalOrdIdx = ordIdx;
Expand All @@ -630,7 +630,7 @@
@Override
public void accept(long globalOrd, long bucketOrd, long docCount) throws IOException {
otherDocCount[finalOrdIdx] += docCount;
if (docCount >= bucketCountThresholds.getShardMinDocCount()) {
if (docCount >= context.getMinDocCountLocal(bucketCountThresholds)) {
if (spare == null) {
spare = buildEmptyTemporaryBucket();
}
Expand Down Expand Up @@ -799,15 +799,14 @@
name,
reduceOrder,
order,
bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(),
metadata(),
format,
bucketCountThresholds.getShardSize(),
showTermDocCountError,
otherDocCount,
Arrays.asList(topBuckets),
0
0,
bucketCountThresholds
);
}

Expand Down Expand Up @@ -924,14 +923,13 @@
SignificantStringTerms buildResult(long owningBucketOrd, long otherDocCount, SignificantStringTerms.Bucket[] topBuckets) {
return new SignificantStringTerms(
name,
bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(),
metadata(),
format,
subsetSize(owningBucketOrd),
supersetSize,
significanceHeuristic,
Arrays.asList(topBuckets)
Arrays.asList(topBuckets),
bucketCountThresholds
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,15 @@ public abstract class InternalMappedSignificantTerms<

protected InternalMappedSignificantTerms(
String name,
int requiredSize,
long minDocCount,
Map<String, Object> metadata,
DocValueFormat format,
long subsetSize,
long supersetSize,
SignificanceHeuristic significanceHeuristic,
List<B> buckets
List<B> buckets,
TermsAggregator.BucketCountThresholds bucketCountThresholds
) {
super(name, requiredSize, minDocCount, metadata);
super(name, bucketCountThresholds, metadata);
this.format = format;
this.buckets = buckets;
this.subsetSize = subsetSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,16 @@ protected InternalMappedTerms(
String name,
BucketOrder reduceOrder,
BucketOrder order,
int requiredSize,
long minDocCount,
Map<String, Object> metadata,
DocValueFormat format,
int shardSize,
boolean showTermDocCountError,
long otherDocCount,
List<B> buckets,
long docCountError
long docCountError,
TermsAggregator.BucketCountThresholds bucketCountThresholds
) {
super(name, reduceOrder, order, requiredSize, minDocCount, metadata);
super(name, reduceOrder, order, bucketCountThresholds, metadata);
this.format = format;
this.shardSize = shardSize;
this.showTermDocCountError = showTermDocCountError;
Expand Down
Loading
Loading