Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
e2464e7
Add support for Datasketches Integer Tuple Sketches
andimiller Mar 15, 2023
af8a3b9
Merge branch 'master' into tuple-sketch-support
andimiller Mar 15, 2023
0eb23bb
fix style
andimiller Mar 15, 2023
a081c10
add test for sketch agg
andimiller Mar 15, 2023
9a3771c
fix mangled license headers
andimiller Mar 15, 2023
bb8054f
annotate types for old versions of java
andimiller Mar 15, 2023
1821a2c
Cache Tuple Union result so it's not recomputed
andimiller Mar 17, 2023
4dcffb3
Improve null handling in Tuple aggregation functions
andimiller Mar 17, 2023
b038bbc
Cleanup in IntegerTupleSketchAggregationFunction's parameters
andimiller Mar 17, 2023
3b41c5b
Make Theta and Tuple transform functions throw on unexpected key types
andimiller Mar 17, 2023
bb69257
Clean up sum/avg implementations for Tuple Sketch values
andimiller Mar 17, 2023
0c26150
Fix on Java 8
andimiller Mar 17, 2023
c99bb69
Merge branch 'master' into tuple-sketch-support
andimiller Apr 27, 2023
1207e88
Expand todo for tuple sketch aggregation function
andimiller Apr 27, 2023
d246b81
add preconditions to tuple aggregation function
andimiller Apr 27, 2023
0efea6e
empty commit to re-trigger CI
andimiller Apr 28, 2023
da958b8
empty commit to re-trigger CI again
andimiller Apr 28, 2023
798181a
Merge branch 'master' into tuple-sketch-support
andimiller May 10, 2023
3e13196
fix merge
andimiller May 10, 2023
b63d47e
empty commit to re-trigger CI again
andimiller May 11, 2023
c021c64
Merge branch 'master' into tuple-sketch-support
andimiller May 16, 2023
2381e4d
Merge branch 'master' into tuple-sketch-support
andimiller May 25, 2023
b44b0d9
fix merge
andimiller May 25, 2023
1b7fe74
fix merge again
andimiller May 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@
import org.apache.datasketches.kll.KllDoublesSketch;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.theta.Sketch;
import org.apache.datasketches.tuple.aninteger.IntegerSummary;
import org.apache.datasketches.tuple.aninteger.IntegerSummaryDeserializer;
import org.apache.pinot.common.CustomObject;
import org.apache.pinot.core.query.aggregation.utils.argminmax.ArgMinMaxObject;
import org.apache.pinot.core.query.distinct.DistinctTable;
Expand Down Expand Up @@ -131,7 +133,8 @@ public enum ObjectType {
VarianceTuple(33),
PinotFourthMoment(34),
ArgMinMaxObject(35),
KllDataSketch(36);
KllDataSketch(36),
IntegerTupleSketch(37);

private final int _value;

Expand Down Expand Up @@ -219,6 +222,8 @@ public static ObjectType getObjectType(Object value) {
return ObjectType.VarianceTuple;
} else if (value instanceof PinotFourthMoment) {
return ObjectType.PinotFourthMoment;
} else if (value instanceof org.apache.datasketches.tuple.Sketch) {
return ObjectType.IntegerTupleSketch;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a safe assumption? Is it also necessary to inspect the summary type to verify integer?

Copy link
Contributor Author

@andimiller andimiller Mar 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right now it is, but to add other types of tuple Sketch we'd need to add wrapper types, due to JVM type erasure

} else if (value instanceof ArgMinMaxObject) {
return ObjectType.ArgMinMaxObject;
} else {
Expand Down Expand Up @@ -926,6 +931,28 @@ public Sketch deserialize(ByteBuffer byteBuffer) {
}
};

public static final ObjectSerDe<org.apache.datasketches.tuple.Sketch<IntegerSummary>> DATA_SKETCH_INT_TUPLE_SER_DE =
new ObjectSerDe<org.apache.datasketches.tuple.Sketch<IntegerSummary>>() {
@Override
public byte[] serialize(org.apache.datasketches.tuple.Sketch<IntegerSummary> value) {
return value.compact().toByteArray();
}

@Override
public org.apache.datasketches.tuple.Sketch<IntegerSummary> deserialize(byte[] bytes) {
return org.apache.datasketches.tuple.Sketches.heapifySketch(Memory.wrap(bytes),
new IntegerSummaryDeserializer());
}

@Override
public org.apache.datasketches.tuple.Sketch<IntegerSummary> deserialize(ByteBuffer byteBuffer) {
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
return org.apache.datasketches.tuple.Sketches.heapifySketch(Memory.wrap(bytes),
new IntegerSummaryDeserializer());
}
};

public static final ObjectSerDe<KllDoublesSketch> KLL_SKETCH_SER_DE = new ObjectSerDe<KllDoublesSketch>() {

@Override
Expand Down Expand Up @@ -1298,6 +1325,7 @@ public ArgMinMaxObject deserialize(ByteBuffer byteBuffer) {
PINOT_FOURTH_MOMENT_OBJECT_SER_DE,
ARG_MIN_MAX_OBJECT_SER_DE,
KLL_SKETCH_SER_DE,
DATA_SKETCH_INT_TUPLE_SER_DE,
};
//@formatter:on

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import javax.annotation.Nullable;
import org.apache.datasketches.theta.Sketches;
import org.apache.datasketches.theta.UpdateSketch;
import org.apache.datasketches.tuple.aninteger.IntegerSketch;
import org.apache.datasketches.tuple.aninteger.IntegerSummary;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.spi.annotations.ScalarFunction;
import org.apache.pinot.spi.utils.CommonConstants;
Expand Down Expand Up @@ -87,20 +89,24 @@ public static byte[] toThetaSketch(@Nullable Object input) {
@ScalarFunction(nullableParameters = true)
public static byte[] toThetaSketch(@Nullable Object input, int nominalEntries) {
UpdateSketch sketch = Sketches.updateSketchBuilder().setNominalEntries(nominalEntries).build();
if (input instanceof Integer) {
sketch.update((Integer) input);
} else if (input instanceof Long) {
sketch.update((Long) input);
} else if (input instanceof Float) {
sketch.update((Float) input);
} else if (input instanceof Double) {
sketch.update((Double) input);
} else if (input instanceof BigDecimal) {
sketch.update(((BigDecimal) input).toString());
} else if (input instanceof String) {
sketch.update((String) input);
} else if (input instanceof byte[]) {
sketch.update((byte[]) input);
if (input != null) {
if (input instanceof Integer) {
sketch.update((Integer) input);
} else if (input instanceof Long) {
sketch.update((Long) input);
} else if (input instanceof Float) {
sketch.update((Float) input);
} else if (input instanceof Double) {
sketch.update((Double) input);
} else if (input instanceof BigDecimal) {
sketch.update(((BigDecimal) input).toString());
} else if (input instanceof String) {
sketch.update((String) input);
} else if (input instanceof byte[]) {
sketch.update((byte[]) input);
} else {
throw new IllegalArgumentException("Unrecognised input type for Theta sketch: " + input.getClass().getName());
}
}
return ObjectSerDeUtils.DATA_SKETCH_SER_DE.serialize(sketch.compact());
}
Expand Down Expand Up @@ -131,4 +137,51 @@ public static byte[] toHLL(@Nullable Object input, int log2m) {
}
return ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.serialize(hll);
}

/**
* Create a Tuple Sketch containing the key and value supplied
*
* @param key an Object we want to insert as the key of the sketch, may be null to return an empty sketch
* @param value an Integer we want to associate as the value to go along with the key, may be null to return an
* empty sketch
* @return serialized tuple sketch as bytes
*/
@ScalarFunction(nullableParameters = true)
public static byte[] toIntegerSumTupleSketch(@Nullable Object key, @Nullable Integer value) {
return toIntegerSumTupleSketch(key, value, CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK);
}

/**
* Create a Tuple Sketch containing the key and value supplied
*
* @param key an Object we want to insert as the key of the sketch, may be null to return an empty sketch
* @param value an Integer we want to associate as the value to go along with the key, may be null to return an
* empty sketch
* @param lgK integer representing the log of the maximum number of retained entries in the sketch, between 4 and 26
* @return serialized tuple sketch as bytes
*/
@ScalarFunction(nullableParameters = true)
public static byte[] toIntegerSumTupleSketch(@Nullable Object key, Integer value, int lgK) {
IntegerSketch is = new IntegerSketch(lgK, IntegerSummary.Mode.Sum);
if (value != null && key != null) {
if (key instanceof Integer) {
is.update((Integer) key, value);
} else if (key instanceof Long) {
is.update((Long) key, value);
} else if (key instanceof Float) {
is.update((float) key, value);
} else if (key instanceof Double) {
is.update((double) key, value);
} else if (key instanceof BigDecimal) {
is.update(((BigDecimal) key).toString(), value);
} else if (key instanceof String) {
is.update((String) key, value);
} else if (key instanceof byte[]) {
is.update((byte[]) key, value);
} else {
throw new IllegalArgumentException("Unrecognised key type for Theta sketch: " + key.getClass().getName());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case you want to validate/catch invalid types, consider throwing an IllegalStateException/illegalArg exception ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, added it for theta too and expanded the tests to cover

}
return ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(is.compact());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.base.Preconditions;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.datasketches.tuple.aninteger.IntegerSummary;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.FunctionContext;
import org.apache.pinot.core.query.request.context.QueryContext;
Expand Down Expand Up @@ -336,6 +337,15 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.KURTOSIS);
case FOURTHMOMENT:
return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.MOMENT);
case DISTINCTCOUNTTUPLESKETCH:
// mode actually doesn't matter here because we only care about keys, not values
return new DistinctCountIntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a reason why we pass IntegerSummary.Mode.Sum as a parameter ? We are already differentiating based on the aggregation implementations IntegerTupleSketchAggregationFunction vs AvgIntegerTupleSketchAggregationFunction vs SumValuesIntegerTupleSketchAggregationFunction

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that is the mode for IntegerSummary merging, all of these use Sum

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, so there can be functions that can use other summary modes (min, max..) in the future.

case DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH:
return new IntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum);
case SUMVALUESINTEGERSUMTUPLESKETCH:
return new SumValuesIntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum);
case AVGVALUEINTEGERSUMTUPLESKETCH:
return new AvgValueIntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum);
case PARENTARGMAX:
return new ParentArgMinMaxAggregationFunction(arguments, true);
case PARENTARGMIN:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.query.aggregation.function;

