Merged
90 commits
587cc19
Intermediate Commit
atris Oct 15, 2021
51731ad
more stuff
Oct 25, 2021
c95646c
Intermediate
Oct 26, 2021
fd20115
Add pipeline operator and family
Oct 27, 2021
ad49238
More stuff
Oct 28, 2021
4debd68
Intermediate Commit
atris Nov 10, 2021
55d1760
Remove remanants
atris Nov 10, 2021
5ae01e5
Revert plannode changes
atris Nov 10, 2021
99d2a6f
Add visitor
atris Nov 10, 2021
6e73cda
First cut at wrapped plan
atris Nov 11, 2021
28a878b
Merge master
atris Dec 2, 2021
b453e8f
Revert "Add visitor"
atris Nov 11, 2021
69c7538
Intermediate Commit
atris Nov 11, 2021
d047539
Get multi function working
atris Nov 11, 2021
d17fd56
Add Filtered Funcs
atris Nov 12, 2021
c89b2f5
Add Filterable Aggregation
atris Nov 12, 2021
8a58f34
Remove redundant changes
atris Nov 12, 2021
aad465d
Initial Wiring For Non Filtered Aggs
atris Nov 15, 2021
d0a6519
And More Stuff
atris Nov 15, 2021
e6661ad
Get v1 working
atris Nov 16, 2021
5612829
Allow indices to be used for filter clauses
atris Nov 17, 2021
5b67243
Add tests and more stuff
atris Nov 17, 2021
e77bcce
Fix booboo
atris Nov 17, 2021
a03a6eb
Round1 Complete
atris Nov 24, 2021
42b274e
Revert unnecessary changes
atris Nov 24, 2021
5c59647
Remove redundant comment
atris Nov 24, 2021
66a76ac
Cleanup
atris Nov 25, 2021
ee3a862
More cleanup
atris Nov 25, 2021
920e3e1
Checkstyle fixes
atris Nov 25, 2021
5878c79
More checkstyle
atris Nov 25, 2021
f5f46ea
Fix booboo
atris Nov 25, 2021
0df5f53
And more booboo
atris Nov 25, 2021
6dd3419
grrrr
atris Nov 25, 2021
83928ce
Fix test failures, wrong tests and add more tests
atris Nov 30, 2021
333dd40
And add more tests
atris Nov 30, 2021
e453a75
Add benchmark
atris Nov 30, 2021
1ed9fa1
More fixes
atris Dec 1, 2021
6f05dc5
multi block
atris Dec 2, 2021
861b00d
Modify benchmark
atris Dec 2, 2021
77aaaad
Fix checkstyle
atris Dec 2, 2021
a86215e
Fix spacing
atris Dec 3, 2021
c6cbf82
Add missing methods
atris Dec 3, 2021
821c874
Update docidset usage
atris Dec 3, 2021
1863817
remove redundant import
atris Dec 3, 2021
60aafbb
And more checkstyles
atris Dec 3, 2021
29b0e62
Update test
atris Dec 3, 2021
8f621ad
Fix more style
atris Dec 3, 2021
7c940f1
And more fixes
atris Dec 3, 2021
c6aa1af
Removed FilterableAggregation class
atris Dec 7, 2021
69604de
Add necessary classes for filter level split
atris Dec 10, 2021
f1402b0
Move towards approach 2
atris Dec 13, 2021
3ac9b7c
Get all cases working
atris Dec 14, 2021
f2f27db
Cleanup and more tests
atris Dec 15, 2021
a9790fc
More cleanup and fixes
atris Dec 15, 2021
de9cd91
Make non filtered predicate be called only once
atris Dec 16, 2021
e0ec541
Cleanup
atris Dec 16, 2021
da86331
More cleanup
atris Dec 16, 2021
01249a7
Add licenses
atris Dec 16, 2021
2817e89
Fix checkstyle
atris Dec 16, 2021
7befd41
Fix booboo
atris Dec 16, 2021
52cd305
Remove a stupidity
atris Dec 16, 2021
33761d3
Refactor tests
atris Dec 16, 2021
b71aa41
Fix checkstyle
atris Dec 16, 2021
f8e22df
Avoid null blocks
atris Dec 16, 2021
7861b40
More review comments
atris Dec 16, 2021
0aa8e79
More review comments
atris Dec 16, 2021
e90a072
Fix checkstyle
atris Dec 16, 2021
ae0d806
Grr
atris Dec 16, 2021
fe702fe
Rename existing to shared operator
atris Jan 4, 2022
d057729
Aggregation Filter
Jackie-Jiang Dec 30, 2021
e1befb1
Refactor, integrate and fix
atris Jan 5, 2022
037fb1b
More cleanup
atris Jan 5, 2022
c21dc10
Cathratic removal of redundant code
atris Jan 5, 2022
c9a1a5b
Remove redundant test
atris Jan 5, 2022
365182a
Javadocs
atris Jan 5, 2022
3edd185
More cleanup
atris Jan 5, 2022
528c905
Linter
atris Jan 5, 2022
81b99c6
Fix cast
atris Jan 5, 2022
dc6af78
Fix checkstyle
atris Jan 5, 2022
951f85f
Address per comments
atris Jan 17, 2022
85e1188
Revert unintended change
atris Jan 17, 2022
970b1a0
More fixes per comments
atris Jan 18, 2022
be3c695
Refactoring and removing FilterableAggregationFunction
atris Jan 19, 2022
7d85659
More cleanup
atris Jan 19, 2022
f5fbe1a
More comments
atris Jan 28, 2022
6d0d3ad
Merge remote-tracking branch 'main_origin/master' into hack_hack_filt…
atris Jan 28, 2022
ec5b19c
Update tests
atris Jan 28, 2022
f6ae46b
Fix missing comments
atris Jan 28, 2022
c8d9ad5
Add missing update in name change
atris Jan 28, 2022
bd19945
Precommit fixes
atris Jan 31, 2022
@@ -30,14 +30,25 @@
  */
 public class FilterBlock implements Block {
   private final FilterBlockDocIdSet _filterBlockDocIdSet;
+  private FilterBlockDocIdSet _nonScanFilterBlockDocIdSet;

   public FilterBlock(FilterBlockDocIdSet filterBlockDocIdSet) {
     _filterBlockDocIdSet = filterBlockDocIdSet;
   }

+  /**
+   * Pre-scans the documents if needed, and returns a non-scan-based FilterBlockDocIdSet.
+   */
+  public FilterBlockDocIdSet getNonScanFilterBLockDocIdSet() {
+    if (_nonScanFilterBlockDocIdSet == null) {
+      _nonScanFilterBlockDocIdSet = _filterBlockDocIdSet.toNonScanDocIdSet();
+    }
+    return _nonScanFilterBlockDocIdSet;
+  }
+
   @Override
   public FilterBlockDocIdSet getBlockDocIdSet() {
-    return _filterBlockDocIdSet;
+    return _nonScanFilterBlockDocIdSet != null ? _nonScanFilterBlockDocIdSet : _filterBlockDocIdSet;
   }

   @Override
Expand Down
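The lazy caching in `getNonScanFilterBLockDocIdSet()` can be illustrated with a minimal sketch. Every name below (`LazyNonScanSketch`, the `Supplier`-based pre-scan, the counter) is a hypothetical stand-in rather than Pinot code, and the sketch is not thread-safe; it only shows that the expensive conversion runs on first use and is reused afterwards.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for FilterBlock: caches the result of an expensive pre-scan.
final class LazyNonScanSketch {
  private final Supplier<int[]> _preScan; // assumed expensive conversion to a non-scan doc id set
  private int[] _nonScanDocIds;            // null until the first request
  int numPreScans = 0;                     // counts how often the pre-scan actually ran

  LazyNonScanSketch(Supplier<int[]> preScan) {
    _preScan = preScan;
  }

  // Runs the pre-scan on the first call only; later calls return the cached array.
  int[] getNonScanDocIds() {
    if (_nonScanDocIds == null) {
      numPreScans++;
      _nonScanDocIds = _preScan.get();
    }
    return _nonScanDocIds;
  }

  public static void main(String[] args) {
    LazyNonScanSketch block = new LazyNonScanSketch(() -> new int[] {2, 5, 8});
    block.getNonScanDocIds();
    block.getNonScanDocIds();
    System.out.println("pre-scans executed: " + block.numPreScans);
  }
}
```

Several consumers of the same block can then share one pre-scanned result instead of each re-running the scan.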
@@ -23,17 +23,19 @@


 public class BitmapDocIdSet implements FilterBlockDocIdSet {
-  private final ImmutableRoaringBitmap _docIds;
-  private final int _numDocs;
+  private final BitmapDocIdIterator _iterator;

   public BitmapDocIdSet(ImmutableRoaringBitmap docIds, int numDocs) {
-    _docIds = docIds;
-    _numDocs = numDocs;
+    _iterator = new BitmapDocIdIterator(docIds, numDocs);
   }

+  public BitmapDocIdSet(BitmapDocIdIterator iterator) {
+    _iterator = iterator;
+  }
+
   @Override
   public BitmapDocIdIterator iterator() {
-    return new BitmapDocIdIterator(_docIds, _numDocs);
+    return _iterator;
   }

   @Override
Expand Down
@@ -18,7 +18,16 @@
  */
 package org.apache.pinot.core.operator.docidsets;

+import org.apache.pinot.core.common.BlockDocIdIterator;
 import org.apache.pinot.core.common.BlockDocIdSet;
+import org.apache.pinot.core.operator.dociditerators.AndDocIdIterator;
+import org.apache.pinot.core.operator.dociditerators.BitmapDocIdIterator;
+import org.apache.pinot.core.operator.dociditerators.OrDocIdIterator;
+import org.apache.pinot.core.operator.dociditerators.RangelessBitmapDocIdIterator;
+import org.apache.pinot.core.operator.dociditerators.ScanBasedDocIdIterator;
+import org.apache.pinot.segment.spi.Constants;
+import org.roaringbitmap.RoaringBitmapWriter;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;


 /**
@@ -32,4 +41,35 @@ public interface FilterBlockDocIdSet extends BlockDocIdSet {
    * filtering phase. This method should be called after the filtering is done.
    */
   long getNumEntriesScannedInFilter();
+
+  /**
+   * For scan-based FilterBlockDocIdSet, pre-scans the documents and returns a non-scan-based FilterBlockDocIdSet.
+   */
+  default FilterBlockDocIdSet toNonScanDocIdSet() {
+    BlockDocIdIterator docIdIterator = iterator();
+
+    // NOTE: AND and OR DocIdIterator might contain scan-based DocIdIterator
+    // TODO: This scan is not counted in the execution stats
+    if (docIdIterator instanceof ScanBasedDocIdIterator || docIdIterator instanceof AndDocIdIterator
+        || docIdIterator instanceof OrDocIdIterator) {
+      RoaringBitmapWriter<MutableRoaringBitmap> bitmapWriter =
+          RoaringBitmapWriter.bufferWriter().runCompress(false).get();
+      int docId;
+      while ((docId = docIdIterator.next()) != Constants.EOF) {
+        bitmapWriter.add(docId);
+      }
+      return new RangelessBitmapDocIdSet(bitmapWriter.get());
+    }
+
+    // NOTE: AND and OR DocIdSet might return BitmapBasedDocIdIterator after processing the iterators. Create a new
+    // DocIdSet to prevent processing the iterators again
+    if (docIdIterator instanceof RangelessBitmapDocIdIterator) {
+      return new RangelessBitmapDocIdSet((RangelessBitmapDocIdIterator) docIdIterator);
+    }
+    if (docIdIterator instanceof BitmapDocIdIterator) {
+      return new BitmapDocIdSet((BitmapDocIdIterator) docIdIterator);
+    }
+
+    return this;
+  }
 }
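The pre-scan step in `toNonScanDocIdSet()` drains a scan-based iterator once and stores the matching doc ids in a bitmap. The following is a rough, self-contained sketch of that idea under stated assumptions: it uses `java.util.BitSet` in place of a Roaring bitmap, and `PreScanSketch`, `DocIdIterator`, `ScanIterator`, and the `EOF` value are hypothetical names that are not part of Pinot.

```java
import java.util.BitSet;

// Hypothetical sketch: materialize a scan-based doc id iterator into a bitmap exactly once.
final class PreScanSketch {
  static final int EOF = -1; // assumed end marker, playing the role of Constants.EOF in the diff

  // Minimal iterator contract: ascending doc ids, then EOF.
  interface DocIdIterator {
    int next();
  }

  // Simulates a scan: evaluates "value > threshold" per document and counts the entries read.
  static final class ScanIterator implements DocIdIterator {
    private final int[] _values;
    private final int _threshold;
    private int _nextDocId = 0;
    int entriesScanned = 0;

    ScanIterator(int[] values, int threshold) {
      _values = values;
      _threshold = threshold;
    }

    @Override
    public int next() {
      while (_nextDocId < _values.length) {
        entriesScanned++;
        int docId = _nextDocId++;
        if (_values[docId] > _threshold) {
          return docId;
        }
      }
      return EOF;
    }
  }

  // Drains the iterator into a BitSet; later consumers read the bitmap instead of re-scanning.
  static BitSet materialize(DocIdIterator iterator) {
    BitSet docIds = new BitSet();
    int docId;
    while ((docId = iterator.next()) != EOF) {
      docIds.set(docId);
    }
    return docIds;
  }

  public static void main(String[] args) {
    ScanIterator scan = new ScanIterator(new int[] {5, 1, 9, 3, 7}, 4);
    BitSet docIds = materialize(scan);
    System.out.println("matching docs: " + docIds + ", entries scanned: " + scan.entriesScanned);
  }
}
```

The scan cost is paid once during materialization, which is consistent with the TODO in the diff noting that this work is not yet reflected in the execution stats.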
@@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.operator.docidsets;

import org.apache.pinot.core.operator.dociditerators.RangelessBitmapDocIdIterator;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;


public class RangelessBitmapDocIdSet implements FilterBlockDocIdSet {
  private final RangelessBitmapDocIdIterator _iterator;

  public RangelessBitmapDocIdSet(ImmutableRoaringBitmap docIds) {
    _iterator = new RangelessBitmapDocIdIterator(docIds);
  }

  public RangelessBitmapDocIdSet(RangelessBitmapDocIdIterator iterator) {
    _iterator = iterator;
  }

  @Override
  public RangelessBitmapDocIdIterator iterator() {
    return _iterator;
  }

  @Override
  public long getNumEntriesScannedInFilter() {
    return 0L;
  }
}
@@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.operator.filter;

import java.util.Arrays;
import java.util.List;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.blocks.FilterBlock;
import org.apache.pinot.core.operator.docidsets.AndDocIdSet;
import org.apache.pinot.core.operator.docidsets.FilterBlockDocIdSet;


/**
 * A combined filter operator consisting of one main filter operator and one sub filter operator. The result block is
 * the AND result of the main and sub filter.
 */
public class CombinedFilterOperator extends BaseFilterOperator {
  private static final String OPERATOR_NAME = "CombinedFilterOperator";
  private static final String EXPLAIN_NAME = "FILTER_COMBINED";

  private final BaseFilterOperator _mainFilterOperator;
  private final BaseFilterOperator _subFilterOperator;

  public CombinedFilterOperator(BaseFilterOperator mainFilterOperator, BaseFilterOperator subFilterOperator) {
    _mainFilterOperator = mainFilterOperator;
    _subFilterOperator = subFilterOperator;
  }

  @Override
  public String getOperatorName() {
    return OPERATOR_NAME;
  }

  @Override
  public List<Operator> getChildOperators() {
    return Arrays.asList(_mainFilterOperator, _subFilterOperator);
  }

  @Override
  public String toExplainString() {
    return EXPLAIN_NAME;
  }

  @Override
  protected FilterBlock getNextBlock() {
    FilterBlockDocIdSet mainFilterDocIdSet = _mainFilterOperator.nextBlock().getNonScanFilterBLockDocIdSet();
    FilterBlockDocIdSet subFilterDocIdSet = _subFilterOperator.nextBlock().getBlockDocIdSet();
    return new FilterBlock(new AndDocIdSet(Arrays.asList(mainFilterDocIdSet, subFilterDocIdSet)));
  }
}
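`CombinedFilterOperator` returns the AND of the main and sub filter results. As a rough illustration of what such an AND computes, here is a two-pointer intersection over sorted doc id arrays. This is a simplified stand-in, not how `AndDocIdSet` is implemented; `AndDocIdsSketch` and `intersect` are hypothetical names.

```java
import java.util.Arrays;

// Hypothetical sketch: AND of two ascending doc id lists via a two-pointer merge.
final class AndDocIdsSketch {
  // Both inputs must be sorted ascending without duplicates.
  static int[] intersect(int[] mainDocIds, int[] subDocIds) {
    int[] result = new int[Math.min(mainDocIds.length, subDocIds.length)];
    int i = 0;
    int j = 0;
    int size = 0;
    while (i < mainDocIds.length && j < subDocIds.length) {
      if (mainDocIds[i] < subDocIds[j]) {
        i++; // doc only in the main filter, skip it
      } else if (mainDocIds[i] > subDocIds[j]) {
        j++; // doc only in the sub filter, skip it
      } else {
        result[size++] = mainDocIds[i]; // doc matches both filters
        i++;
        j++;
      }
    }
    return Arrays.copyOf(result, size);
  }

  public static void main(String[] args) {
    int[] mainDocIds = {1, 3, 5, 7, 9};
    int[] subDocIds = {3, 4, 5, 9, 10};
    System.out.println(Arrays.toString(intersect(mainDocIds, subDocIds)));
  }
}
```

In the operator above, the main side is requested through `getNonScanFilterBLockDocIdSet()`, so the shared main filter result is pre-scanned once before being combined with each sub filter.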
@@ -0,0 +1,116 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.operator.query;

import java.util.Arrays;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.operator.ExecutionStatistics;
import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
import org.apache.pinot.core.operator.blocks.TransformBlock;
import org.apache.pinot.core.operator.transform.TransformOperator;
import org.apache.pinot.core.query.aggregation.AggregationExecutor;
import org.apache.pinot.core.query.aggregation.DefaultAggregationExecutor;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;


/**
 * This operator processes a collection of filtered (and potentially non filtered) aggregations.
 *
 * For a query with either all aggregations being filtered or a mix of filtered and non filtered aggregations,
 * FilteredAggregationOperator will come into execution.
 */
@SuppressWarnings("rawtypes")
public class FilteredAggregationOperator extends BaseOperator<IntermediateResultsBlock> {
  private static final String OPERATOR_NAME = "FilteredAggregationOperator";
  private static final String EXPLAIN_NAME = "AGGREGATE_FILTERED";

  private final AggregationFunction[] _aggregationFunctions;
  private final List<Pair<AggregationFunction[], TransformOperator>> _aggFunctionsWithTransformOperator;
  private final long _numTotalDocs;

  private long _numDocsScanned;
  private long _numEntriesScannedInFilter;
  private long _numEntriesScannedPostFilter;

  // We can potentially do away with aggregationFunctions parameter, but its cleaner to pass it in than to construct
  // it from aggFunctionsWithTransformOperator
  public FilteredAggregationOperator(AggregationFunction[] aggregationFunctions,
      List<Pair<AggregationFunction[], TransformOperator>> aggFunctionsWithTransformOperator, long numTotalDocs) {
    _aggregationFunctions = aggregationFunctions;
    _aggFunctionsWithTransformOperator = aggFunctionsWithTransformOperator;
    _numTotalDocs = numTotalDocs;
  }

  @Override
  protected IntermediateResultsBlock getNextBlock() {
    int numAggregations = _aggregationFunctions.length;
    Object[] result = new Object[numAggregations];
    IdentityHashMap<AggregationFunction, Integer> resultIndexMap = new IdentityHashMap<>(numAggregations);
    for (int i = 0; i < numAggregations; i++) {
      resultIndexMap.put(_aggregationFunctions[i], i);
    }

    for (Pair<AggregationFunction[], TransformOperator> filteredAggregation : _aggFunctionsWithTransformOperator) {
      AggregationFunction[] aggregationFunctions = filteredAggregation.getLeft();
      AggregationExecutor aggregationExecutor = new DefaultAggregationExecutor(aggregationFunctions);
      TransformOperator transformOperator = filteredAggregation.getRight();
      TransformBlock transformBlock;
      int numDocsScanned = 0;
      while ((transformBlock = transformOperator.nextBlock()) != null) {
        aggregationExecutor.aggregate(transformBlock);
        numDocsScanned += transformBlock.getNumDocs();
      }
      List<Object> filteredResult = aggregationExecutor.getResult();

      for (int i = 0; i < aggregationFunctions.length; i++) {
        result[resultIndexMap.get(aggregationFunctions[i])] = filteredResult.get(i);
      }
      _numDocsScanned += numDocsScanned;
      _numEntriesScannedInFilter += transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
      _numEntriesScannedPostFilter += (long) numDocsScanned * transformOperator.getNumColumnsProjected();
    }
    return new IntermediateResultsBlock(_aggregationFunctions, Arrays.asList(result), false);
  }

  @Override
  public String getOperatorName() {
    return OPERATOR_NAME;
  }

  @Override
  public List<Operator> getChildOperators() {
    return _aggFunctionsWithTransformOperator.stream().map(Pair::getRight).collect(Collectors.toList());
Contributor

Unless this has recently changed, I think stream api usage is not consistent with Pinot coding convention.

Contributor Author

I haven't seen a coding convention mentioning the same, yet. Is this documented somewhere?

Member

I'm unaware of such a convention and have seen plenty of code using the streams API for performance non-critical operations (like this one) recently.

Contributor

@Jackie-Jiang Can you please clarify if stream api usage applies?

Contributor

We should avoid using the stream API for performance-critical operations. This one is on the query path, but might not be that performance critical (it is only called once). Using the regular API could give slightly better performance, but IMO both ways are okay.

Contributor Author

I agree with @richardstartin -- we only need to worry about streams when the code is invoked in a tight loop for multiple iterations -- none of which is applicable in this specific case

  }

  @Override
  public ExecutionStatistics getExecutionStatistics() {
    return new ExecutionStatistics(_numDocsScanned, _numEntriesScannedInFilter, _numEntriesScannedPostFilter,
        _numTotalDocs);
  }

  @Override
  public String toExplainString() {
    return EXPLAIN_NAME;
  }
}
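`getNextBlock()` above aggregates each filter group separately and then writes every result back to the position its function holds in the query. A minimal sketch of that re-ordering step follows, assuming a hypothetical `Fn` class in place of `AggregationFunction`; `ResultOrderSketch` and `mergeInQueryOrder` are illustrative names only. `Fn` deliberately defines `equals` by name to show why an identity-based map can matter: two `COUNT` functions with different filters would collide in a regular `HashMap`.

```java
import java.util.Arrays;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: scatter per-group aggregation results back into query order.
final class ResultOrderSketch {
  // Stand-in for an aggregation function; equal by name, so identity is needed to tell instances apart.
  static final class Fn {
    final String name;

    Fn(String name) {
      this.name = name;
    }

    @Override
    public boolean equals(Object other) {
      return other instanceof Fn && ((Fn) other).name.equals(name);
    }

    @Override
    public int hashCode() {
      return name.hashCode();
    }
  }

  // groups.get(g)[k] produced groupResults.get(g).get(k); the output follows queryOrder.
  static Object[] mergeInQueryOrder(Fn[] queryOrder, List<Fn[]> groups, List<List<Object>> groupResults) {
    Map<Fn, Integer> resultIndexMap = new IdentityHashMap<>(queryOrder.length);
    for (int i = 0; i < queryOrder.length; i++) {
      resultIndexMap.put(queryOrder[i], i);
    }
    Object[] merged = new Object[queryOrder.length];
    for (int g = 0; g < groups.size(); g++) {
      Fn[] functions = groups.get(g);
      List<Object> results = groupResults.get(g);
      for (int k = 0; k < functions.length; k++) {
        merged[resultIndexMap.get(functions[k])] = results.get(k);
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    Fn filteredCount = new Fn("COUNT");
    Fn sum = new Fn("SUM");
    Fn otherCount = new Fn("COUNT");
    Fn[] queryOrder = {filteredCount, sum, otherCount};
    List<Fn[]> groups = Arrays.asList(new Fn[] {filteredCount}, new Fn[] {otherCount, sum});
    List<List<Object>> groupResults =
        Arrays.asList(Arrays.<Object>asList(10L), Arrays.<Object>asList(4L, 99.0));
    System.out.println(Arrays.toString(mergeInQueryOrder(queryOrder, groups, groupResults)));
  }
}
```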