Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for configuring Theta and Tuple aggregation functions #14167

Merged
Next Next commit
Add support for configuring Theta and Tuple aggregation function para…
…ms in the ST index

This patch introduces a mechanism for configuring the aggregation function parameters
of a star-tree index for Tuple and Theta sketches. Any existing aggregation whose
precision is greater than or equal to the query precision is selected as a candidate.
The behaviour of the CPC aggregator has been changed accordingly.
  • Loading branch information
davecromberge committed Oct 4, 2024
commit 736c493c6430f7c25fd1bfdf1bafe7a6a3481eee
Original file line number Diff line number Diff line change
Expand Up @@ -422,16 +422,18 @@ public Comparable mergeFinalResult(Comparable finalResult1, Comparable finalResu

@Override
public boolean canUseStarTree(Map<String, Object> functionParameters) {
Object lgK = functionParameters.get(Constants.CPCSKETCH_LGK_KEY);
Object lgKParam = functionParameters.get(Constants.CPCSKETCH_LGK_KEY);
int starTreeLgK;

// Check if lgK values match
if (lgK != null) {
return _lgNominalEntries == Integer.parseInt(String.valueOf(lgK));
if (lgKParam != null) {
starTreeLgK = Integer.parseInt(String.valueOf(lgKParam));
} else {
// If the functionParameters don't have an explicit lgK set, it means that the star-tree index was built with
// the default value for lgK
return _lgNominalEntries == CommonConstants.Helix.DEFAULT_CPC_SKETCH_LGK;
starTreeLgK = CommonConstants.Helix.DEFAULT_CPC_SKETCH_LGK;
}
// Check if the query lgK param is less than or equal to that of the StarTree aggregation
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: a one-liner explanation on why we're doing a <= check (#13835 (comment)) might be useful for future readers here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

return _lgNominalEntries <= starTreeLgK;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@
import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
import org.apache.pinot.segment.local.customobject.ThetaSketchAccumulator;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.segment.spi.Constants;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.sql.parsers.CalciteSqlParser;


Expand Down Expand Up @@ -92,6 +94,7 @@ public class DistinctCountThetaSketchAggregationFunction
private final List<FilterEvaluator> _filterEvaluators;
private final ExpressionContext _postAggregationExpression;
private final UpdateSketchBuilder _updateSketchBuilder = new UpdateSketchBuilder();
private int _nominalEntries = ThetaUtil.DEFAULT_NOMINAL_ENTRIES;
protected final SetOperationBuilder _setOperationBuilder = new SetOperationBuilder();
protected int _accumulatorThreshold = DEFAULT_ACCUMULATOR_THRESHOLD;

Expand All @@ -108,9 +111,9 @@ public DistinctCountThetaSketchAggregationFunction(List<ExpressionContext> argum
// Allows the user to trade-off memory usage for merge CPU; higher values use more memory
_accumulatorThreshold = parameters.getAccumulatorThreshold();
// Nominal entries controls sketch accuracy and size
int nominalEntries = parameters.getNominalEntries();
_updateSketchBuilder.setNominalEntries(nominalEntries);
_setOperationBuilder.setNominalEntries(nominalEntries);
_nominalEntries = parameters.getNominalEntries();
_updateSketchBuilder.setNominalEntries(_nominalEntries);
_setOperationBuilder.setNominalEntries(_nominalEntries);
// Sampling probability sets the initial value of Theta, defaults to 1.0
float p = parameters.getSamplingProbability();
_setOperationBuilder.setP(p);
Expand Down Expand Up @@ -1035,6 +1038,24 @@ public Comparable mergeFinalResult(Comparable finalResult1, Comparable finalResu
return (Long) finalResult1 + (Long) finalResult2;
}

@Override
public boolean canUseStarTree(Map<String, Object> functionParameters) {
Object nominalEntriesParam = functionParameters.get(Constants.THETASKETCH_NOMINAL_ENTRIES);
int starTreeNominalEntries;

// Resolve the nominal entries value that the star-tree index was built with
if (nominalEntriesParam != null) {
starTreeNominalEntries = Integer.parseInt(String.valueOf(nominalEntriesParam));
} else {
// If the functionParameters don't have an explicit nominal entries value set, it means that the star-tree
// index was built with
// the default value for nominal entries
starTreeNominalEntries = CommonConstants.Helix.DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES;
}
// Check if the query lgK param is less than or equal to that of the StarTree aggregation
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Check if the query lgK param is less than or equal to that of the StarTree aggregation
// Check if the query nominalEntries param is less than or equal to that of the StarTree aggregation

nit: looks like this aggregation function directly takes the K value rather than lgK?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although I suppose it doesn't matter either way since the comparisons are equivalent.

return _nominalEntries <= starTreeNominalEntries;
}

// This ensures backward compatibility with servers that still return sketches directly.
// The AggregationDataTableReducer casts intermediate results to Objects and although the code compiles,
// types might still be incompatible at runtime due to type erasure.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
import org.apache.pinot.segment.local.customobject.TupleIntSketchAccumulator;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.segment.spi.Constants;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.utils.CommonConstants;

Expand Down Expand Up @@ -275,6 +276,24 @@ public Comparable extractFinalResult(TupleIntSketchAccumulator accumulator) {
return Base64.getEncoder().encodeToString(accumulator.getResult().toByteArray());
}

@Override
public boolean canUseStarTree(Map<String, Object> functionParameters) {
Object nominalEntriesParam = functionParameters.get(Constants.TUPLESKETCH_NOMINAL_ENTRIES);
int starTreeNominalEntries;

// Resolve the nominal entries value that the star-tree index was built with
if (nominalEntriesParam != null) {
starTreeNominalEntries = Integer.parseInt(String.valueOf(nominalEntriesParam));
} else {
// If the functionParameters don't have an explicit nominal entries value set, it means that the star-tree
// index was built with
// the default value for nominal entries
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// index was built with
// the default value for nominal entries
// index was built with the default value for nominal entries

nit: formatting

starTreeNominalEntries = (int) Math.pow(2, CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK);
}
// Check if the query nominalEntries param is less than or equal to that of the StarTree aggregation
return _nominalEntries <= starTreeNominalEntries;
}

/**
* Returns the accumulator from the result holder or creates a new one if it does not exist.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,21 @@ public class DistinctCountCPCSketchAggregationFunctionTest {

@Test
public void testCanUseStarTreeDefaultLgK() {
DistinctCountCPCSketchAggregationFunction function = new DistinctCountCPCSketchAggregationFunction(
List.of(ExpressionContext.forIdentifier("col")));
DistinctCountCPCSketchAggregationFunction function =
new DistinctCountCPCSketchAggregationFunction(List.of(ExpressionContext.forIdentifier("col")));

Assert.assertTrue(function.canUseStarTree(Map.of()));
Assert.assertTrue(function.canUseStarTree(Map.of(Constants.CPCSKETCH_LGK_KEY, "12")));
Assert.assertTrue(function.canUseStarTree(Map.of(Constants.CPCSKETCH_LGK_KEY, 12)));
Assert.assertFalse(function.canUseStarTree(Map.of(Constants.CPCSKETCH_LGK_KEY, 16)));
Assert.assertFalse(function.canUseStarTree(Map.of(Constants.CPCSKETCH_LGK_KEY, 11)));

function = new DistinctCountCPCSketchAggregationFunction(List.of(ExpressionContext.forIdentifier("col"),
ExpressionContext.forLiteral(Literal.intValue(12))));
function = new DistinctCountCPCSketchAggregationFunction(
List.of(ExpressionContext.forIdentifier("col"), ExpressionContext.forLiteral(Literal.intValue(12))));

Assert.assertTrue(function.canUseStarTree(Map.of()));
Assert.assertTrue(function.canUseStarTree(Map.of(Constants.CPCSKETCH_LGK_KEY, "12")));
Assert.assertTrue(function.canUseStarTree(Map.of(Constants.CPCSKETCH_LGK_KEY, 12)));
Assert.assertFalse(function.canUseStarTree(Map.of(Constants.CPCSKETCH_LGK_KEY, 16)));
Assert.assertFalse(function.canUseStarTree(Map.of(Constants.CPCSKETCH_LGK_KEY, 11)));
}

@Test
Expand All @@ -54,8 +54,9 @@ public void testCanUseCustomLgK() {
List.of(ExpressionContext.forIdentifier("col"),
ExpressionContext.forLiteral(Literal.stringValue("nominalEntries=8192"))));

// Default lgK = 12 / K=4096
Assert.assertFalse(function.canUseStarTree(Map.of()));
Assert.assertFalse(function.canUseStarTree(Map.of(Constants.CPCSKETCH_LGK_KEY, "12")));
Assert.assertTrue(function.canUseStarTree(Map.of(Constants.CPCSKETCH_LGK_KEY, "14")));
Assert.assertTrue(function.canUseStarTree(Map.of(Constants.CPCSKETCH_LGK_KEY, 13)));
Assert.assertTrue(function.canUseStarTree(Map.of(Constants.CPCSKETCH_LGK_KEY, "13")));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.query.aggregation.function;

import java.util.List;
import java.util.Map;
import org.apache.pinot.common.request.Literal;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.segment.spi.Constants;
import org.testng.Assert;
import org.testng.annotations.Test;


public class DistinctCountThetaSketchAggregationFunctionTest {

@Test
public void testCanUseStarTreeDefaultK() {
// Default aggregation function lgK = 12 / K=4096
DistinctCountThetaSketchAggregationFunction function =
new DistinctCountThetaSketchAggregationFunction(List.of(ExpressionContext.forIdentifier("col")));

Assert.assertTrue(function.canUseStarTree(Map.of()));
Assert.assertTrue(function.canUseStarTree(Map.of(Constants.THETASKETCH_NOMINAL_ENTRIES, "4096")));
Assert.assertTrue(function.canUseStarTree(Map.of(Constants.THETASKETCH_NOMINAL_ENTRIES, 4096)));
Assert.assertFalse(function.canUseStarTree(Map.of(Constants.THETASKETCH_NOMINAL_ENTRIES, 2048)));

function = new DistinctCountThetaSketchAggregationFunction(List.of(ExpressionContext.forIdentifier("col"),
ExpressionContext.forLiteral(Literal.stringValue("nominalEntries=16384"))));

Assert.assertTrue(function.canUseStarTree(Map.of()));
Assert.assertTrue(function.canUseStarTree(Map.of(Constants.THETASKETCH_NOMINAL_ENTRIES, "16384")));
Assert.assertTrue(function.canUseStarTree(Map.of(Constants.THETASKETCH_NOMINAL_ENTRIES, 16384)));
Assert.assertFalse(function.canUseStarTree(Map.of(Constants.THETASKETCH_NOMINAL_ENTRIES, 8192)));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be in the CustomK test rather than the DefaultK test?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or else nominalEntries=4096 if this was intended to test the case where the default value is explicitly set.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are correct and I have removed these test assertions.

}

@Test
public void testCanUseCustomK() {
DistinctCountThetaSketchAggregationFunction function = new DistinctCountThetaSketchAggregationFunction(
List.of(ExpressionContext.forIdentifier("col"),
ExpressionContext.forLiteral(Literal.stringValue("nominalEntries=32768"))));

// Default StarTree lgK = 14 / K=16384
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious - do you know why we chose lgK = 14 as the default value for the star-tree index but lgK = 12 for the query time aggregation function?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this is not the case for the tuple sketch where the same default value is used across the star-tree index value aggregator and the query aggregation function?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good observation. The ThetaSketch query-time aggregation function was the earliest implementation of a DataSketches sketch and was used at LinkedIn according to this presentation. This implementation provided a default of lgK=12, which has been preserved.

The StarTree index value aggregator for ThetaSketch was introduced later in this PR. The default chosen for the StarTree was of higher precision to allow the user greater accuracy should this be desired by overriding the default query parameter value.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, thanks for the history!

Assert.assertFalse(function.canUseStarTree(Map.of()));
Assert.assertFalse(function.canUseStarTree(Map.of(Constants.THETASKETCH_NOMINAL_ENTRIES, "16384")));
Assert.assertTrue(function.canUseStarTree(Map.of(Constants.THETASKETCH_NOMINAL_ENTRIES, "65536")));
Assert.assertTrue(function.canUseStarTree(Map.of(Constants.THETASKETCH_NOMINAL_ENTRIES, 32768)));
Assert.assertTrue(function.canUseStarTree(Map.of(Constants.THETASKETCH_NOMINAL_ENTRIES, "32768")));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.query.aggregation.function;

import java.util.List;
import java.util.Map;
import org.apache.datasketches.tuple.aninteger.IntegerSummary;
import org.apache.pinot.common.request.Literal;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.segment.spi.Constants;
import org.testng.Assert;
import org.testng.annotations.Test;


/**
 * Tests for {@link IntegerTupleSketchAggregationFunction#canUseStarTree(Map)}.
 *
 * <p>The assertions below exercise the rule that a star-tree index may serve the query only when the nominal
 * entries configured on the index are at least as large as the nominal entries requested by the query function.
 * An empty parameter map stands for an index built without an explicit nominal entries setting.
 */
public class IntegerTupleSketchAggregationFunctionTest {
  private static final String COLUMN = "col";

  @Test
  public void testCanUseStarTreeDefaultK() {
    // Query function without an explicit nominal entries argument
    verifyDefaultKCompatibility(sumFunction(ExpressionContext.forIdentifier(COLUMN)));

    // Query function with nominal entries explicitly set to 16384; expected to behave the same as above
    verifyDefaultKCompatibility(
        sumFunction(ExpressionContext.forIdentifier(COLUMN), ExpressionContext.forLiteral(Literal.intValue(16384))));
  }

  @Test
  public void testCanUseCustomK() {
    IntegerTupleSketchAggregationFunction customK =
        sumFunction(ExpressionContext.forIdentifier(COLUMN), ExpressionContext.forLiteral(Literal.intValue(32768)));

    // Default StarTree lgK = 14 / K=16384, which is below the 32768 requested by the query
    Assert.assertFalse(customK.canUseStarTree(Map.of()));
    Assert.assertFalse(customK.canUseStarTree(starTreeParams("16384")));
    // Star-tree indexes with nominal entries greater than or equal to 32768 are accepted
    Assert.assertTrue(customK.canUseStarTree(starTreeParams("65536")));
    Assert.assertTrue(customK.canUseStarTree(starTreeParams(32768)));
    Assert.assertTrue(customK.canUseStarTree(starTreeParams("32768")));
  }

  /**
   * Asserts the outcomes expected from a query function whose nominal entries are 16384: an index with no explicit
   * setting or with 16384 (as a string or an integer) is accepted, while an index with 8192 is rejected.
   */
  private static void verifyDefaultKCompatibility(IntegerTupleSketchAggregationFunction aggregation) {
    Assert.assertTrue(aggregation.canUseStarTree(Map.of()));
    Assert.assertTrue(aggregation.canUseStarTree(starTreeParams("16384")));
    Assert.assertTrue(aggregation.canUseStarTree(starTreeParams(16384)));
    Assert.assertFalse(aggregation.canUseStarTree(starTreeParams(8192)));
  }

  /** Creates a Sum-mode aggregation function from the given argument expressions. */
  private static IntegerTupleSketchAggregationFunction sumFunction(ExpressionContext... arguments) {
    return new IntegerTupleSketchAggregationFunction(List.of(arguments), IntegerSummary.Mode.Sum);
  }

  /** Builds star-tree function parameters carrying only the tuple sketch nominal entries value. */
  private static Map<String, Object> starTreeParams(Object nominalEntries) {
    return Map.of(Constants.TUPLESKETCH_NOMINAL_ENTRIES, nominalEntries);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.startree.v2;

import java.util.Collections;
import java.util.Random;
import org.apache.datasketches.tuple.Sketch;
import org.apache.datasketches.tuple.Union;
Expand All @@ -35,7 +36,7 @@ public class DistinctCountIntegerSumTupleSketchStarTreeV2Test extends BaseStarTr

@Override
ValueAggregator<byte[], Object> getValueAggregator() {
return new IntegerTupleSketchValueAggregator(IntegerSummary.Mode.Sum);
return new IntegerTupleSketchValueAggregator(Collections.emptyList(), IntegerSummary.Mode.Sum);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.startree.v2;

import java.util.Collections;
import java.util.Random;
import org.apache.datasketches.theta.Sketch;
import org.apache.datasketches.theta.Union;
Expand All @@ -32,7 +33,7 @@ public class DistinctCountThetaSketchStarTreeV2Test extends BaseStarTreeV2Test<O

@Override
ValueAggregator<Object, Object> getValueAggregator() {
return new DistinctCountThetaSketchValueAggregator();
return new DistinctCountThetaSketchValueAggregator(Collections.emptyList());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
*/
package org.apache.pinot.segment.local.aggregator;

import com.google.common.annotations.VisibleForTesting;
import java.util.List;
import org.apache.datasketches.theta.SetOperationBuilder;
import org.apache.datasketches.theta.Sketch;
import org.apache.datasketches.theta.Union;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.data.FieldSpec.DataType;
Expand All @@ -31,13 +34,19 @@ public class DistinctCountThetaSketchValueAggregator implements ValueAggregator<
public static final DataType AGGREGATED_VALUE_TYPE = DataType.BYTES;

private final SetOperationBuilder _setOperationBuilder;
private final int _nominalEntries;

// This changes a lot similar to the Bitmap aggregator
private int _maxByteSize;

public DistinctCountThetaSketchValueAggregator() {
_setOperationBuilder =
Union.builder().setNominalEntries(CommonConstants.Helix.DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES);
public DistinctCountThetaSketchValueAggregator(List<ExpressionContext> arguments) {
// No argument means we use the Helix default
if (arguments.isEmpty()) {
_nominalEntries = CommonConstants.Helix.DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES;
} else {
_nominalEntries = arguments.get(0).getLiteral().getIntValue();
}
_setOperationBuilder = Union.builder().setNominalEntries(_nominalEntries);
}

@Override
Expand All @@ -50,6 +59,11 @@ public DataType getAggregatedValueType() {
return AGGREGATED_VALUE_TYPE;
}

/**
 * Returns the nominal entries value held by this aggregator, i.e. the value stored in {@code _nominalEntries}.
 * Exposed for tests only.
 */
@VisibleForTesting
public int getNominalEntries() {
return _nominalEntries;
}

private void singleItemUpdate(Union thetaUnion, Object rawValue) {
if (rawValue instanceof String) {
thetaUnion.update((String) rawValue);
Expand Down
Loading
Loading