From d1e5d1c5246965de816bc213e0f3ef20c880fbc5 Mon Sep 17 00:00:00 2001
From: Jay Deng
Date: Thu, 3 Aug 2023 11:34:33 -0700
Subject: [PATCH] Do not evaluate shard_size and shard_min_doc_count at slice
 level for concurrent segment search

Signed-off-by: Jay Deng
---
 .../AggregationCollectorManager.java          |   7 +-
 .../InternalSingleBucketAggregation.java      |   2 +-
 .../GlobalOrdinalsStringTermsAggregator.java  |  20 +++-
 .../terms/InternalSignificantTerms.java       | 102 +++++++++++++++++-
 .../bucket/terms/InternalTerms.java           |  55 ++++++++--
 .../terms/MapStringTermsAggregator.java       |  20 +++-
 .../bucket/terms/MultiTermsAggregator.java    |  15 ++-
 .../bucket/terms/NumericTermsAggregator.java  |  28 +++--
 8 files changed, 216 insertions(+), 33 deletions(-)

diff --git a/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java b/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java
index 1f60ff6503ca8..37ea3e5b3f426 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java
@@ -63,10 +63,11 @@ public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IO
         }
 
         final InternalAggregations internalAggregations = InternalAggregations.from(internals);
-        // Reduce the aggregations across slices before sending to the coordinator. We will perform shard level reduce iff multiple slices
-        // were created to execute this request and it used concurrent segment search path
+        // Reduce the aggregations across slices before sending to the coordinator.
+        // We will perform shard level reduce if multiple slices were created to execute this request and the mustReduceOnSingleInternalAgg
+        // flag is true for the given Aggregation
         // TODO: Add the check for flag that the request was executed using concurrent search