import java.util.List;
import org.apache.datasketches.tuple.CompactSketch;
import org.apache.datasketches.tuple.SketchIterator;
import org.apache.datasketches.tuple.Union;
import org.apache.datasketches.tuple.aninteger.IntegerSummary;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.segment.spi.AggregationFunctionType;


public class AvgValueIntegerTupleSketchAggregationFunction
extends IntegerTupleSketchAggregationFunction {

public AvgValueIntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, IntegerSummary.Mode mode) {
super(arguments, mode);
}

// TODO if extra aggregation modes are supported, make this switch
// ie, if a Mode argument other than SUM is passed in, switch to the matching AggregationFunctionType
@Override
public AggregationFunctionType getType() {
return AggregationFunctionType.AVGVALUEINTEGERSUMTUPLESKETCH;
}

@Override
public ColumnDataType getFinalResultColumnType() {
return ColumnDataType.LONG;
}

@Override
public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>> integerSummarySketches) {
if (integerSummarySketches == null) {
return null;
}
Union<IntegerSummary> union = new Union<>(_entries, _setOps);
integerSummarySketches.forEach(union::union);
double retainedTotal = 0L;
CompactSketch<IntegerSummary> result = union.getResult();
SketchIterator<IntegerSummary> summaries = result.iterator();
while (summaries.next()) {
retainedTotal += summaries.getSummary().getValue();
}
if (result.getRetainedEntries() == 0) {
// there is nothing to average, return null
return null;
}
double estimate = retainedTotal / result.getRetainedEntries();
return Math.round(estimate);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.query.aggregation.function;

import java.util.List;
import org.apache.datasketches.tuple.CompactSketch;
import org.apache.datasketches.tuple.Union;
import org.apache.datasketches.tuple.aninteger.IntegerSummary;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.segment.spi.AggregationFunctionType;


public class DistinctCountIntegerTupleSketchAggregationFunction extends IntegerTupleSketchAggregationFunction {

public DistinctCountIntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments,
IntegerSummary.Mode mode) {
super(arguments, mode);
}

// TODO if extra aggregation modes are supported, make this switch
@Override
public AggregationFunctionType getType() {
return AggregationFunctionType.DISTINCTCOUNTTUPLESKETCH;
}

@Override
public ColumnDataType getFinalResultColumnType() {
return ColumnDataType.LONG;
}

@Override
public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>> integerSummarySketches) {
Union<IntegerSummary> union = new Union<>(_entries, _setOps);
integerSummarySketches.forEach(union::union);
return Double.valueOf(union.getResult().getEstimate()).longValue();
}
}
Loading