Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Theta Sketch Aggregation Enhancements #12042

Merged
merged 14 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.apache.pinot.segment.local.customobject.PinotFourthMoment;
import org.apache.pinot.segment.local.customobject.QuantileDigest;
import org.apache.pinot.segment.local.customobject.StringLongPair;
import org.apache.pinot.segment.local.customobject.ThetaSketchAccumulator;
import org.apache.pinot.segment.local.customobject.VarianceTuple;
import org.apache.pinot.segment.local.utils.GeometrySerializer;
import org.apache.pinot.spi.utils.BigDecimalUtils;
Expand Down Expand Up @@ -154,7 +155,8 @@ public enum ObjectType {
LongArrayList(43),
FloatArrayList(44),
StringArrayList(45),
UltraLogLog(46);
UltraLogLog(46),
ThetaSketchAccumulator(47);

private final int _value;

Expand Down Expand Up @@ -273,6 +275,8 @@ public static ObjectType getObjectType(Object value) {
return ObjectType.CompressedProbabilisticCounting;
} else if (value instanceof UltraLogLog) {
return ObjectType.UltraLogLog;
} else if (value instanceof ThetaSketchAccumulator) {
return ObjectType.ThetaSketchAccumulator;
} else {
throw new IllegalArgumentException("Unsupported type of value: " + value.getClass().getSimpleName());
}
Expand Down Expand Up @@ -1125,9 +1129,15 @@ public TDigest deserialize(ByteBuffer byteBuffer) {

@Override
public byte[] serialize(Sketch value) {
// NOTE: Compact the sketch in unsorted, on-heap fashion for performance concern.
// See https://datasketches.apache.org/docs/Theta/ThetaSize.html for more details.
return value.compact(false, null).toByteArray();
// The serializer should respect existing ordering to enable "early stop"
// optimisations on unions.
boolean shouldCompact = !value.isCompact();
boolean shouldOrder = value.isOrdered();

if (shouldCompact) {
return value.compact(shouldOrder, null).toByteArray();
}
return value.toByteArray();
}

@Override
Expand Down Expand Up @@ -1580,6 +1590,33 @@ public UltraLogLog deserialize(ByteBuffer byteBuffer) {
}
};

public static final ObjectSerDe<ThetaSketchAccumulator> DATA_SKETCH_SKETCH_ACCUMULATOR_SER_DE =
new ObjectSerDe<ThetaSketchAccumulator>() {

@Override
public byte[] serialize(ThetaSketchAccumulator thetaSketchBuffer) {
Sketch sketch = thetaSketchBuffer.getResult();
return sketch.toByteArray();
}

@Override
public ThetaSketchAccumulator deserialize(byte[] bytes) {
return deserialize(ByteBuffer.wrap(bytes));
}

// Note: The accumulator is designed to serialize as a sketch and should
// not be deserialized in practice.
@Override
public ThetaSketchAccumulator deserialize(ByteBuffer byteBuffer) {
ThetaSketchAccumulator thetaSketchAccumulator = new ThetaSketchAccumulator();
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
Sketch sketch = Sketch.wrap(Memory.wrap(bytes));
thetaSketchAccumulator.apply(sketch);
return thetaSketchAccumulator;
}
};

// NOTE: DO NOT change the order, it has to be the same order as the ObjectType
//@formatter:off
private static final ObjectSerDe[] SER_DES = {
Expand Down Expand Up @@ -1630,6 +1667,7 @@ public UltraLogLog deserialize(ByteBuffer byteBuffer) {
FLOAT_ARRAY_LIST_SER_DE,
STRING_ARRAY_LIST_SER_DE,
ULTRA_LOG_LOG_OBJECT_SER_DE,
DATA_SKETCH_SKETCH_ACCUMULATOR_SER_DE,
};
//@formatter:on

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,11 @@ public static Sketch thetaSketchDiff(Object sketchObjectA, Object sketchObjectB)
return diff.getResult(false, null, false);
}

@ScalarFunction(names = {"thetaSketchToString", "theta_sketch_to_string"})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these new functions added to take in the new parameters?

Copy link
Member Author

@davecromberge davecromberge Nov 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, these are useful for debugging purposes. They can be used to inspect the preamble flags for sorting and compaction for sketches in storage, or, for sketches that are returned by the aggregation function.

public static String thetaSketchToString(Object sketchObject) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do we return for this function? Can we add some examples for commit notes?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function returns a string which is a summary of the current sketch state. Note, the function name is consistent with the same name as that in Druid. See this link.

Examples include:

### HeapCompactOrderedSketch SUMMARY: 
   Estimate                : 149586.73149344584
   Upper Bound, 95% conf   : 154287.5017892762
   Lower Bound, 95% conf   : 145028.6046846571
   Theta (double)          : 0.027382107751846067
   Theta (long)            : 252555366948521403
   Theta (long) hex        : 038141c4a515c5bb
   EstMode?                : true
   Empty?                  : false
   Array Size Entries      : 4096
   Retained Entries        : 4096
   Seed Hash               : 93cc
### END SKETCH SUMMARY

and:

### HeapCompactOrderedSketch SUMMARY: 
   Estimate                : 48249.113729035394
   Upper Bound, 95% conf   : 50358.736970106176
   Lower Bound, 95% conf   : 46227.35737896924
   Theta (double)          : 0.04377282475820978
   Theta (long)            : 403733047849016500
   Theta (long) hex        : 059a591165205cb4
   EstMode?                : true
   Empty?                  : false
   Array Size Entries      : 2112
   Retained Entries        : 2112
   Seed Hash               : 93cc
### END SKETCH SUMMARY

These allow the end user to inspect whether a sketch is in estimation mode, or whether the sketch is ordered, or even the state of the sketch - whether undateable or compact.

Where is the best place to include these examples for the commit notes?

return asThetaSketch(sketchObject).toString();
}

