Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,9 @@ private static HyperLogLog getDistinctCountHLLResult(Dictionary dictionary,

private static Object getDistinctCountSmartHLLResult(Dictionary dictionary,
DistinctCountSmartHLLAggregationFunction function) {
if (dictionary.length() > function.getHllConversionThreshold()) {
if (dictionary.length() > function.getThreshold()) {
// Store values into a HLL when the dictionary size exceeds the conversion threshold
return getDistinctValueHLL(dictionary, function.getHllLog2m());
return getDistinctValueHLL(dictionary, function.getLog2m());
} else {
return getDistinctValueSet(dictionary);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,35 +54,35 @@
* exceeds a threshold, the Set will be converted into a HyperLogLog, and approximate result will be returned.
*
* The function takes an optional second argument for parameters:
* - hllLog2m: Log2m for the converted HyperLogLog, 12 by default.
* - hllConversionThreshold: Threshold of the number of distinct values to trigger the conversion, 100_000 by default.
* Non-positive value means never convert.
* Example of second argument: 'hllLog2m=8;hllConversionThreshold=10'
* - threshold: Threshold of the number of distinct values to trigger the conversion, 100_000 by default. Non-positive
* value means never convert.
* - log2m: Log2m for the converted HyperLogLog, 12 by default.
* Example of second argument: 'threshold=10;log2m=8'
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public class DistinctCountSmartHLLAggregationFunction extends BaseSingleInputAggregationFunction<Object, Integer> {
private final int _hllLog2m;
private final int _hllConversionThreshold;
private final int _threshold;
private final int _log2m;

/**
 * Builds the aggregation function from its argument list.
 *
 * The first argument is handed to the superclass constructor. When a second argument is present, its literal is
 * parsed by {@code Parameters} and the parsed values are copied into the fields; otherwise the defaults declared on
 * {@code Parameters} are used.
 *
 * NOTE(review): this span is a rendered diff hunk, so the pre-rename assignments (_hllLog2m,
 * _hllConversionThreshold) and the post-rename assignments (_threshold, _log2m) appear next to each other.
 *
 * @param arguments function arguments; index 0 is required, index 1 (parameter string literal) is optional
 */
public DistinctCountSmartHLLAggregationFunction(List<ExpressionContext> arguments) {
super(arguments.get(0));

if (arguments.size() > 1) {
// Optional second argument carries the parameter string, e.g. 'threshold=10;log2m=8'
Parameters parameters = new Parameters(arguments.get(1).getLiteral());
_hllLog2m = parameters._hllLog2m;
_hllConversionThreshold = parameters._hllConversionThreshold;
_threshold = parameters._threshold;
_log2m = parameters._log2m;
} else {
// No parameter string supplied: fall back to the defaults
_hllLog2m = Parameters.DEFAULT_HLL_LOG2M;
_hllConversionThreshold = Parameters.DEFAULT_HLL_CONVERSION_THRESHOLD;
_threshold = Parameters.DEFAULT_THRESHOLD;
_log2m = Parameters.DEFAULT_LOG2M;
}
}

// Accessors for the two settings resolved in the constructor.
// NOTE(review): rendered diff hunk - each old getter header/return line is immediately followed by the renamed
// replacement (getHllLog2m -> getThreshold position, getHllConversionThreshold -> getLog2m position).
public int getHllLog2m() {
return _hllLog2m;
/** Returns the threshold value held in {@code _threshold}. */
public int getThreshold() {
return _threshold;
}

public int getHllConversionThreshold() {
return _hllConversionThreshold;
/** Returns the log2m value held in {@code _log2m}. */
public int getLog2m() {
return _log2m;
}

@Override
Expand Down Expand Up @@ -329,7 +329,7 @@ private void aggregateIntoSet(int length, AggregationResultHolder aggregationRes
}

// Convert to HLL if the set size exceeds the threshold
if (valueSet.size() > _hllConversionThreshold) {
if (valueSet.size() > _threshold) {
aggregationResultHolder.setValue(convertSetToHLL(valueSet, storedType));
}
}
Expand All @@ -343,15 +343,15 @@ protected HyperLogLog convertSetToHLL(Set valueSet, DataType storedType) {
}

/**
 * Converts a set of {@code ByteArray} values into a {@code HyperLogLog}.
 *
 * A new HyperLogLog is created from the configured log2m field and the bytes of every element of the set are
 * offered to it.
 *
 * NOTE(review): this span is a rendered diff hunk, so the declaration line appears twice (pre-rename
 * {@code _hllLog2m}, post-rename {@code _log2m}).
 *
 * @param valueSet set of ByteArray values to fold into the HyperLogLog
 * @return the HyperLogLog containing every value of the set
 */
protected HyperLogLog convertByteArraySetToHLL(ObjectSet<ByteArray> valueSet) {
HyperLogLog hll = new HyperLogLog(_hllLog2m);
HyperLogLog hll = new HyperLogLog(_log2m);
for (ByteArray value : valueSet) {
// Offer the result of getBytes() rather than the ByteArray wrapper object itself
hll.offer(value.getBytes());
}
return hll;
}

protected HyperLogLog convertNonByteArraySetToHLL(Set valueSet) {
HyperLogLog hll = new HyperLogLog(_hllLog2m);
HyperLogLog hll = new HyperLogLog(_log2m);
for (Object value : valueSet) {
hll.offer(value);
}
Expand Down Expand Up @@ -632,7 +632,7 @@ public Object extractAggregationResult(AggregationResultHolder aggregationResult
if (result instanceof DictIdsWrapper) {
// For dictionary-encoded expression, convert dictionary ids to values
DictIdsWrapper dictIdsWrapper = (DictIdsWrapper) result;
if (dictIdsWrapper._dictIdBitmap.cardinalityExceeds(_hllConversionThreshold)) {
if (dictIdsWrapper._dictIdBitmap.cardinalityExceeds(_threshold)) {
return convertToHLL(dictIdsWrapper);
} else {
return convertToValueSet(dictIdsWrapper);
Expand Down Expand Up @@ -680,7 +680,7 @@ public Object merge(Object intermediateResult1, Object intermediateResult2) {
valueSet1.addAll(valueSet2);

// Convert to HLL if the set size exceeds the threshold
if (valueSet1.size() > _hllConversionThreshold) {
if (valueSet1.size() > _threshold) {
if (valueSet1 instanceof ObjectSet && valueSet1.iterator().next() instanceof ByteArray) {
return convertByteArraySetToHLL((ObjectSet<ByteArray>) valueSet1);
} else {
Expand All @@ -691,7 +691,7 @@ public Object merge(Object intermediateResult1, Object intermediateResult2) {
}
}

private HyperLogLog mergeIntoHLL(HyperLogLog hll, Object intermediateResult) {
private static HyperLogLog mergeIntoHLL(HyperLogLog hll, Object intermediateResult) {
if (intermediateResult instanceof HyperLogLog) {
try {
hll.addAll((HyperLogLog) intermediateResult);
Expand Down Expand Up @@ -924,7 +924,7 @@ private static Set convertToValueSet(DictIdsWrapper dictIdsWrapper) {
* Helper method to read dictionary and convert dictionary ids to a HyperLogLog for dictionary-encoded expression.
*/
private HyperLogLog convertToHLL(DictIdsWrapper dictIdsWrapper) {
HyperLogLog hyperLogLog = new HyperLogLog(_hllLog2m);
HyperLogLog hyperLogLog = new HyperLogLog(_log2m);
Dictionary dictionary = dictIdsWrapper._dictionary;
RoaringBitmap dictIdBitmap = dictIdsWrapper._dictIdBitmap;
PeekableIntIterator iterator = dictIdBitmap.getIntIterator();
Expand All @@ -934,7 +934,7 @@ private HyperLogLog convertToHLL(DictIdsWrapper dictIdsWrapper) {
return hyperLogLog;
}

private IllegalStateException getIllegalDataTypeException(DataType dataType, boolean singleValue) {
private static IllegalStateException getIllegalDataTypeException(DataType dataType, boolean singleValue) {
return new IllegalStateException(
"Illegal data type for DISTINCT_COUNT_SMART_HLL aggregation function: " + dataType + (singleValue ? ""
: "_MV"));
Expand All @@ -956,16 +956,21 @@ private DictIdsWrapper(Dictionary dictionary) {
private static class Parameters {
static final char PARAMETER_DELIMITER = ';';
static final char PARAMETER_KEY_VALUE_SEPARATOR = '=';
static final String HLL_LOG2M_KEY = "HLLLOG2M";
static final String HLL_CONVERSION_THRESHOLD_KEY = "HLLCONVERSIONTHRESHOLD";

// Use 12 by default to get good accuracy for DistinctCount
static final int DEFAULT_HLL_LOG2M = 12;
static final String THRESHOLD_KEY = "THRESHOLD";
// 100K values to trigger HLL conversion by default
static final int DEFAULT_HLL_CONVERSION_THRESHOLD = 100_000;
static final int DEFAULT_THRESHOLD = 100_000;
@Deprecated
static final String DEPRECATED_THRESHOLD_KEY = "HLLCONVERSIONTHRESHOLD";

static final String LOG2M_KEY = "LOG2M";
// Use 12 by default to get good accuracy for DistinctCount
static final int DEFAULT_LOG2M = 12;
@Deprecated
static final String DEPRECATED_LOG2M_KEY = "HLLLOG2M";

int _hllLog2m = DEFAULT_HLL_LOG2M;
int _hllConversionThreshold = DEFAULT_HLL_CONVERSION_THRESHOLD;
int _threshold = DEFAULT_THRESHOLD;
int _log2m = DEFAULT_LOG2M;

Parameters(String parametersString) {
// Java strings are immutable: deleteWhitespace returns a stripped copy instead of modifying its argument, so the
// result must be assigned back. Without this, parameter strings such as 'threshold = 10; log2m = 8' keep their
// whitespace and fail on key matching / Integer.parseInt.
parametersString = StringUtils.deleteWhitespace(parametersString);
Expand All @@ -976,16 +981,18 @@ private static class Parameters {
String key = keyAndValue[0];
String value = keyAndValue[1];
switch (key.toUpperCase()) {
case HLL_LOG2M_KEY:
_hllLog2m = Integer.parseInt(value);
break;
case HLL_CONVERSION_THRESHOLD_KEY:
_hllConversionThreshold = Integer.parseInt(value);
case THRESHOLD_KEY:
case DEPRECATED_THRESHOLD_KEY:
_threshold = Integer.parseInt(value);
// Treat non-positive threshold as unlimited
if (_hllConversionThreshold <= 0) {
_hllConversionThreshold = Integer.MAX_VALUE;
if (_threshold <= 0) {
_threshold = Integer.MAX_VALUE;
}
break;
case LOG2M_KEY:
case DEPRECATED_LOG2M_KEY:
_log2m = Integer.parseInt(value);
break;
default:
throw new IllegalArgumentException("Invalid parameter key: " + key);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@
import org.apache.pinot.core.operator.query.AggregationGroupByOperator;
import org.apache.pinot.core.operator.query.AggregationOperator;
import org.apache.pinot.core.operator.query.NonScanBasedAggregationOperator;
import org.apache.pinot.core.query.aggregation.function.DistinctCountSmartHLLAggregationFunction;
import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
Expand Down Expand Up @@ -391,12 +394,11 @@ public void testHLL() {
@Test
public void testSmartHLL() {
// Dictionary based
String query = "SELECT DISTINCTCOUNTSMARTHLL(intColumn, 'hllConversionThreshold=10'), "
+ "DISTINCTCOUNTSMARTHLL(longColumn, 'hllConversionThreshold=10'), "
+ "DISTINCTCOUNTSMARTHLL(floatColumn, 'hllConversionThreshold=10'), "
+ "DISTINCTCOUNTSMARTHLL(doubleColumn, 'hllConversionThreshold=10'), "
+ "DISTINCTCOUNTSMARTHLL(stringColumn, 'hllConversionThreshold=10'), "
+ "DISTINCTCOUNTSMARTHLL(bytesColumn, 'hllConversionThreshold=10') FROM testTable";
String query = "SELECT DISTINCTCOUNTSMARTHLL(intColumn, 'threshold=10'), "
+ "DISTINCTCOUNTSMARTHLL(longColumn, 'threshold=10'), " + "DISTINCTCOUNTSMARTHLL(floatColumn, 'threshold=10'), "
+ "DISTINCTCOUNTSMARTHLL(doubleColumn, 'threshold=10'), "
+ "DISTINCTCOUNTSMARTHLL(stringColumn, 'threshold=10'), "
+ "DISTINCTCOUNTSMARTHLL(bytesColumn, 'threshold=10') FROM testTable";

// Inner segment
String[] interSegmentsExpectedResults = new String[6];
Expand Down Expand Up @@ -469,7 +471,7 @@ public void testSmartHLL() {
}

// Change log2m
query = "SELECT DISTINCTCOUNTSMARTHLL(intColumn, 'hllLog2m=8;hllConversionThreshold=10') FROM testTable";
query = "SELECT DISTINCTCOUNTSMARTHLL(intColumn, 'threshold=10;log2m=8') FROM testTable";
operator = getOperatorForSqlQuery(query);
assertTrue(operator instanceof NonScanBasedAggregationOperator);
aggregationResult = ((NonScanBasedAggregationOperator) operator).nextBlock().getAggregationResult();
Expand All @@ -479,6 +481,15 @@ public void testSmartHLL() {
HyperLogLog hll = (HyperLogLog) aggregationResult.get(0);
// Check log2m is 8
assertEquals(hll.sizeof(), 172);

// Test legacy parameters
query = "SELECT DISTINCTCOUNTSMARTHLL(intColumn, 'hllConversionThreshold=10;hllLog2m=8') FROM testTable";
QueryContext queryContext = QueryContextConverterUtils.getQueryContextFromSQL(query);
assertNotNull(queryContext.getAggregationFunctions());
DistinctCountSmartHLLAggregationFunction function =
(DistinctCountSmartHLLAggregationFunction) queryContext.getAggregationFunctions()[0];
assertEquals(function.getThreshold(), 10);
assertEquals(function.getLog2m(), 8);
}

@AfterClass
Expand Down