1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Introduced internal API for retrieving metadata about requested indices from transport actions ([#18523](https://github.com/opensearch-project/OpenSearch/pull/18523))

### Changed
- Combine filter rewrite and skip list to optimize sub-aggregation ([#19573](https://github.com/opensearch-project/OpenSearch/pull/19573))
- Faster `terms` query creation for `keyword` field with index and docValues enabled ([#19350](https://github.com/opensearch-project/OpenSearch/pull/19350))
- Refactor to move prepareIndex and prepareDelete methods to Engine class ([#19551](https://github.com/opensearch-project/OpenSearch/pull/19551))
- Omit maxScoreCollector in SimpleTopDocsCollectorContext when concurrent segment search enabled ([#19584](https://github.com/opensearch-project/OpenSearch/pull/19584))
BitSetDocIdStream.java (new file)
@@ -0,0 +1,76 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.aggregations;

import org.apache.lucene.search.CheckedIntConsumer;
import org.apache.lucene.search.DocIdStream;
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.MathUtil;

import java.io.IOException;

/**
* DocIdStream implementation using a FixedBitSet. This is a duplicate of the implementation in
* Lucene and should ideally be removed eventually.
*
* @opensearch.internal
*/
public final class BitSetDocIdStream extends DocIdStream {

private final FixedBitSet bitSet;
private final int offset, max;
private int upTo;

public BitSetDocIdStream(FixedBitSet bitSet, int offset) {
this.bitSet = bitSet;
this.offset = offset;
upTo = offset;
max = MathUtil.unsignedMin(Integer.MAX_VALUE, offset + bitSet.length());
}

@Override
public boolean mayHaveRemaining() {
return upTo < max;
}

@Override
public void forEach(int upTo, CheckedIntConsumer<IOException> consumer) throws IOException {
if (upTo > this.upTo) {
upTo = Math.min(upTo, max);
bitSet.forEach(this.upTo - offset, upTo - offset, offset, consumer);
this.upTo = upTo;
}
}

@Override
public int count(int upTo) throws IOException {
if (upTo > this.upTo) {
upTo = Math.min(upTo, max);
int count = bitSet.cardinality(this.upTo - offset, upTo - offset);
this.upTo = upTo;
return count;
} else {
return 0;
}
}

@Override
public int intoArray(int upTo, int[] array) {
if (upTo > this.upTo) {
upTo = Math.min(upTo, max);
int count = bitSet.intoArray(this.upTo - offset, upTo - offset, offset, array);
if (count == array.length) { // The whole range of doc IDs may not have been copied
upTo = array[array.length - 1] + 1;
}
this.upTo = upTo;
return count;
}
return 0;
}
}
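Note: for context, here is a minimal, hypothetical usage sketch of the class above (the class name BitSetDocIdStreamExample and the sample values are illustrative and not part of this change). It shows the intended one-shot, stateful consumption: bit i of the wrapped FixedBitSet represents doc ID offset + i, and each forEach/count call only visits doc IDs that have not been consumed yet.

import org.apache.lucene.util.FixedBitSet;
import org.opensearch.search.aggregations.BitSetDocIdStream;

import java.io.IOException;

// Hypothetical usage sketch, not part of this PR.
public class BitSetDocIdStreamExample {
    public static void main(String[] args) throws IOException {
        FixedBitSet bits = new FixedBitSet(8);
        bits.set(1);
        bits.set(3);
        bits.set(6);

        // Bit i maps to doc ID offset + i; with offset = 100 the stream yields 101, 103 and 106.
        BitSetDocIdStream stream = new BitSetDocIdStream(bits, 100);

        // Consume everything below doc 104 first (docs 101 and 103) ...
        stream.forEach(104, doc -> System.out.println("collected doc " + doc));

        // ... then count whatever is left (doc 106). The stream never revisits consumed docs.
        if (stream.mayHaveRemaining()) {
            System.out.println("remaining docs: " + stream.count(Integer.MAX_VALUE));
        }
    }
}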
LeafBucketCollector.java
@@ -32,8 +32,10 @@

package org.opensearch.search.aggregations;

import org.apache.lucene.search.DocIdStream;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.Scorable;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;

import java.io.IOException;
@@ -123,6 +125,34 @@ public void collect(int doc) throws IOException {
collect(doc, 0);
}

/**
* Bulk-collect doc IDs into the given owningBucketOrd.
*
* <p>Note: The provided {@link DocIdStream} may be reused across calls and should be consumed
* immediately.
*
* <p>Note: The provided {@link DocIdStream} typically holds all the docIds for the corresponding
* owningBucketOrd. This method may be called multiple times per segment (but once per owningBucketOrd).
*
* <p>While the {@link DocIdStream} for each owningBucketOrd is sorted by docIds, it is NOT GUARANTEED
* that doc IDs arrive in order across invocations for different owningBucketOrd.
*
* <p>It is NOT LEGAL for callers to mix calls to {@link #collect(DocIdStream, long)} and {@link
* #collect(int, long)}.
*
* <p>The default implementation calls {@code stream.forEach(doc -> collect(doc, owningBucketOrd))}.
*/
@ExperimentalApi
public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
stream.forEach(doc -> collect(doc, owningBucketOrd));
}

/**
* Bulk-collect the given doc IDs into owningBucketOrd. All entries of the array are collected;
* the default implementation collects each doc individually.
*/
public void collect(int[] docIds, long owningBucketOrd) throws IOException {
for (int doc : docIds) {
collect(doc, owningBucketOrd);
}
}

@Override
public void setScorer(Scorable scorer) throws IOException {
// no-op by default
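Note: to illustrate the bulk-collection contract documented above, here is a hedged sketch of a collector that overrides collect(DocIdStream, long). The class CountingLeafBucketCollector is hypothetical (not part of this PR) and assumes bucket ordinals are small and dense; the point is that a counts-only consumer can use stream.count(upTo) and skip per-doc dispatch entirely.

import org.apache.lucene.search.DocIdStream;
import org.opensearch.search.aggregations.LeafBucketCollector;

import java.io.IOException;

// Hypothetical sketch, not part of this PR: a counting-only collector using the bulk path.
public class CountingLeafBucketCollector extends LeafBucketCollector {

    private final long[] countPerBucket; // assumes dense, small bucket ordinals

    public CountingLeafBucketCollector(int maxBuckets) {
        this.countPerBucket = new long[maxBuckets];
    }

    @Override
    public void collect(int doc, long owningBucketOrd) throws IOException {
        countPerBucket[(int) owningBucketOrd]++;
    }

    @Override
    public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
        // count(upTo) consumes the remaining doc IDs in one pass without materializing them.
        countPerBucket[(int) owningBucketOrd] += stream.count(Integer.MAX_VALUE);
    }
}

Since callers may use either entry point (but never both for the same collector, per the javadoc), the two overrides maintain the same per-bucket counts.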
HistogramSkiplistLeafCollector.java (new file)
@@ -0,0 +1,171 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.aggregations.bucket;

import org.apache.lucene.index.DocValuesSkipper;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.search.DocIdStream;
import org.apache.lucene.search.Scorable;
import org.opensearch.common.Rounding;
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;

import java.io.IOException;

/**
* Histogram collection logic using skip list.
*
* @opensearch.internal
*/
public class HistogramSkiplistLeafCollector extends LeafBucketCollector {

private final NumericDocValues values;
private final DocValuesSkipper skipper;
private final Rounding.Prepared preparedRounding;
private final LongKeyedBucketOrds bucketOrds;
private final LeafBucketCollector sub;
private final BucketsAggregator aggregator;

/**
* Max doc ID (inclusive) up to which the values of all docs may map to the same bucket.
*/
private int upToInclusive = -1;

/**
* Whether the values of all docs up to {@link #upToInclusive} map to the same bucket.
*/
private boolean upToSameBucket;

/**
* Index in bucketOrds for docs up to {@link #upToInclusive}.
*/
private long upToBucketIndex;

public HistogramSkiplistLeafCollector(
NumericDocValues values,
DocValuesSkipper skipper,
Rounding.Prepared preparedRounding,
LongKeyedBucketOrds bucketOrds,
LeafBucketCollector sub,
BucketsAggregator aggregator
) {
this.values = values;
this.skipper = skipper;
this.preparedRounding = preparedRounding;
this.bucketOrds = bucketOrds;
this.sub = sub;
this.aggregator = aggregator;
}

@Override
public void setScorer(Scorable scorer) throws IOException {
if (sub != null) {
sub.setScorer(scorer);
}
}

private void advanceSkipper(int doc, long owningBucketOrd) throws IOException {
if (doc > skipper.maxDocID(0)) {
skipper.advance(doc);
}
upToSameBucket = false;

if (skipper.minDocID(0) > doc) {
// Corner case which happens if `doc` doesn't have a value and is between two intervals of
// the doc-value skip index.
upToInclusive = skipper.minDocID(0) - 1;
return;
}

upToInclusive = skipper.maxDocID(0);

// Now find the highest level where all docs map to the same bucket.
for (int level = 0; level < skipper.numLevels(); ++level) {
int totalDocsAtLevel = skipper.maxDocID(level) - skipper.minDocID(level) + 1;
long minBucket = preparedRounding.round(skipper.minValue(level));
long maxBucket = preparedRounding.round(skipper.maxValue(level));

if (skipper.docCount(level) == totalDocsAtLevel && minBucket == maxBucket) {
// All docs at this level have a value, and all values map to the same bucket.
upToInclusive = skipper.maxDocID(level);
upToSameBucket = true;
upToBucketIndex = bucketOrds.add(owningBucketOrd, maxBucket);
if (upToBucketIndex < 0) {
upToBucketIndex = -1 - upToBucketIndex;
}
} else {
break;
}
}
}

@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
if (doc > upToInclusive) {
advanceSkipper(doc, owningBucketOrd);
}

if (upToSameBucket) {
aggregator.incrementBucketDocCount(upToBucketIndex, 1L);
sub.collect(doc, upToBucketIndex);
} else if (values.advanceExact(doc)) {
final long value = values.longValue();
long bucketIndex = bucketOrds.add(owningBucketOrd, preparedRounding.round(value));
if (bucketIndex < 0) {
bucketIndex = -1 - bucketIndex;
aggregator.collectExistingBucket(sub, doc, bucketIndex);
} else {
aggregator.collectBucket(sub, doc, bucketIndex);
}
}
}

@Override
public void collect(DocIdStream stream) throws IOException {
// This will only be called when this is the top-level aggregation
collect(stream, 0);
}

@Override
public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
// Called directly when this histogram is a sub-aggregation (and via collect(DocIdStream) above otherwise)
for (;;) {
int upToExclusive = upToInclusive + 1;
if (upToExclusive < 0) { // overflow
upToExclusive = Integer.MAX_VALUE;
}

if (upToSameBucket) {
if (sub == NO_OP_COLLECTOR) {
// stream.count may be faster when we don't need to handle sub-aggs
long count = stream.count(upToExclusive);
aggregator.incrementBucketDocCount(upToBucketIndex, count);
} else {
int count = 0;
int[] docBuffer = new int[64];
int cnt = Integer.MAX_VALUE;
while (cnt != 0) {
cnt = stream.intoArray(upToExclusive, docBuffer);
sub.collect(docBuffer, upToBucketIndex);
Comment on lines +154 to +155

Contributor:

I think the issue here is that when cnt is less than docBuffer.length, sub.collect(docBuffer, upToBucketIndex) will still iterate through the entire docBuffer.

So we can either pass in the size, e.g.

sub.collect(docBuffer, cnt, upToBucketIndex);

or create a new array, which I think would be suboptimal.

Contributor Author:

Yes, I realized this yesterday and missed adding a comment. Because of this issue, advanceExact might get invoked for a target > doc, resulting in an error.

count += cnt;
}
aggregator.incrementBucketDocCount(upToBucketIndex, count);
}
} else {
stream.forEach(upToExclusive, doc -> collect(doc, owningBucketOrd));
}

if (stream.mayHaveRemaining()) {
advanceSkipper(upToExclusive, owningBucketOrd);
} else {
break;
}
}
}
}
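Note: one possible shape of the fix discussed in the review thread above, sketched under the assumption that a count-aware overload collect(int[] docIds, int count, long owningBucketOrd) is added to LeafBucketCollector; that overload does not exist in this PR. Forwarding only the number of valid buffer entries keeps stale doc IDs from a previous intoArray call from being re-collected, which can lead to the advanceExact error mentioned in the reply.

// Hypothetical sketch of the fix suggested in the review thread; not part of this PR.

// In LeafBucketCollector: a count-aware overload that only visits the filled prefix.
public void collect(int[] docIds, int count, long owningBucketOrd) throws IOException {
    for (int i = 0; i < count; ++i) {
        collect(docIds[i], owningBucketOrd);
    }
}

// In HistogramSkiplistLeafCollector.collect(DocIdStream, long), the buffered branch would
// then forward only the valid prefix of docBuffer on each iteration:
int count = 0;
int[] docBuffer = new int[64];
int cnt = Integer.MAX_VALUE;
while (cnt != 0) {
    cnt = stream.intoArray(upToExclusive, docBuffer);
    sub.collect(docBuffer, cnt, upToBucketIndex); // only the first cnt entries are valid
    count += cnt;
}
aggregator.incrementBucketDocCount(upToBucketIndex, count);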
@@ -14,6 +14,7 @@
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.BitDocIdSet;
import org.apache.lucene.util.FixedBitSet;
import org.opensearch.search.aggregations.BitSetDocIdStream;
import org.opensearch.search.aggregations.BucketCollector;
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.search.aggregations.bucket.filterrewrite.FilterRewriteOptimizationContext;
@@ -23,8 +24,6 @@
import java.util.function.BiConsumer;
import java.util.function.Function;

import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;

/**
* Range collector implementation that supports sub-aggregations by collecting doc IDs.
*/
@@ -85,10 +84,7 @@ public void finalizePreviousRange() {
DocIdSetIterator iterator = bitDocIdSet.iterator();
// build a new leaf collector for each bucket
LeafBucketCollector sub = collectableSubAggregators.getLeafCollector(leafCtx);
while (iterator.nextDoc() != NO_MORE_DOCS) {
int currentDoc = iterator.docID();
sub.collect(currentDoc, bucketOrd);
}
sub.collect(new BitSetDocIdStream(bitSet, 0), bucketOrd);
logger.trace("collected sub aggregation for bucket {}", bucketOrd);
} catch (IOException e) {
throw new RuntimeException(e);
AutoDateHistogramAggregator.java
@@ -31,7 +31,10 @@

package org.opensearch.search.aggregations.bucket.histogram;

import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.DocValuesSkipper;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.util.CollectionUtil;
@@ -51,6 +54,7 @@
import org.opensearch.search.aggregations.LeafBucketCollectorBase;
import org.opensearch.search.aggregations.bucket.DeferableBucketAggregator;
import org.opensearch.search.aggregations.bucket.DeferringBucketCollector;
import org.opensearch.search.aggregations.bucket.HistogramSkiplistLeafCollector;
import org.opensearch.search.aggregations.bucket.MergingBucketsDeferringCollector;
import org.opensearch.search.aggregations.bucket.filterrewrite.DateHistogramAggregatorBridge;
import org.opensearch.search.aggregations.bucket.filterrewrite.FilterRewriteOptimizationContext;
@@ -135,6 +139,7 @@ static AutoDateHistogramAggregator build(
protected int roundingIdx;
protected Rounding.Prepared preparedRounding;

private final String fieldName;
private final FilterRewriteOptimizationContext filterRewriteOptimizationContext;

private AutoDateHistogramAggregator(
@@ -218,6 +223,10 @@ protected Function<Long, Long> bucketOrdProducer() {
return (key) -> getBucketOrds().add(0, preparedRounding.round(key));
}
};

this.fieldName = (valuesSource instanceof ValuesSource.Numeric.FieldData)
? ((ValuesSource.Numeric.FieldData) valuesSource).getIndexFieldName()
: null;
filterRewriteOptimizationContext = new FilterRewriteOptimizationContext(bridge, parent, subAggregators.length, context);
}

@@ -260,7 +269,21 @@ public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBuc
return LeafBucketCollector.NO_OP_COLLECTOR;
}

DocValuesSkipper skipper = null;
if (this.fieldName != null) {
skipper = ctx.reader().getDocValuesSkipper(this.fieldName);
}
final SortedNumericDocValues values = valuesSource.longValues(ctx);
final NumericDocValues singleton = DocValues.unwrapSingleton(values);

if (skipper != null && singleton != null) {
// TODO: add hard bounds support
// TODO: when parent != null, HistogramSkiplistLeafCollector should only be used if the
// getLeafCollector invocation comes from filterRewriteOptimizationContext. That check is
// removed for now to collect performance numbers.
return new HistogramSkiplistLeafCollector(singleton, skipper, preparedRounding, getBucketOrds(), sub, this);
}

final LeafBucketCollector iteratingCollector = getLeafCollector(values, sub);
return new LeafBucketCollectorBase(sub, values) {
@Override