-
Notifications
You must be signed in to change notification settings - Fork 1.5k
FILTER Clauses for Aggregates #7916
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
587cc19
51731ad
c95646c
fd20115
ad49238
4debd68
55d1760
5ae01e5
99d2a6f
6e73cda
28a878b
b453e8f
69c7538
d047539
d17fd56
c89b2f5
8a58f34
aad465d
d0a6519
e6661ad
5612829
5b67243
e77bcce
a03a6eb
42b274e
5c59647
66a76ac
ee3a862
920e3e1
5878c79
f5f46ea
0df5f53
6dd3419
83928ce
333dd40
e453a75
1ed9fa1
6f05dc5
861b00d
77aaaad
a86215e
c6cbf82
821c874
1863817
60aafbb
29b0e62
8f621ad
7c940f1
c6aa1af
69604de
f1402b0
3ac9b7c
f2f27db
a9790fc
de9cd91
e0ec541
da86331
01249a7
2817e89
7befd41
52cd305
33761d3
b71aa41
f8e22df
7861b40
0aa8e79
e90a072
ae0d806
fe702fe
d057729
e1befb1
037fb1b
c21dc10
c9a1a5b
365182a
3edd185
528c905
81b99c6
dc6af78
951f85f
85e1188
970b1a0
be3c695
7d85659
f5fbe1a
6d0d3ad
ec5b19c
f6ae46b
c8d9ad5
bd19945
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,45 @@ | ||
| /** | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.pinot.core.operator.docidsets; | ||
|
|
||
| import org.apache.pinot.core.operator.dociditerators.RangelessBitmapDocIdIterator; | ||
| import org.roaringbitmap.buffer.ImmutableRoaringBitmap; | ||
|
|
||
|
|
||
| /** | ||
| * A {@link FilterBlockDocIdSet} that is backed by a single {@link RangelessBitmapDocIdIterator}. | ||
| * | ||
| * <p>The iterator is fixed at construction time and the very same instance is handed out by every call to | ||
| * {@link #iterator()}. This doc id set always reports zero entries scanned in the filter. | ||
| */ | ||
| public class RangelessBitmapDocIdSet implements FilterBlockDocIdSet { | ||
| private final RangelessBitmapDocIdIterator _iterator; | ||
|
|
||
| /** | ||
| * Builds the doc id set from a bitmap by wrapping the bitmap in a new {@link RangelessBitmapDocIdIterator}. | ||
| * | ||
| * @param docIds bitmap passed to the iterator constructor | ||
| */ | ||
| public RangelessBitmapDocIdSet(ImmutableRoaringBitmap docIds) { | ||
| this(new RangelessBitmapDocIdIterator(docIds)); | ||
| } | ||
|
|
||
| /** | ||
| * Builds the doc id set around an already constructed iterator. | ||
| * | ||
| * @param iterator iterator returned by {@link #iterator()} | ||
| */ | ||
| public RangelessBitmapDocIdSet(RangelessBitmapDocIdIterator iterator) { | ||
| _iterator = iterator; | ||
| } | ||
|
|
||
| /** | ||
| * @return the iterator supplied to, or created by, the constructor | ||
| */ | ||
| @Override | ||
| public RangelessBitmapDocIdIterator iterator() { | ||
| return _iterator; | ||
| } | ||
|
|
||
| /** | ||
| * @return always {@code 0} | ||
| */ | ||
| @Override | ||
| public long getNumEntriesScannedInFilter() { | ||
| return 0L; | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,66 @@ | ||
| /** | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.pinot.core.operator.filter; | ||
|
|
||
| import java.util.Arrays; | ||
| import java.util.List; | ||
| import org.apache.pinot.core.common.Operator; | ||
| import org.apache.pinot.core.operator.blocks.FilterBlock; | ||
| import org.apache.pinot.core.operator.docidsets.AndDocIdSet; | ||
| import org.apache.pinot.core.operator.docidsets.FilterBlockDocIdSet; | ||
|
|
||
|
|
||
| /** | ||
| * Filter operator that pairs a main filter operator with a sub filter operator. | ||
| * | ||
| * <p>The block it produces is the AND (via {@link AndDocIdSet}) of the doc id sets obtained from the two wrapped | ||
| * operators. | ||
| */ | ||
| public class CombinedFilterOperator extends BaseFilterOperator { | ||
| private static final String OPERATOR_NAME = "CombinedFilterOperator"; | ||
| private static final String EXPLAIN_NAME = "FILTER_COMBINED"; | ||
|
|
||
| private final BaseFilterOperator _mainFilterOperator; | ||
| private final BaseFilterOperator _subFilterOperator; | ||
|
|
||
| /** | ||
| * @param mainFilterOperator the main filter; its non-scan filter block doc id set is used | ||
| * @param subFilterOperator the sub filter; its block doc id set is used | ||
| */ | ||
| public CombinedFilterOperator(BaseFilterOperator mainFilterOperator, BaseFilterOperator subFilterOperator) { | ||
| _mainFilterOperator = mainFilterOperator; | ||
| _subFilterOperator = subFilterOperator; | ||
| } | ||
|
|
||
| @Override | ||
| public String getOperatorName() { | ||
| return OPERATOR_NAME; | ||
| } | ||
|
|
||
| /** | ||
| * @return the main filter operator followed by the sub filter operator | ||
| */ | ||
| @Override | ||
| public List<Operator> getChildOperators() { | ||
| return Arrays.asList(_mainFilterOperator, _subFilterOperator); | ||
| } | ||
|
|
||
| @Override | ||
| public String toExplainString() { | ||
| return EXPLAIN_NAME; | ||
| } | ||
|
|
||
| /** | ||
| * Pulls the next block from the main filter and then from the sub filter, and ANDs their doc id sets. | ||
| */ | ||
| @Override | ||
| protected FilterBlock getNextBlock() { | ||
| // The main filter is always consulted first, then the sub filter | ||
| FilterBlockDocIdSet mainDocIds = _mainFilterOperator.nextBlock().getNonScanFilterBLockDocIdSet(); | ||
| FilterBlockDocIdSet subDocIds = _subFilterOperator.nextBlock().getBlockDocIdSet(); | ||
| List<FilterBlockDocIdSet> docIdSetsToIntersect = Arrays.asList(mainDocIds, subDocIds); | ||
| return new FilterBlock(new AndDocIdSet(docIdSetsToIntersect)); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,116 @@ | ||
| /** | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.pinot.core.operator.query; | ||
|
|
||
| import java.util.Arrays; | ||
| import java.util.IdentityHashMap; | ||
| import java.util.List; | ||
| import java.util.stream.Collectors; | ||
| import org.apache.commons.lang3.tuple.Pair; | ||
| import org.apache.pinot.core.common.Operator; | ||
| import org.apache.pinot.core.operator.BaseOperator; | ||
| import org.apache.pinot.core.operator.ExecutionStatistics; | ||
| import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; | ||
| import org.apache.pinot.core.operator.blocks.TransformBlock; | ||
| import org.apache.pinot.core.operator.transform.TransformOperator; | ||
| import org.apache.pinot.core.query.aggregation.AggregationExecutor; | ||
| import org.apache.pinot.core.query.aggregation.DefaultAggregationExecutor; | ||
| import org.apache.pinot.core.query.aggregation.function.AggregationFunction; | ||
|
|
||
|
|
||
| /** | ||
| * This operator processes a collection of filtered (and potentially non filtered) aggregations. | ||
| * | ||
| * For a query with either all aggregations being filtered or a mix of filtered and non filtered aggregations, | ||
| * FilteredAggregationOperator will come into execution. | ||
| */ | ||
| @SuppressWarnings("rawtypes") | ||
| public class FilteredAggregationOperator extends BaseOperator<IntermediateResultsBlock> { | ||
| private static final String OPERATOR_NAME = "FilteredAggregationOperator"; | ||
| private static final String EXPLAIN_NAME = "AGGREGATE_FILTERED"; | ||
|
|
||
| private final AggregationFunction[] _aggregationFunctions; | ||
| private final List<Pair<AggregationFunction[], TransformOperator>> _aggFunctionsWithTransformOperator; | ||
| private final long _numTotalDocs; | ||
|
|
||
| private long _numDocsScanned; | ||
| private long _numEntriesScannedInFilter; | ||
| private long _numEntriesScannedPostFilter; | ||
|
|
||
| // We can potentially do away with aggregationFunctions parameter, but its cleaner to pass it in than to construct | ||
| // it from aggFunctionsWithTransformOperator | ||
| public FilteredAggregationOperator(AggregationFunction[] aggregationFunctions, | ||
| List<Pair<AggregationFunction[], TransformOperator>> aggFunctionsWithTransformOperator, long numTotalDocs) { | ||
| _aggregationFunctions = aggregationFunctions; | ||
| _aggFunctionsWithTransformOperator = aggFunctionsWithTransformOperator; | ||
| _numTotalDocs = numTotalDocs; | ||
| } | ||
|
|
||
| @Override | ||
| protected IntermediateResultsBlock getNextBlock() { | ||
| int numAggregations = _aggregationFunctions.length; | ||
| Object[] result = new Object[numAggregations]; | ||
| IdentityHashMap<AggregationFunction, Integer> resultIndexMap = new IdentityHashMap<>(numAggregations); | ||
| for (int i = 0; i < numAggregations; i++) { | ||
| resultIndexMap.put(_aggregationFunctions[i], i); | ||
| } | ||
|
|
||
| for (Pair<AggregationFunction[], TransformOperator> filteredAggregation : _aggFunctionsWithTransformOperator) { | ||
| AggregationFunction[] aggregationFunctions = filteredAggregation.getLeft(); | ||
| AggregationExecutor aggregationExecutor = new DefaultAggregationExecutor(aggregationFunctions); | ||
| TransformOperator transformOperator = filteredAggregation.getRight(); | ||
| TransformBlock transformBlock; | ||
| int numDocsScanned = 0; | ||
| while ((transformBlock = transformOperator.nextBlock()) != null) { | ||
| aggregationExecutor.aggregate(transformBlock); | ||
| numDocsScanned += transformBlock.getNumDocs(); | ||
| } | ||
| List<Object> filteredResult = aggregationExecutor.getResult(); | ||
|
|
||
| for (int i = 0; i < aggregationFunctions.length; i++) { | ||
| result[resultIndexMap.get(aggregationFunctions[i])] = filteredResult.get(i); | ||
| } | ||
| _numDocsScanned += numDocsScanned; | ||
| _numEntriesScannedInFilter += transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter(); | ||
| _numEntriesScannedPostFilter += (long) numDocsScanned * transformOperator.getNumColumnsProjected(); | ||
| } | ||
| return new IntermediateResultsBlock(_aggregationFunctions, Arrays.asList(result), false); | ||
| } | ||
|
|
||
| @Override | ||
| public String getOperatorName() { | ||
| return OPERATOR_NAME; | ||
| } | ||
|
|
||
| @Override | ||
| public List<Operator> getChildOperators() { | ||
| return _aggFunctionsWithTransformOperator.stream().map(Pair::getRight).collect(Collectors.toList()); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unless this has recently changed, I think stream api usage is not consistent with Pinot coding convention.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I haven't seen a coding convention mentioning the same, yet. Is this documented somewhere?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm unaware of such a convention and have seen plenty of code using the streams API for performance non-critical operations (like this one) recently.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @Jackie-Jiang Can you please clarify if stream api usage applies?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should avoid using stream api for performance critical operations. This one is at query path, but might not be that performance critical (only called once). Using regular api could give slightly better performance, but IMO both way is okay
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree with @richardstartin -- we only need to worry about streams when the code is invoked in a tight loop for multiple iterations -- none of which is applicable in this specific case |
||
| } | ||
|
|
||
| @Override | ||
| public ExecutionStatistics getExecutionStatistics() { | ||
| return new ExecutionStatistics(_numDocsScanned, _numEntriesScannedInFilter, _numEntriesScannedPostFilter, | ||
| _numTotalDocs); | ||
| } | ||
|
|
||
| @Override | ||
| public String toExplainString() { | ||
| return EXPLAIN_NAME; | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.