-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Integer Tuple Sketch support #10427
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Integer Tuple Sketch support #10427
Changes from all commits
e2464e7
af8a3b9
0eb23bb
a081c10
9a3771c
bb8054f
1821a2c
4dcffb3
b038bbc
3b41c5b
bb69257
0c26150
c99bb69
1207e88
d246b81
0efea6e
da958b8
798181a
3e13196
b63d47e
c021c64
2381e4d
b44b0d9
1b7fe74
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,6 +23,8 @@ | |
| import javax.annotation.Nullable; | ||
| import org.apache.datasketches.theta.Sketches; | ||
| import org.apache.datasketches.theta.UpdateSketch; | ||
| import org.apache.datasketches.tuple.aninteger.IntegerSketch; | ||
| import org.apache.datasketches.tuple.aninteger.IntegerSummary; | ||
| import org.apache.pinot.core.common.ObjectSerDeUtils; | ||
| import org.apache.pinot.spi.annotations.ScalarFunction; | ||
| import org.apache.pinot.spi.utils.CommonConstants; | ||
|
|
@@ -87,20 +89,24 @@ public static byte[] toThetaSketch(@Nullable Object input) { | |
| @ScalarFunction(nullableParameters = true) | ||
| public static byte[] toThetaSketch(@Nullable Object input, int nominalEntries) { | ||
| UpdateSketch sketch = Sketches.updateSketchBuilder().setNominalEntries(nominalEntries).build(); | ||
| if (input instanceof Integer) { | ||
| sketch.update((Integer) input); | ||
| } else if (input instanceof Long) { | ||
| sketch.update((Long) input); | ||
| } else if (input instanceof Float) { | ||
| sketch.update((Float) input); | ||
| } else if (input instanceof Double) { | ||
| sketch.update((Double) input); | ||
| } else if (input instanceof BigDecimal) { | ||
| sketch.update(((BigDecimal) input).toString()); | ||
| } else if (input instanceof String) { | ||
| sketch.update((String) input); | ||
| } else if (input instanceof byte[]) { | ||
| sketch.update((byte[]) input); | ||
| if (input != null) { | ||
| if (input instanceof Integer) { | ||
| sketch.update((Integer) input); | ||
| } else if (input instanceof Long) { | ||
| sketch.update((Long) input); | ||
| } else if (input instanceof Float) { | ||
| sketch.update((Float) input); | ||
| } else if (input instanceof Double) { | ||
| sketch.update((Double) input); | ||
| } else if (input instanceof BigDecimal) { | ||
| sketch.update(((BigDecimal) input).toString()); | ||
| } else if (input instanceof String) { | ||
| sketch.update((String) input); | ||
| } else if (input instanceof byte[]) { | ||
| sketch.update((byte[]) input); | ||
| } else { | ||
| throw new IllegalArgumentException("Unrecognised input type for Theta sketch: " + input.getClass().getName()); | ||
| } | ||
| } | ||
| return ObjectSerDeUtils.DATA_SKETCH_SER_DE.serialize(sketch.compact()); | ||
| } | ||
|
|
@@ -131,4 +137,51 @@ public static byte[] toHLL(@Nullable Object input, int log2m) { | |
| } | ||
| return ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.serialize(hll); | ||
| } | ||
|
|
||
| /** | ||
| * Builds a serialized Integer Tuple Sketch holding the supplied key/value pair, sized with the default | ||
| * {@code CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK}. Delegates to the three-argument overload. | ||
| * | ||
| * @param key the sketch key; when null an empty sketch is returned | ||
| * @param value the Integer stored alongside the key; when null an empty sketch is returned | ||
| * @return the tuple sketch serialized as bytes | ||
| */ | ||
| @ScalarFunction(nullableParameters = true) | ||
| public static byte[] toIntegerSumTupleSketch(@Nullable Object key, @Nullable Integer value) { | ||
| final int defaultLgK = CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK; | ||
| return toIntegerSumTupleSketch(key, value, defaultLgK); | ||
| } | ||
|
|
||
| /** | ||
| * Builds a serialized Integer Tuple Sketch in {@code Sum} mode holding at most one key/value pair. | ||
| * | ||
| * <p>The key is dispatched on its runtime type: {@code Integer}, {@code Long}, {@code Float}, {@code Double}, | ||
| * {@code String} and {@code byte[]} keys are handed to the sketch directly, while a {@code BigDecimal} is | ||
| * inserted via its {@code toString()} form. If either {@code key} or {@code value} is null, nothing is | ||
| * inserted and the empty sketch is serialized. | ||
| * | ||
| * @param key an Object we want to insert as the key of the sketch, may be null to return an empty sketch | ||
| * @param value an Integer we want to associate as the value to go along with the key, may be null to return an | ||
| * empty sketch | ||
| * @param lgK integer representing the log of the maximum number of retained entries in the sketch, between 4 and 26 | ||
| * @return serialized tuple sketch as bytes | ||
| * @throws IllegalArgumentException if key and value are both non-null and the key type is not supported | ||
| */ | ||
| @ScalarFunction(nullableParameters = true) | ||
| public static byte[] toIntegerSumTupleSketch(@Nullable Object key, @Nullable Integer value, int lgK) { | ||
| IntegerSketch sketch = new IntegerSketch(lgK, IntegerSummary.Mode.Sum); | ||
| if (value != null && key != null) { | ||
| if (key instanceof Integer) { | ||
andimiller marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| sketch.update((Integer) key, value); | ||
| } else if (key instanceof Long) { | ||
| sketch.update((Long) key, value); | ||
| } else if (key instanceof Float) { | ||
| sketch.update((float) key, value); | ||
| } else if (key instanceof Double) { | ||
| sketch.update((double) key, value); | ||
| } else if (key instanceof BigDecimal) { | ||
| sketch.update(((BigDecimal) key).toString(), value); | ||
| } else if (key instanceof String) { | ||
| sketch.update((String) key, value); | ||
| } else if (key instanceof byte[]) { | ||
| sketch.update((byte[]) key, value); | ||
| } else { | ||
| // This method builds a tuple sketch, so the message names it rather than the Theta sketch. | ||
| throw new IllegalArgumentException("Unrecognised key type for Tuple sketch: " + key.getClass().getName()); | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. In case you want to validate/catch invalid types, consider throwing an IllegalStateException/IllegalArgumentException?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Done, added it for Theta too and expanded the tests to cover it. |
||
| } | ||
| return ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(sketch.compact()); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,7 @@ | |
| import com.google.common.base.Preconditions; | ||
| import java.util.List; | ||
| import org.apache.commons.lang3.StringUtils; | ||
| import org.apache.datasketches.tuple.aninteger.IntegerSummary; | ||
| import org.apache.pinot.common.request.context.ExpressionContext; | ||
| import org.apache.pinot.common.request.context.FunctionContext; | ||
| import org.apache.pinot.core.query.request.context.QueryContext; | ||
|
|
@@ -336,6 +337,15 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio | |
| return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.KURTOSIS); | ||
| case FOURTHMOMENT: | ||
| return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.MOMENT); | ||
| case DISTINCTCOUNTTUPLESKETCH: | ||
| // mode actually doesn't matter here because we only care about keys, not values | ||
| return new DistinctCountIntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is there a reason why we pass IntegerSummary.Mode.Sum as a parameter? We are already differentiating based on the aggregation implementations: IntegerTupleSketchAggregationFunction vs AvgValueIntegerTupleSketchAggregationFunction vs SumValuesIntegerTupleSketchAggregationFunction.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. that is the mode for
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. OK, so there could be functions that use other summary modes (min, max, ...) in the future. |
||
| case DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH: | ||
| return new IntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum); | ||
| case SUMVALUESINTEGERSUMTUPLESKETCH: | ||
| return new SumValuesIntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum); | ||
| case AVGVALUEINTEGERSUMTUPLESKETCH: | ||
| return new AvgValueIntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum); | ||
| case PARENTARGMAX: | ||
| return new ParentArgMinMaxAggregationFunction(arguments, true); | ||
| case PARENTARGMIN: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,70 @@ | ||
| /** | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.pinot.core.query.aggregation.function; | ||
|
|
||
| import java.util.List; | ||
| import org.apache.datasketches.tuple.CompactSketch; | ||
| import org.apache.datasketches.tuple.SketchIterator; | ||
| import org.apache.datasketches.tuple.Union; | ||
| import org.apache.datasketches.tuple.aninteger.IntegerSummary; | ||
| import org.apache.pinot.common.request.context.ExpressionContext; | ||
| import org.apache.pinot.common.utils.DataSchema.ColumnDataType; | ||
| import org.apache.pinot.segment.spi.AggregationFunctionType; | ||
|
|
||
|
|
||
| /** | ||
| * Aggregation function that merges Integer Tuple Sketches and reports the average summary value over the | ||
| * retained entries of the merged sketch, rounded to the nearest long. | ||
| */ | ||
| public class AvgValueIntegerTupleSketchAggregationFunction | ||
| extends IntegerTupleSketchAggregationFunction { | ||
|
|
||
| public AvgValueIntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, IntegerSummary.Mode mode) { | ||
| super(arguments, mode); | ||
| } | ||
|
|
||
| // TODO if extra aggregation modes are supported, make this switch | ||
andimiller marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| // ie, if a Mode argument other than SUM is passed in, switch to the matching AggregationFunctionType | ||
| @Override | ||
| public AggregationFunctionType getType() { | ||
| return AggregationFunctionType.AVGVALUEINTEGERSUMTUPLESKETCH; | ||
| } | ||
|
|
||
| @Override | ||
| public ColumnDataType getFinalResultColumnType() { | ||
| return ColumnDataType.LONG; | ||
| } | ||
|
|
||
| /** | ||
| * Unions the given sketches and averages the summary values of the retained entries. | ||
| * | ||
| * @param integerSummarySketches sketches to merge; may be null | ||
| * @return the rounded average as a long, or null when the input is null or the merged sketch retains no entries | ||
| */ | ||
| @Override | ||
| public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>> integerSummarySketches) { | ||
| if (integerSummarySketches == null) { | ||
| return null; | ||
| } | ||
| Union<IntegerSummary> merger = new Union<>(_entries, _setOps); | ||
| for (CompactSketch<IntegerSummary> sketch : integerSummarySketches) { | ||
| merger.union(sketch); | ||
| } | ||
| CompactSketch<IntegerSummary> merged = merger.getResult(); | ||
| int retainedCount = merged.getRetainedEntries(); | ||
| if (retainedCount == 0) { | ||
| // there is nothing to average, return null | ||
| return null; | ||
| } | ||
| double valueSum = 0.0; | ||
| for (SketchIterator<IntegerSummary> entries = merged.iterator(); entries.next(); ) { | ||
| valueSum += entries.getSummary().getValue(); | ||
| } | ||
| return Math.round(valueSum / retainedCount); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,54 @@ | ||
| /** | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.pinot.core.query.aggregation.function; | ||
|
|
||
| import java.util.List; | ||
| import org.apache.datasketches.tuple.CompactSketch; | ||
| import org.apache.datasketches.tuple.Union; | ||
| import org.apache.datasketches.tuple.aninteger.IntegerSummary; | ||
| import org.apache.pinot.common.request.context.ExpressionContext; | ||
| import org.apache.pinot.common.utils.DataSchema.ColumnDataType; | ||
| import org.apache.pinot.segment.spi.AggregationFunctionType; | ||
|
|
||
|
|
||
| /** | ||
| * Aggregation function that merges Integer Tuple Sketches and reports the distinct-count estimate of the | ||
| * merged sketch as a long. | ||
| */ | ||
| public class DistinctCountIntegerTupleSketchAggregationFunction extends IntegerTupleSketchAggregationFunction { | ||
|
|
||
| public DistinctCountIntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, | ||
| IntegerSummary.Mode mode) { | ||
| super(arguments, mode); | ||
| } | ||
|
|
||
| // TODO if extra aggregation modes are supported, make this switch | ||
| @Override | ||
| public AggregationFunctionType getType() { | ||
| return AggregationFunctionType.DISTINCTCOUNTTUPLESKETCH; | ||
| } | ||
|
|
||
| @Override | ||
| public ColumnDataType getFinalResultColumnType() { | ||
| return ColumnDataType.LONG; | ||
| } | ||
|
|
||
| /** | ||
| * Unions the given sketches and returns the merged sketch's estimate. | ||
| * | ||
| * @param integerSummarySketches sketches to merge | ||
| * @return the estimate of the merged sketch, narrowed from double to long | ||
| */ | ||
| @Override | ||
| public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>> integerSummarySketches) { | ||
| Union<IntegerSummary> merger = new Union<>(_entries, _setOps); | ||
| for (CompactSketch<IntegerSummary> sketch : integerSummarySketches) { | ||
| merger.union(sketch); | ||
| } | ||
| double estimate = merger.getResult().getEstimate(); | ||
| // Same double-to-long narrowing that Double#longValue performs. | ||
| return (long) estimate; | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this a safe assumption? Is it also necessary to inspect the summary type to verify that it is an integer?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right now it is, but to add other types of Tuple Sketch we'd need to add wrapper types, due to JVM type erasure.