Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.utils.ArrayCopyUtils;
import org.apache.pinot.spi.utils.BigDecimalUtils;
import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.CommonConstants.NullValuePlaceHolder;
Expand All @@ -43,13 +44,15 @@ public class RowBasedBlockValSet implements BlockValSet {
private final List<Object[]> _rows;
private final int _colId;
private final RoaringBitmap _nullBitmap;
private final Object _nullPlaceHolder;

public RowBasedBlockValSet(ColumnDataType columnDataType, List<Object[]> rows, int colId,
boolean nullHandlingEnabled) {
_dataType = columnDataType.toDataType();
_storedType = _dataType.getStoredType();
_rows = rows;
_colId = colId;
_nullPlaceHolder = columnDataType.getNullPlaceholder();

if (nullHandlingEnabled) {
RoaringBitmap nullBitmap;
Expand Down Expand Up @@ -400,27 +403,209 @@ public int[][] getDictionaryIdsMV() {

@Override
public int[][] getIntValuesMV() {
throw new UnsupportedOperationException();
int numRows = _rows.size();
int[][] values = new int[numRows][];
if (numRows == 0) {
return values;
}
if (_dataType == DataType.UNKNOWN) {
Arrays.fill(values, new int[0]);
return values;
}

for (int i = 0; i < numRows; i++) {
Object storedValue = _rows.get(i)[_colId];
if (storedValue instanceof int[]) {
values[i] = (int[]) storedValue;
} else if (storedValue instanceof long[]) {
long[] longArray = (long[]) storedValue;
values[i] = new int[longArray.length];
ArrayCopyUtils.copy(longArray, values[i], longArray.length);
} else if (storedValue instanceof float[]) {
float[] floatArray = (float[]) storedValue;
values[i] = new int[floatArray.length];
ArrayCopyUtils.copy(floatArray, values[i], floatArray.length);
} else if (storedValue instanceof double[]) {
double[] doubleArray = (double[]) storedValue;
values[i] = new int[doubleArray.length];
ArrayCopyUtils.copy(doubleArray, values[i], doubleArray.length);
} else if (storedValue instanceof String[]) {
String[] stringArray = (String[]) storedValue;
values[i] = new int[stringArray.length];
for (int j = 0; j < stringArray.length; j++) {
values[i][j] = Integer.parseInt(stringArray[j]);
}
} else {
throw new IllegalStateException("Unsupported data type: " + storedValue.getClass().getName());
}
}
return values;
}

@Override
public long[][] getLongValuesMV() {
throw new UnsupportedOperationException();
int numRows = _rows.size();
long[][] values = new long[numRows][];
if (numRows == 0) {
return values;
}
if (_dataType == DataType.UNKNOWN) {
Arrays.fill(values, new long[0]);
return values;
}
for (int i = 0; i < numRows; i++) {
Object storedValue = _rows.get(i)[_colId];
if (storedValue instanceof int[]) {
int[] intArray = (int[]) storedValue;
values[i] = new long[intArray.length];
ArrayCopyUtils.copy(intArray, values[i], intArray.length);
} else if (storedValue instanceof long[]) {
values[i] = (long[]) storedValue;
} else if (storedValue instanceof float[]) {
float[] floatArray = (float[]) storedValue;
values[i] = new long[floatArray.length];
ArrayCopyUtils.copy(floatArray, values[i], floatArray.length);
} else if (storedValue instanceof double[]) {
double[] doubleArray = (double[]) storedValue;
values[i] = new long[doubleArray.length];
ArrayCopyUtils.copy(doubleArray, values[i], doubleArray.length);
} else if (storedValue instanceof String[]) {
String[] stringArray = (String[]) storedValue;
values[i] = new long[stringArray.length];
for (int j = 0; j < stringArray.length; j++) {
values[i][j] = Long.parseLong(stringArray[j]);
}
} else {
throw new IllegalStateException("Unsupported data type: " + storedValue.getClass().getName());
}
}
return values;
}

@Override
public float[][] getFloatValuesMV() {
throw new UnsupportedOperationException();
int numRows = _rows.size();
float[][] values = new float[numRows][];
if (numRows == 0) {
return values;
}
if (_dataType == DataType.UNKNOWN) {
Arrays.fill(values, new float[0]);
return values;
}
for (int i = 0; i < numRows; i++) {
Object storedValue = _rows.get(i)[_colId];
if (storedValue instanceof int[]) {
int[] intArray = (int[]) storedValue;
values[i] = new float[intArray.length];
ArrayCopyUtils.copy(intArray, values[i], intArray.length);
} else if (storedValue instanceof long[]) {
long[] longArray = (long[]) storedValue;
values[i] = new float[longArray.length];
ArrayCopyUtils.copy(longArray, values[i], longArray.length);
} else if (storedValue instanceof float[]) {
values[i] = (float[]) storedValue;
} else if (storedValue instanceof double[]) {
double[] doubleArray = (double[]) storedValue;
values[i] = new float[doubleArray.length];
ArrayCopyUtils.copy(doubleArray, values[i], doubleArray.length);
} else if (storedValue instanceof String[]) {
String[] stringArray = (String[]) storedValue;
values[i] = new float[stringArray.length];
for (int j = 0; j < stringArray.length; j++) {
values[i][j] = Float.parseFloat(stringArray[j]);
}
} else {
throw new IllegalStateException("Unsupported data type: " + storedValue.getClass().getName());
}
}
return values;
}

@Override
public double[][] getDoubleValuesMV() {
throw new UnsupportedOperationException();
int numRows = _rows.size();
double[][] values = new double[numRows][];
if (numRows == 0) {
return values;
}
if (_dataType == DataType.UNKNOWN) {
Arrays.fill(values, new double[0]);
return values;
}
for (int i = 0; i < numRows; i++) {
Object storedValue = _rows.get(i)[_colId];
if (storedValue instanceof int[]) {
int[] intArray = (int[]) storedValue;
values[i] = new double[intArray.length];
ArrayCopyUtils.copy(intArray, values[i], intArray.length);
} else if (storedValue instanceof long[]) {
long[] longArray = (long[]) storedValue;
values[i] = new double[longArray.length];
ArrayCopyUtils.copy(longArray, values[i], longArray.length);
} else if (storedValue instanceof float[]) {
float[] floatArray = (float[]) storedValue;
values[i] = new double[floatArray.length];
ArrayCopyUtils.copy(floatArray, values[i], floatArray.length);
} else if (storedValue instanceof double[]) {
values[i] = (double[]) storedValue;
} else if (storedValue instanceof String[]) {
String[] stringArray = (String[]) storedValue;
values[i] = new double[stringArray.length];
for (int j = 0; j < stringArray.length; j++) {
values[i][j] = Double.parseDouble(stringArray[j]);
}
} else {
throw new IllegalStateException("Unsupported data type: " + storedValue.getClass().getName());
}
}
return values;
}

@Override
public String[][] getStringValuesMV() {
throw new UnsupportedOperationException();
int numRows = _rows.size();
String[][] values = new String[numRows][];
if (numRows == 0) {
return values;
}
if (_dataType == DataType.UNKNOWN) {
Arrays.fill(values, new String[0]);
return values;
}
for (int i = 0; i < numRows; i++) {
Object storedValue = _rows.get(i)[_colId];
if (storedValue instanceof int[]) {
int[] intArray = (int[]) storedValue;
values[i] = new String[intArray.length];
for (int j = 0; j < intArray.length; j++) {
values[i][j] = Integer.toString(intArray[j]);
}
} else if (storedValue instanceof long[]) {
long[] longArray = (long[]) storedValue;
values[i] = new String[longArray.length];
for (int j = 0; j < longArray.length; j++) {
values[i][j] = Long.toString(longArray[j]);
}
} else if (storedValue instanceof float[]) {
float[] floatArray = (float[]) storedValue;
values[i] = new String[floatArray.length];
for (int j = 0; j < floatArray.length; j++) {
values[i][j] = Float.toString(floatArray[j]);
}
} else if (storedValue instanceof double[]) {
double[] doubleArray = (double[]) storedValue;
values[i] = new String[doubleArray.length];
for (int j = 0; j < doubleArray.length; j++) {
values[i][j] = Double.toString(doubleArray[j]);
}
} else if (storedValue instanceof String[]) {
values[i] = (String[]) storedValue;
} else {
throw new IllegalStateException("Unsupported data type: " + storedValue.getClass().getName());
}
}
return values;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.apache.pinot.core.query.aggregation.function.array.ArrayAggStringFunction;
import org.apache.pinot.core.query.aggregation.function.array.ListAggDistinctFunction;
import org.apache.pinot.core.query.aggregation.function.array.ListAggFunction;
import org.apache.pinot.core.query.aggregation.function.array.SumArrayDoubleAggregationFunction;
import org.apache.pinot.core.query.aggregation.function.array.SumArrayLongAggregationFunction;
import org.apache.pinot.core.query.aggregation.function.funnel.FunnelCountAggregationFunctionFactory;
import org.apache.pinot.core.query.aggregation.function.funnel.window.FunnelCompleteCountAggregationFunction;
import org.apache.pinot.core.query.aggregation.function.funnel.window.FunnelMatchStepAggregationFunction;
Expand Down Expand Up @@ -269,6 +271,10 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
}
return new ListAggFunction(arguments.get(0), separator, nullHandlingEnabled);
}
case SUMARRAYLONG:
return new SumArrayLongAggregationFunction(arguments);
case SUMARRAYDOUBLE:
return new SumArrayDoubleAggregationFunction(arguments);
case ARRAYAGG: {
Preconditions.checkArgument(numArguments >= 2,
"ARRAY_AGG expects 2 or 3 arguments, got: %s. The function can be used as "
Expand Down
Loading