-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Theta Sketch Aggregation Enhancements #12042
Changes from 11 commits
97826ed
4fd6650
6e45abc
44db64e
2ed38f3
3a9e21a
0ffffaa
d177213
4e78fc6
595c170
582ded3
90cde3a
90d4562
bebf993
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -261,6 +261,11 @@ public static Sketch thetaSketchDiff(Object sketchObjectA, Object sketchObjectB) | |
return diff.getResult(false, null, false); | ||
} | ||
|
||
@ScalarFunction(names = {"thetaSketchToString", "theta_sketch_to_string"}) | ||
public static String thetaSketchToString(Object sketchObject) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What do we return for this function? Can we add some examples for commit notes? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This function returns a string which is a summary of the current sketch state. Note, the function name is consistent with the same name as that in Druid. See this link. Examples include:
and:
These allow the end user to inspect whether a sketch is in estimation mode, or whether the sketch is ordered, or even the state of the sketch - whether undateable or compact. Where is the best place to include these examples for the commit notes? |
||
return asThetaSketch(sketchObject).toString(); | ||
} | ||
|
||
private static Sketch thetaSketchUnionVar(Object... sketchObjects) { | ||
Union union = SET_OPERATION_BUILDER.buildUnion(); | ||
for (Object sketchObj : sketchObjects) { | ||
|
@@ -417,6 +422,11 @@ public static byte[] cpcSketchUnion(Object o1, Object o2, Object o3, Object o4, | |
return cpcSketchUnionVar(o1, o2, o3, o4, o5); | ||
} | ||
|
||
@ScalarFunction(names = {"cpcSketchToString", "cpc_sketch_to_string"}) | ||
public static String cpcSketchToString(Object sketchObject) { | ||
return asCpcSketch(sketchObject).toString(); | ||
} | ||
|
||
/** | ||
* Create a CPC Sketch containing the input, with a configured nominal entries | ||
* | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,11 +18,13 @@ | |
*/ | ||
package org.apache.pinot.core.query.aggregation.function; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Base64; | ||
import java.util.List; | ||
import org.apache.datasketches.theta.Sketch; | ||
import org.apache.pinot.common.request.context.ExpressionContext; | ||
import org.apache.pinot.common.utils.DataSchema.ColumnDataType; | ||
import org.apache.pinot.segment.local.customobject.ThetaSketchAccumulator; | ||
import org.apache.pinot.segment.spi.AggregationFunctionType; | ||
|
||
|
||
|
@@ -47,11 +49,18 @@ public ColumnDataType getFinalResultColumnType() { | |
} | ||
|
||
@Override | ||
public String extractFinalResult(List<Sketch> sketches) { | ||
Sketch sketch = evaluatePostAggregationExpression(sketches); | ||
public String extractFinalResult(List<ThetaSketchAccumulator> accumulators) { | ||
int numAccumulators = accumulators.size(); | ||
List<Sketch> mergedSketches = new ArrayList<>(numAccumulators); | ||
|
||
// NOTE: Compact the sketch in unsorted, on-heap fashion for performance concern. | ||
// See https://datasketches.apache.org/docs/Theta/ThetaSize.html for more details. | ||
return Base64.getEncoder().encodeToString(sketch.compact(false, null).toByteArray()); | ||
for (ThetaSketchAccumulator accumulator : accumulators) { | ||
accumulator.setOrdered(_intermediateOrdering); | ||
accumulator.setThreshold(_accumulatorThreshold); | ||
accumulator.setSetOperationBuilder(_setOperationBuilder); | ||
mergedSketches.add(accumulator.getResult()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What would happen in the default case ? I assume it would just do union a for every sketch pair, instead of the intermediate batching (default threshold = 2). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, this is the default behaviour - I believe that this matches the existing implementation. |
||
} | ||
|
||
Sketch sketch = evaluatePostAggregationExpression(mergedSketches); | ||
return Base64.getEncoder().encodeToString(sketch.toByteArray()); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are these new functions added to take in the new parameters?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, these are useful for debugging purposes. They can be used to inspect the preamble flags for sorting and compaction for sketches in storage, or, for sketches that are returned by the aggregation function.