private static Sketch thetaSketchUnionVar(Object... sketchObjects) {
Union union = SET_OPERATION_BUILDER.buildUnion();
for (Object sketchObj : sketchObjects) {
Expand Down Expand Up @@ -417,6 +422,11 @@ public static byte[] cpcSketchUnion(Object o1, Object o2, Object o3, Object o4,
return cpcSketchUnionVar(o1, o2, o3, o4, o5);
}

@ScalarFunction(names = {"cpcSketchToString", "cpc_sketch_to_string"})
public static String cpcSketchToString(Object sketchObject) {
return asCpcSketch(sketchObject).toString();
}

/**
* Create a CPC Sketch containing the input, with a configured nominal entries
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
*/
package org.apache.pinot.core.query.aggregation.function;

import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import org.apache.datasketches.theta.Sketch;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.segment.local.customobject.ThetaSketchAccumulator;
import org.apache.pinot.segment.spi.AggregationFunctionType;


Expand All @@ -47,11 +49,18 @@ public ColumnDataType getFinalResultColumnType() {
}

@Override
public String extractFinalResult(List<Sketch> sketches) {
Sketch sketch = evaluatePostAggregationExpression(sketches);
public String extractFinalResult(List<ThetaSketchAccumulator> accumulators) {
int numAccumulators = accumulators.size();
List<Sketch> mergedSketches = new ArrayList<>(numAccumulators);

// NOTE: Compact the sketch in unsorted, on-heap fashion for performance concern.
// See https://datasketches.apache.org/docs/Theta/ThetaSize.html for more details.
return Base64.getEncoder().encodeToString(sketch.compact(false, null).toByteArray());
for (ThetaSketchAccumulator accumulator : accumulators) {
accumulator.setOrdered(_intermediateOrdering);
accumulator.setThreshold(_accumulatorThreshold);
accumulator.setSetOperationBuilder(_setOperationBuilder);
mergedSketches.add(accumulator.getResult());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would happen in the default case ? I assume it would just do union a for every sketch pair, instead of the intermediate batching (default threshold = 2).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is the default behaviour - I believe that this matches the existing implementation.

}

Sketch sketch = evaluatePostAggregationExpression(mergedSketches);
return Base64.getEncoder().encodeToString(sketch.toByteArray());
}
}
Loading
Loading