-        if (collectors.size() > 1) {
+        if (collectors.size() > 1 || ((InternalAggregation) internalAggregations.aggregations.get(0)).mustReduceOnSingleInternalAgg()) {
             // using reduce is fine here instead of topLevelReduce as pipeline aggregation is evaluated on the coordinator after all
             // documents are collected across shards for an aggregation
             return new AggregationReduceableSearchResult(
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/InternalSingleBucketAggregation.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/InternalSingleBucketAggregation.java
index 03fade2edb392..e50acf63654d8 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/InternalSingleBucketAggregation.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/InternalSingleBucketAggregation.java
@@ -198,7 +198,7 @@ public final double sortValue(AggregationPath.PathElement head, Iterator
                 PriorityQueue ordered = buildPriorityQueue(size);
                 final int finalOrdIdx = ordIdx;
@@ -630,7 +634,8 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws
                     @Override
                     public void accept(long globalOrd, long bucketOrd, long docCount) throws IOException {
                         otherDocCount[finalOrdIdx] += docCount;
-                        if (docCount >= bucketCountThresholds.getShardMinDocCount()) {
+                        // Don't evaluate shard_min_doc_count at the slice level for concurrent segment search
+                        if (context.isConcurrentSegmentSearchEnabled() || docCount >= bucketCountThresholds.getShardMinDocCount()) {
                             if (spare == null) {
                                 spare = buildEmptyTemporaryBucket();
                             }
@@ -795,7 +800,7 @@ StringTerms buildResult(long owningBucketOrd, long otherDocCount, StringTerms.Bu
             } else {
                 reduceOrder = order;
             }
-            return new StringTerms(
+            StringTerms stringTerms = new StringTerms(
                 name,
                 reduceOrder,
                 order,
@@ -809,6 +814,8 @@ StringTerms buildResult(long owningBucketOrd, long otherDocCount, StringTerms.Bu
                 Arrays.asList(topBuckets),
                 0
             );
+            stringTerms.setShardMinDocCount(bucketCountThresholds.getShardMinDocCount());
+            return stringTerms;
         }
 
         @Override
@@ -922,7 +929,7 @@ void buildSubAggs(SignificantStringTerms.Bucket[][] topBucketsPreOrd) throws IOE
 
         @Override
         SignificantStringTerms buildResult(long owningBucketOrd, long otherDocCount, SignificantStringTerms.Bucket[] topBuckets) {
-            return new SignificantStringTerms(
+            SignificantStringTerms significantStringTerms = new SignificantStringTerms(
                 name,
                 bucketCountThresholds.getRequiredSize(),
                 bucketCountThresholds.getMinDocCount(),
@@ -933,6 +940,9 @@ SignificantStringTerms buildResult(long owningBucketOrd, long otherDocCount, Sig
                 significanceHeuristic,
                 Arrays.asList(topBuckets)
             );
+            significantStringTerms.setShardSize(bucketCountThresholds.getShardSize());
+            significantStringTerms.setShardMinDocCount(bucketCountThresholds.getShardMinDocCount());
+            return significantStringTerms;
         }
 
         @Override
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalSignificantTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalSignificantTerms.java
index 84d148199a7f9..7686f429b5e57 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalSignificantTerms.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalSignificantTerms.java
@@ -195,11 +195,15 @@ public final XContentBuilder toXContent(XContentBuilder builder, Params params)
 
     protected final int requiredSize;
     protected final long minDocCount;
+    protected int shardSize;
+    protected long shardMinDocCount;
 
     protected InternalSignificantTerms(String name, int requiredSize, long minDocCount, Map<String, Object> metadata) {
         super(name, metadata);
         this.requiredSize = requiredSize;
         this.minDocCount = minDocCount;
+        shardSize = 0;
+        shardMinDocCount = 0;
     }
 
     /**
@@ -222,8 +226,32 @@ protected final void doWriteTo(StreamOutput out) throws IOException {
     @Override
     public abstract List<B> getBuckets();
 
+    public int getShardSize() {
+        return shardSize;
+    }
+
+    public void setShardSize(int shardSize) {
+        this.shardSize = shardSize;
+    }
+
+    public long getShardMinDocCount() {
+        return shardMinDocCount;
+    }
+
+    public void setShardMinDocCount(long shardMinDocCount) {
+        this.shardMinDocCount = shardMinDocCount;
+    }
+
     @Override
     public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
+        if (reduceContext.isSliceLevel()) {
+            return reduceOnShard(aggregations, reduceContext);
+        } else {
+            return reduceOnCoordinator(aggregations, reduceContext);
+        }
+    }
+
+    private InternalAggregation reduceOnShard(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
         long globalSubsetSize = 0;
         long globalSupersetSize = 0;
         // Compute the overall result set size and the corpus size using the
@@ -234,12 +262,72 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
             globalSubsetSize += terms.getSubsetSize();
             // supersetSize is a shard level count, if we sum it across slices we would produce num_slices_with_bucket * supersetSize where
             // num_slices_with_bucket is the number of segment slices that have collected a bucket for the key
-            if (reduceContext.isSliceLevel()) {
-                globalSupersetSize = terms.getSupersetSize();
+            globalSupersetSize = terms.getSupersetSize();
+        }
+        Map<String, List<B>> buckets = new HashMap<>();
+        for (InternalAggregation aggregation : aggregations) {
+            @SuppressWarnings("unchecked")
+            InternalSignificantTerms<A, B> terms = (InternalSignificantTerms<A, B>) aggregation;
+            for (B bucket : terms.getBuckets()) {
+                List<B> existingBuckets = buckets.get(bucket.getKeyAsString());
+                if (existingBuckets == null) {
+                    existingBuckets = new ArrayList<>(aggregations.size());
+                    buckets.put(bucket.getKeyAsString(), existingBuckets);
+                }
+                // Adjust the buckets with the global stats representing the
+                // total size of the pots from which the stats are drawn
+                existingBuckets.add(
+                    createBucket(
+                        bucket.getSubsetDf(),
+                        globalSubsetSize,
+                        bucket.getSupersetDf(),
+                        globalSupersetSize,
+                        bucket.aggregations,
+                        bucket
+                    )
+                );
+            }
+        }
+        SignificanceHeuristic heuristic = getSignificanceHeuristic().rewrite(reduceContext);
+        // Apply shard_size limit at slice level reduce
+        final int size = Math.min(getShardSize(), buckets.size());
+        BucketSignificancePriorityQueue<B> ordered = new BucketSignificancePriorityQueue<>(size);
+        for (Map.Entry<String, List<B>> entry : buckets.entrySet()) {
+            List<B> sameTermBuckets = entry.getValue();
+            final B b = reduceBucket(sameTermBuckets, reduceContext);
+            b.updateScore(heuristic);
+            // this needs to be simplified greatly
+            if ((b.score > 0) && (b.subsetDf >= getShardMinDocCount())) {
+                B removed = ordered.insertWithOverflow(b);
+                if (removed == null) {
+                    reduceContext.consumeBucketsAndMaybeBreak(1);
+                } else {
+                    reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed));
+                }
             } else {
-                globalSupersetSize += terms.getSupersetSize();
+                reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(b));
             }
         }
+        B[] list = createBucketsArray(ordered.size());
+        for (int i = ordered.size() - 1; i >= 0; i--) {
+            list[i] = ordered.pop();
+        }
+        return create(globalSubsetSize, globalSupersetSize, Arrays.asList(list));
+    }
+
+    private InternalAggregation reduceOnCoordinator(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
+        long globalSubsetSize = 0;
+        long globalSupersetSize = 0;
+        // Compute the overall result set size and the corpus size using the
+        // top-level Aggregations from each shard
+        for (InternalAggregation aggregation : aggregations) {
+            @SuppressWarnings("unchecked")
+            InternalSignificantTerms<A, B> terms = (InternalSignificantTerms<A, B>) aggregation;
+            globalSubsetSize += terms.getSubsetSize();
+            // supersetSize is a shard level count, if we sum it across slices we would produce num_slices_with_bucket * supersetSize where
+            // num_slices_with_bucket is the number of segment slices that have collected a bucket for the key
+            globalSupersetSize += terms.getSupersetSize();
+        }
         Map<String, List<B>> buckets = new HashMap<>();
         for (InternalAggregation aggregation : aggregations) {
             @SuppressWarnings("unchecked")
@@ -265,7 +353,13 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
             }
         }
         SignificanceHeuristic heuristic = getSignificanceHeuristic().rewrite(reduceContext);
-        final int size = reduceContext.isFinalReduce() == false ? buckets.size() : Math.min(requiredSize, buckets.size());
+        // Apply shard_size limit at slice level reduce
+        final int size;
+        if (reduceContext.isFinalReduce()) {
+            size = Math.min(requiredSize, buckets.size());
+        } else {
+            size = buckets.size();
+        }
         BucketSignificancePriorityQueue<B> ordered = new BucketSignificancePriorityQueue<>(size);
         for (Map.Entry<String, List<B>> entry : buckets.entrySet()) {
             List<B> sameTermBuckets = entry.getValue();
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalTerms.java
index 9a80155eea51c..f73f827f27565 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalTerms.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalTerms.java
@@ -223,6 +223,7 @@ public int hashCode() {
     protected final BucketOrder order;
     protected final int requiredSize;
     protected final long minDocCount;
+    protected long shardMinDocCount;
 
     /**
      * Creates a new {@link InternalTerms}
@@ -246,6 +247,7 @@ protected InternalTerms(
         this.order = order;
         this.requiredSize = requiredSize;
         this.minDocCount = minDocCount;
+        this.shardMinDocCount = 0;
     }
 
     /**
@@ -329,9 +331,18 @@ protected boolean lessThan(IteratorAndCurrent<B> a, IteratorAndCurrent<B> b) {
                 pq.add(new IteratorAndCurrent(terms.getBuckets().iterator()));
             }
         }
-        List<B> reducedBuckets = new ArrayList<>();
+        ;
+        final BucketPriorityQueue<B> reducedBuckets;
         // list of buckets coming from different shards that have the same key
         List<B> currentBuckets = new ArrayList<>();
+
+        // Apply shard_size parameter at the slice reduce level if it is > 0
+        if (reduceContext.isSliceLevel() && getShardSize() > 0) {
+            reducedBuckets = new BucketPriorityQueue<>(getShardSize(), order.comparator());
+        } else {
+            reducedBuckets = new BucketPriorityQueue<>(requiredSize, order.comparator());
+        }
+
         B lastBucket = null;
         while (pq.size() > 0) {
             final IteratorAndCurrent<B> top = pq.top();
@@ -339,7 +350,9 @@ protected boolean lessThan(IteratorAndCurrent<B> a, IteratorAndCurrent<B> b) {
             if (lastBucket != null && cmp.compare(top.current(), lastBucket) != 0) {
                 // the key changes, reduce what we already buffered and reset the buffer for current buckets
                 final B reduced = reduceBucket(currentBuckets, reduceContext);
-                reducedBuckets.add(reduced);
+                if (!reduceContext.isSliceLevel() || reduced.getDocCount() >= getShardMinDocCount()) {
+                    reducedBuckets.insertWithOverflow(reduced);
+                }
                 currentBuckets.clear();
             }
             lastBucket = top.current();
@@ -355,9 +368,17 @@ protected boolean lessThan(IteratorAndCurrent<B> a, IteratorAndCurrent<B> b) {
 
         if (currentBuckets.isEmpty() == false) {
             final B reduced = reduceBucket(currentBuckets, reduceContext);
-            reducedBuckets.add(reduced);
+            // Apply shard_min_doc_count parameter at the slice reduce level
+            if (!reduceContext.isSliceLevel() || reduced.getDocCount() >= getShardMinDocCount()) {
+                reducedBuckets.insertWithOverflow(reduced);
+            }
         }
-        return reducedBuckets;
+
+        // Shards must return buckets sorted by key
+        List<B> result = new ArrayList<>();
+        reducedBuckets.forEach(result::add);
+        result.sort(cmp);
+        return result;
     }
 
     private List<B> reduceLegacy(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
@@ -376,12 +397,24 @@ private List<B> reduceLegacy(List<InternalAggregation> aggregations, ReduceConte
                 }
             }
         }
-        List<B> reducedBuckets = new ArrayList<>();
+        final BucketPriorityQueue<B> reducedBuckets;
+        // Apply shard_size parameter at the slice reduce level if it is > 0
+        if (reduceContext.isSliceLevel() && getShardSize() > 0) {
+            reducedBuckets = new BucketPriorityQueue<>(getShardSize(), order.comparator());
+        } else {
+            reducedBuckets = new BucketPriorityQueue<>(requiredSize, order.comparator());
+        }
         for (List<B> sameTermBuckets : bucketMap.values()) {
             final B b = reduceBucket(sameTermBuckets, reduceContext);
-            reducedBuckets.add(b);
+            // Apply shard_min_doc_count parameter at the slice reduce level
+            if (!reduceContext.isSliceLevel() || b.getDocCount() >= getShardMinDocCount()) {
+                reducedBuckets.insertWithOverflow(b);
+            }
         }
-        return reducedBuckets;
+
+        List<B> result = new ArrayList<>();
+        reducedBuckets.forEach(result::add);
+        return result;
     }
 
     public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
@@ -521,6 +554,14 @@ protected B reduceBucket(List<B> buckets, ReduceContext context) {
         return createBucket(docCount, aggs, docCountError, buckets.get(0));
     }
 
+    protected void setShardMinDocCount(long shardMinDocCount) {
+        this.shardMinDocCount = shardMinDocCount;
+    }
+
+    protected long getShardMinDocCount() {
+        return shardMinDocCount;
+    }
+
     protected abstract void setDocCountError(long docCountError);
 
     protected abstract int getShardSize();
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java
index bcdf1f4480a31..f09abfdf9b009 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java
@@ -248,8 +248,14 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws
             long[] otherDocCounts = new long[owningBucketOrds.length];
             for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
                 collectZeroDocEntriesIfNeeded(owningBucketOrds[ordIdx]);
-                int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize());
+                // Do not apply shard_size at the slice level for concurrent segment search
+                int size;
+                if (context.isConcurrentSegmentSearchEnabled()) {
+                    size = (int) bucketOrds.size();
+                } else {
+                    size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize());
+                }
 
                 PriorityQueue<B> ordered = buildPriorityQueue(size);
                 B spare = null;
                 BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
@@ -257,7 +263,8 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws
                 while (ordsEnum.next()) {
                     long docCount = bucketDocCount(ordsEnum.ord());
                     otherDocCounts[ordIdx] += docCount;
-                    if (docCount < bucketCountThresholds.getShardMinDocCount()) {
+                    // Don't evaluate shard_min_doc_count at the slice level for concurrent segment search
+                    if (!context.isConcurrentSegmentSearchEnabled() && docCount < bucketCountThresholds.getShardMinDocCount()) {
                         continue;
                     }
                     if (spare == null) {
@@ -450,7 +457,7 @@ StringTerms buildResult(long owningBucketOrd, long otherDocCount, StringTerms.Bu
             } else {
                 reduceOrder = order;
             }
-            return new StringTerms(
+            StringTerms stringTerms = new StringTerms(
                 name,
                 reduceOrder,
                 order,
@@ -464,6 +471,8 @@ StringTerms buildResult(long owningBucketOrd, long otherDocCount, StringTerms.Bu
                 Arrays.asList(topBuckets),
                 0
             );
+            stringTerms.setShardMinDocCount(bucketCountThresholds.getShardMinDocCount());
+            return stringTerms;
         }
 
         @Override
@@ -570,7 +579,7 @@ void buildSubAggs(SignificantStringTerms.Bucket[][] topBucketsPerOrd) throws IOE
 
        @Override
         SignificantStringTerms buildResult(long owningBucketOrd, long otherDocCount, SignificantStringTerms.Bucket[] topBuckets) {
-            return new SignificantStringTerms(
+            SignificantStringTerms significantStringTerms = new SignificantStringTerms(
                 name,
                 bucketCountThresholds.getRequiredSize(),
                 bucketCountThresholds.getMinDocCount(),
@@ -581,6 +590,9 @@ SignificantStringTerms buildResult(long owningBucketOrd, long otherDocCount, Sig
                 significanceHeuristic,
                 Arrays.asList(topBuckets)
             );
+            significantStringTerms.setShardMinDocCount(bucketCountThresholds.getShardMinDocCount());
+            significantStringTerms.setShardSize(bucketCountThresholds.getShardSize());
+            return significantStringTerms;
         }
 
         @Override
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregator.java
index 9d99c0b90a075..14d52322cf0e2 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregator.java
@@ -124,7 +124,13 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
             collectZeroDocEntriesIfNeeded(owningBucketOrds[ordIdx]);
             long bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]);
 
-            int size = (int) Math.min(bucketsInOrd, bucketCountThresholds.getShardSize());
+            // Do not apply shard_size at the slice level for concurrent segment search
+            int size;
+            if (context.isConcurrentSegmentSearchEnabled()) {
+                size = (int) bucketsInOrd;
+            } else {
+                size = (int) Math.min(bucketsInOrd, bucketCountThresholds.getShardSize());
+            }
             PriorityQueue<InternalMultiTerms.Bucket> ordered = new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator);
             InternalMultiTerms.Bucket spare = null;
             BytesRef dest = null;
@@ -136,7 +142,8 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
             while (ordsEnum.next()) {
                 long docCount = bucketDocCount(ordsEnum.ord());
                 otherDocCounts[ordIdx] += docCount;
-                if (docCount < bucketCountThresholds.getShardMinDocCount()) {
+                // Don't evaluate shard_min_doc_count at the slice level for concurrent segment search
+                if (!context.isConcurrentSegmentSearchEnabled() && docCount < bucketCountThresholds.getShardMinDocCount()) {
                     continue;
                 }
                 if (spare == null) {
@@ -178,7 +185,7 @@ InternalMultiTerms buildResult(long owningBucketOrd, long otherDocCount, Interna
         } else {
             reduceOrder = order;
         }
-        return new InternalMultiTerms(
+        InternalMultiTerms internalMultiTerms = new InternalMultiTerms(
             name,
             reduceOrder,
             order,
@@ -192,6 +199,8 @@ InternalMultiTerms buildResult(long owningBucketOrd, long otherDocCount, Interna
             formats,
             List.of(topBuckets)
         );
+        internalMultiTerms.setShardMinDocCount(bucketCountThresholds.getShardMinDocCount());
+        return internalMultiTerms;
     }
 
     @Override
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java
index a0265135fe9d3..4bc9cc4cb8d63 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java
@@ -179,7 +179,13 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws
                 collectZeroDocEntriesIfNeeded(owningBucketOrds[ordIdx]);
                 long bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]);
 
-                int size = (int) Math.min(bucketsInOrd, bucketCountThresholds.getShardSize());
+                // Do not apply shard_size at the slice level for concurrent segment search
+                int size;
+                if (context.isConcurrentSegmentSearchEnabled()) {
+                    size = (int) bucketsInOrd;
+                } else {
+                    size = (int) Math.min(bucketsInOrd, bucketCountThresholds.getShardSize());
+                }
                 PriorityQueue<B> ordered = buildPriorityQueue(size);
                 B spare = null;
                 BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
@@ -187,7 +193,8 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws
                 while (ordsEnum.next()) {
                     long docCount = bucketDocCount(ordsEnum.ord());
                     otherDocCounts[ordIdx] += docCount;
-                    if (docCount < bucketCountThresholds.getShardMinDocCount()) {
+                    // Don't evaluate shard_min_doc_count at the slice level for concurrent segment search
+                    if (!context.isConcurrentSegmentSearchEnabled() && docCount < bucketCountThresholds.getShardMinDocCount()) {
                         continue;
                     }
                     if (spare == null) {
@@ -391,7 +398,7 @@ LongTerms buildResult(long owningBucketOrd, long otherDocCount, LongTerms.Bucket
             } else {
                 reduceOrder = order;
             }
-            return new LongTerms(
+            LongTerms longTerms = new LongTerms(
                 name,
                 reduceOrder,
                 order,
@@ -405,6 +412,8 @@ LongTerms buildResult(long owningBucketOrd, long otherDocCount, LongTerms.Bucket
                 List.of(topBuckets),
                 0
             );
+            longTerms.setShardMinDocCount(bucketCountThresholds.getShardMinDocCount());
+            return longTerms;
         }
 
         @Override
@@ -473,7 +482,7 @@ DoubleTerms buildResult(long owningBucketOrd, long otherDocCount, DoubleTerms.Bu
             } else {
                 reduceOrder = order;
             }
-            return new DoubleTerms(
+            DoubleTerms doubleTerms = new DoubleTerms(
                 name,
                 reduceOrder,
                 order,
@@ -487,6 +496,8 @@ DoubleTerms buildResult(long owningBucketOrd, long otherDocCount, DoubleTerms.Bu
                 List.of(topBuckets),
                 0
             );
+            doubleTerms.setShardMinDocCount(bucketCountThresholds.getShardMinDocCount());
+            return doubleTerms;
         }
 
         @Override
@@ -554,7 +565,7 @@ UnsignedLongTerms buildResult(long owningBucketOrd, long otherDocCount, Unsigned
             } else {
                 reduceOrder = order;
             }
-            return new UnsignedLongTerms(
+            UnsignedLongTerms unsignedLongTerms = new UnsignedLongTerms(
                 name,
                 reduceOrder,
                 order,
@@ -568,6 +579,8 @@ UnsignedLongTerms buildResult(long owningBucketOrd, long otherDocCount, Unsigned
                 List.of(topBuckets),
                 0
             );
+            unsignedLongTerms.setShardMinDocCount(bucketCountThresholds.getShardMinDocCount());
+            return unsignedLongTerms;
         }
 
         @Override
@@ -670,7 +683,7 @@ void collectZeroDocEntriesIfNeeded(long owningBucketOrd) throws IOException {}
 
         @Override
         SignificantLongTerms buildResult(long owningBucketOrd, long otherDocCoun, SignificantLongTerms.Bucket[] topBuckets) {
-            return new SignificantLongTerms(
+            SignificantLongTerms significantLongTerms = new SignificantLongTerms(
                 name,
                 bucketCountThresholds.getRequiredSize(),
                 bucketCountThresholds.getMinDocCount(),
@@ -681,6 +694,9 @@ SignificantLongTerms buildResult(long owningBucketOrd, long otherDocCoun, Signif
                 significanceHeuristic,
                 List.of(topBuckets)
             );
+            significantLongTerms.setShardMinDocCount(bucketCountThresholds.getShardMinDocCount());
+            significantLongTerms.setShardSize(bucketCountThresholds.getShardSize());
+            return significantLongTerms;
         }
 
         @Override