Skip to content

Commit

Permalink
Attempt 2: Simplify implementation
Browse files Browse the repository at this point in the history
Removes the intermediate array list that was used to buffer/accumulate
sketch elements. Instead, inputs are fed directly to the underlying
union, which keeps the memory usage of the merge under control.
  • Loading branch information
davecromberge committed Nov 24, 2023
1 parent 44db64e commit 2ed38f3
Show file tree
Hide file tree
Showing 9 changed files with 314 additions and 371 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
import org.apache.pinot.segment.local.customobject.PinotFourthMoment;
import org.apache.pinot.segment.local.customobject.QuantileDigest;
import org.apache.pinot.segment.local.customobject.StringLongPair;
import org.apache.pinot.segment.local.customobject.ThetaSketchAccumulator;
import org.apache.pinot.segment.local.customobject.ThetaUnionWrap;
import org.apache.pinot.segment.local.customobject.VarianceTuple;
import org.apache.pinot.segment.local.utils.GeometrySerializer;
import org.apache.pinot.spi.utils.BigDecimalUtils;
Expand Down Expand Up @@ -156,7 +156,7 @@ public enum ObjectType {
FloatArrayList(44),
StringArrayList(45),
UltraLogLog(46),
ThetaSketchAccumulator(47);
ThetaUnionWrap(47);

private final int _value;

Expand Down Expand Up @@ -275,8 +275,8 @@ public static ObjectType getObjectType(Object value) {
return ObjectType.CompressedProbabilisticCounting;
} else if (value instanceof UltraLogLog) {
return ObjectType.UltraLogLog;
} else if (value instanceof ThetaSketchAccumulator) {
return ObjectType.ThetaSketchAccumulator;
} else if (value instanceof ThetaUnionWrap) {
return ObjectType.ThetaUnionWrap;
} else {
throw new IllegalArgumentException("Unsupported type of value: " + value.getClass().getSimpleName());
}
Expand Down Expand Up @@ -1590,30 +1590,21 @@ public UltraLogLog deserialize(ByteBuffer byteBuffer) {
}
};

public static final ObjectSerDe<ThetaSketchAccumulator> DATA_SKETCH_SKETCH_ACCUMULATOR_SER_DE =
new ObjectSerDe<ThetaSketchAccumulator>() {
public static final ObjectSerDe<ThetaUnionWrap> DATA_SKETCH_THETA_UNION_WRAP_SER_DE =
new ObjectSerDe<ThetaUnionWrap>() {

@Override
public byte[] serialize(ThetaSketchAccumulator thetaSketchBuffer) {
Sketch sketch = thetaSketchBuffer.getResult();
return sketch.toByteArray();
public byte[] serialize(ThetaUnionWrap thetaUnionWrap) {
return thetaUnionWrap.toBytes();
}

@Override
public ThetaSketchAccumulator deserialize(byte[] bytes) {
return deserialize(ByteBuffer.wrap(bytes));
public ThetaUnionWrap deserialize(byte[] bytes) {
return ThetaUnionWrap.fromBytes(bytes);
}

// Note: The accumulator is designed to serialize as a sketch and should
// not be deserialized in practice.
@Override
public ThetaSketchAccumulator deserialize(ByteBuffer byteBuffer) {
ThetaSketchAccumulator thetaSketchAccumulator = new ThetaSketchAccumulator();
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
Sketch sketch = Sketch.wrap(Memory.wrap(bytes));
thetaSketchAccumulator.apply(sketch);
return thetaSketchAccumulator;
public ThetaUnionWrap deserialize(ByteBuffer byteBuffer) {
return ThetaUnionWrap.fromByteBuffer(byteBuffer);
}
};

Expand Down Expand Up @@ -1667,7 +1658,7 @@ public ThetaSketchAccumulator deserialize(ByteBuffer byteBuffer) {
FLOAT_ARRAY_LIST_SER_DE,
STRING_ARRAY_LIST_SER_DE,
ULTRA_LOG_LOG_OBJECT_SER_DE,
DATA_SKETCH_SKETCH_ACCUMULATOR_SER_DE,
DATA_SKETCH_THETA_UNION_WRAP_SER_DE,
};
//@formatter:on

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.datasketches.theta.Sketch;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.segment.local.customobject.ThetaSketchAccumulator;
import org.apache.pinot.segment.local.customobject.ThetaUnionWrap;
import org.apache.pinot.segment.spi.AggregationFunctionType;


Expand All @@ -49,15 +49,12 @@ public ColumnDataType getFinalResultColumnType() {
}

@Override
public String extractFinalResult(List<ThetaSketchAccumulator> accumulators) {
int numAccumulators = accumulators.size();
List<Sketch> mergedSketches = new ArrayList<>(numAccumulators);
public String extractFinalResult(List<ThetaUnionWrap> unionWraps) {
int numUnionWraps = unionWraps.size();
List<Sketch> mergedSketches = new ArrayList<>(numUnionWraps);

for (ThetaSketchAccumulator accumulator : accumulators) {
accumulator.setOrdered(_intermediateOrdering);
accumulator.setThreshold(_accumulatorThreshold);
accumulator.setSetOperationBuilder(_setOperationBuilder);
mergedSketches.add(accumulator.getResult());
for (ThetaUnionWrap unionWrap : unionWraps) {
mergedSketches.add(unionWrap.getResult());
}

Sketch sketch = evaluatePostAggregationExpression(mergedSketches);
Expand Down
Loading

0 comments on commit 2ed38f3

Please sign in to comment.