1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -29,6 +29,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add all-active ingestion as docrep equivalent in pull-based ingestion ([#19316](https://github.com/opensearch-project/OpenSearch/pull/19316))
- Adding logic for histogram aggregation using skiplist ([#19130](https://github.com/opensearch-project/OpenSearch/pull/19130))
- Add skip_list param for date, scaled float and token count fields ([#19142](https://github.com/opensearch-project/OpenSearch/pull/19142))
- Optimization in coordinator logic to use quickselect ([#18777](https://github.com/opensearch-project/OpenSearch/pull/18777))

### Changed
- Refactor `if-else` chains to use `Java 17 pattern matching switch expressions` ([#18965](https://github.com/opensearch-project/OpenSearch/pull/18965))
@@ -31,6 +31,7 @@

package org.opensearch.search.aggregations.bucket.terms;

import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.PriorityQueue;
import org.opensearch.core.ParseField;
import org.opensearch.core.common.io.stream.StreamInput;
@@ -462,30 +463,79 @@ For backward compatibility, we disable the merge sort and use ({@link InternalTerms
final B[] list;
if (reduceContext.isFinalReduce() || reduceContext.isSliceLevel()) {
final int size = Math.min(localBucketCountThresholds.getRequiredSize(), reducedBuckets.size());
if (size > 1000) {
Author's note: the reason for choosing 1000 for the `if` condition is explained in #18777 (comment).
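// Explanatory comment (added for clarity, not in the PR): the priority-queue path in the
// else branch costs O(n log size) heap operations in total; this branch instead runs one
// O(n) average-time quickselect plus an O(size log size) sort of only the winners, which
// is cheaper once the requested size is large.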

final Comparator<MultiBucketsAggregation.Bucket> cmp = order.comparator();
int validBucketCount = 0;
// Update each bucket's doc count error and count the buckets that meet minDocCount
for (B bucket : reducedBuckets) {
if (sumDocCountError == -1) {
bucket.setDocCountError(-1);
} else {
final long finalSumDocCountError = sumDocCountError;
bucket.setDocCountError(docCountError -> docCountError + finalSumDocCountError);
}

if (bucket.getDocCount() < localBucketCountThresholds.getMinDocCount()) {
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket));
} else {
validBucketCount++;
}
}
// Create array and populate with valid buckets
B[] validBuckets = createBucketsArray(validBucketCount);
int arrayIndex = 0;
for (B bucket : reducedBuckets) {
if (bucket.getDocCount() >= localBucketCountThresholds.getMinDocCount()) {
validBuckets[arrayIndex++] = bucket;
}
}
// Select top buckets if needed
if (size < validBuckets.length) {
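// ArrayUtil.select partially reorders validBuckets in O(n) average time so that the
// top `size` buckets under cmp occupy indices [0, size), in no particular order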
ArrayUtil.select(validBuckets, 0, validBuckets.length, size, cmp);
}
// Process selected buckets and calculate otherDocCount
int finalSize = Math.min(size, validBuckets.length);
for (int i = 0; i < validBuckets.length; i++) {
B bucket = validBuckets[i];
if (i < finalSize) {
reduceContext.consumeBucketsAndMaybeBreak(1);
} else {
otherDocCount += bucket.getDocCount();
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket));
}
}
if (finalSize != validBuckets.length) {
list = createBucketsArray(finalSize);
System.arraycopy(validBuckets, 0, list, 0, finalSize);
} else {
list = validBuckets;
}
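// `list` now holds only the selected winners, so this final ordering sort costs
// O(size log size) rather than sorting all reduced buckets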
Arrays.sort(list, cmp);
} else {
final BucketPriorityQueue<B> ordered = new BucketPriorityQueue<>(size, order.comparator());
for (B bucket : reducedBuckets) {
if (sumDocCountError == -1) {
bucket.setDocCountError(-1);
} else {
final long finalSumDocCountError = sumDocCountError;
bucket.setDocCountError(docCountError -> docCountError + finalSumDocCountError);
}
if (bucket.getDocCount() >= localBucketCountThresholds.getMinDocCount()) {
B removed = ordered.insertWithOverflow(bucket);
if (removed != null) {
otherDocCount += removed.getDocCount();
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed));
} else {
reduceContext.consumeBucketsAndMaybeBreak(1);
}
} else {
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket));
}
}
list = createBucketsArray(ordered.size());
for (int i = ordered.size() - 1; i >= 0; i--) {
list[i] = ordered.pop();
}
}
} else {
// we can prune the list on partial reduce if the aggregation is ordered by key
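For context, below is a minimal, self-contained sketch (not code from this PR; the array contents, class name, and requested size are illustrative) of the selection pattern the new branch relies on: `ArrayUtil.select` partitions an array in O(n) average time so that the top `size` elements occupy the prefix, and only that prefix is then fully sorted.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.Random;

import org.apache.lucene.util.ArrayUtil;

public class TopKSelectSketch {
    public static void main(String[] args) {
        // Stand-ins for per-bucket doc counts after reduction
        Integer[] docCounts = new Integer[5000];
        Random random = new Random(42);
        for (int i = 0; i < docCounts.length; i++) {
            docCounts[i] = random.nextInt(1_000_000);
        }

        int size = 1100; // a requested size above the 1000 cutoff
        Comparator<Integer> byCountDesc = Comparator.reverseOrder();

        // Partition so the `size` largest values land in indices [0, size), unordered
        ArrayUtil.select(docCounts, 0, docCounts.length, size, byCountDesc);

        // Only the selected prefix needs a full sort for the final response
        Integer[] top = Arrays.copyOfRange(docCounts, 0, size);
        Arrays.sort(top, byCountDesc);
        System.out.println("best=" + top[0] + ", k-th=" + top[size - 1]);
    }
}
```

The PR's new branch follows the same shape: select the winners into a prefix, copy the prefix out, then `Arrays.sort` it with the aggregation's comparator.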
@@ -1672,6 +1672,78 @@ public void testBucketInTermsAggregationWithMissingValue() throws IOException {
}
}

public void testHighCardinalityNumericTermsAggregation() throws Exception {
try (Directory directory = newDirectory()) {
try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
for (int i = 1; i <= 1500; i++) {
int repeatCount = (i <= 200) ? 1 : 2;
for (int j = 1; j <= repeatCount; j++) {
Document document = new Document();
document.add(new NumericDocValuesField("user_id", i));
indexWriter.addDocument(document);
}
}
indexWriter.flush();
try (IndexReader indexReader = maybeWrapReaderEs(indexWriter.getReader())) {
IndexSearcher indexSearcher = newIndexSearcher(indexReader);
MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType("user_id", NumberFieldMapper.NumberType.LONG);

// When the request size equals the cardinality
TermsAggregationBuilder aggregationBuilder0 = new TermsAggregationBuilder("user_id_terms").field("user_id").size(1500);
Terms result0 = getAggregatedTerms(aggregationBuilder0, indexSearcher, fieldType);
assertEquals(1500, result0.getBuckets().size());
for (int i = 1; i <= 1500; i++) {
Terms.Bucket bucket = result0.getBucketByKey(String.valueOf(i));
if (i <= 200) {
assertEquals(1L, bucket.getDocCount());
} else {
assertEquals(2L, bucket.getDocCount());
}
}

// Test includes minDocCount, with a request size less than the total number of buckets
TermsAggregationBuilder aggregationBuilder1 = new TermsAggregationBuilder("user_id_terms").field("user_id")
.minDocCount(2)
.size(1100);
Terms result1 = getAggregatedTerms(aggregationBuilder1, indexSearcher, fieldType);
assertEquals(1100, result1.getBuckets().size());
// Only buckets with doc count >= 2 qualify (keys 201 to 1500); ties on doc count break
// by ascending key, so the 1100 selected buckets are keys 201 to 1300
for (int i = 201; i <= 1300; i++) {
Terms.Bucket bucket = result1.getBucketByKey(String.valueOf(i));
assertEquals(2L, bucket.getDocCount());
}

// Request size below 1000, which exercises the priority-queue-based selection path
TermsAggregationBuilder aggregationBuilder2 = new TermsAggregationBuilder("user_id_terms").field("user_id")
.minDocCount(2)
.size(900);
Terms result2 = getAggregatedTerms(aggregationBuilder2, indexSearcher, fieldType);
assertEquals(900, result2.getBuckets().size());
// The 900 selected buckets are keys 201 to 1100, each with a doc count of 2
for (int i = 201; i <= 1100; i++) {
Terms.Bucket bucket = result2.getBucketByKey(String.valueOf(i));
assertEquals(2L, bucket.getDocCount());
}
}
}
}
}

private Terms getAggregatedTerms(TermsAggregationBuilder aggregationBuilder, IndexSearcher indexSearcher, MappedFieldType fieldType)
throws IOException {
TermsAggregator aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType);
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
return reduce(aggregator);
}

private final SeqNoFieldMapper.SequenceIDFields sequenceIDFields = SeqNoFieldMapper.SequenceIDFields.emptySeqID();

private List<Document> generateDocsWithNested(String id, int value, int[] nestedValues) {