Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
import it.unimi.dsi.fastutil.floats.FloatArrayList;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
Expand Down Expand Up @@ -52,7 +55,9 @@ public class DataSchema {
private final ColumnDataType[] _columnDataTypes;
private ColumnDataType[] _storedColumnDataTypes;

/** Used by both Broker and Server to generate results for EXPLAIN PLAN queries. */
/**
* Used by both Broker and Server to generate results for EXPLAIN PLAN queries.
*/
public static final DataSchema EXPLAIN_RESULT_SCHEMA =
new DataSchema(new String[]{"Operator", "Operator_Id", "Parent_Id"}, new ColumnDataType[]{
ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.INT
Expand Down Expand Up @@ -425,19 +430,19 @@ public Serializable convert(Object value) {
case BYTES:
return ((ByteArray) value).getBytes();
case INT_ARRAY:
return (int[]) value;
return toIntArray(value);
case LONG_ARRAY:
return toLongArray(value);
case FLOAT_ARRAY:
return (float[]) value;
return toFloatArray(value);
case DOUBLE_ARRAY:
return toDoubleArray(value);
case STRING_ARRAY:
return (String[]) value;
return toStringArray(value);
case BOOLEAN_ARRAY:
return toBooleanArray((int[]) value);
return toBooleanArray(toIntArray(value));
case TIMESTAMP_ARRAY:
return toTimestampArray((long[]) value);
return toTimestampArray(toLongArray(value));
case BYTES_ARRAY:
return (byte[][]) value;
case UNKNOWN: // fall through
Expand Down Expand Up @@ -513,11 +518,31 @@ public Serializable convertAndFormat(Object value) {
}
}

private static int[] toIntArray(Object value) {
if (value instanceof int[]) {
return (int[]) value;
} else if (value instanceof IntArrayList) {
// For ArrayAggregationFunction
return ((IntArrayList) value).elements();
}
throw new IllegalStateException(String.format("Cannot convert: '%s' to int[]", value));
}

private static float[] toFloatArray(Object value) {
if (value instanceof float[]) {
return (float[]) value;
} else if (value instanceof FloatArrayList) {
// For ArrayAggregationFunction
return ((FloatArrayList) value).elements();
}
throw new IllegalStateException(String.format("Cannot convert: '%s' to float[]", value));
}

private static double[] toDoubleArray(Object value) {
if (value instanceof double[]) {
return (double[]) value;
} else if (value instanceof DoubleArrayList) {
// For HistogramAggregationFunction
// For HistogramAggregationFunction and ArrayAggregationFunction
return ((DoubleArrayList) value).elements();
} else if (value instanceof int[]) {
int[] intValues = (int[]) value;
Expand Down Expand Up @@ -550,7 +575,7 @@ private static long[] toLongArray(Object value) {
if (value instanceof long[]) {
return (long[]) value;
} else if (value instanceof LongArrayList) {
// For FunnelCountAggregationFunction
// For FunnelCountAggregationFunction and ArrayAggregationFunction
return ((LongArrayList) value).elements();
} else {
int[] intValues = (int[]) value;
Expand All @@ -563,6 +588,16 @@ private static long[] toLongArray(Object value) {
}
}

private static String[] toStringArray(Object value) {
if (value instanceof String[]) {
return (String[]) value;
} else if (value instanceof ObjectArrayList) {
// For ArrayAggregationFunction
return ((ObjectArrayList<String>) value).toArray(new String[0]);
}
throw new IllegalStateException(String.format("Cannot convert: '%s' to String[]", value));
}

private static boolean[] toBooleanArray(int[] intArray) {
int length = intArray.length;
boolean[] booleanArray = new boolean[length];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,23 @@
import it.unimi.dsi.fastutil.doubles.DoubleSet;
import it.unimi.dsi.fastutil.floats.Float2LongMap;
import it.unimi.dsi.fastutil.floats.Float2LongOpenHashMap;
import it.unimi.dsi.fastutil.floats.FloatArrayList;
import it.unimi.dsi.fastutil.floats.FloatIterator;
import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
import it.unimi.dsi.fastutil.floats.FloatSet;
import it.unimi.dsi.fastutil.ints.Int2LongMap;
import it.unimi.dsi.fastutil.ints.Int2LongOpenHashMap;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntIterator;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import it.unimi.dsi.fastutil.longs.Long2LongMap;
import it.unimi.dsi.fastutil.longs.Long2LongOpenHashMap;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import it.unimi.dsi.fastutil.longs.LongIterator;
import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
import it.unimi.dsi.fastutil.longs.LongSet;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
import it.unimi.dsi.fastutil.objects.ObjectSet;
import java.io.IOException;
Expand Down Expand Up @@ -144,7 +148,11 @@ public enum ObjectType {
FrequentStringsSketch(38),
FrequentLongsSketch(39),
HyperLogLogPlus(40),
CompressedProbabilisticCounting(41);
CompressedProbabilisticCounting(41),
IntArrayList(42),
LongArrayList(43),
FloatArrayList(44),
StringArrayList(45);

private final int _value;

Expand All @@ -165,8 +173,25 @@ public static ObjectType getObjectType(Object value) {
return ObjectType.Double;
} else if (value instanceof BigDecimal) {
return ObjectType.BigDecimal;
} else if (value instanceof IntArrayList) {
return ObjectType.IntArrayList;
} else if (value instanceof LongArrayList) {
return ObjectType.LongArrayList;
} else if (value instanceof FloatArrayList) {
return ObjectType.FloatArrayList;
} else if (value instanceof DoubleArrayList) {
return ObjectType.DoubleArrayList;
} else if (value instanceof ObjectArrayList) {
ObjectArrayList objectArrayList = (ObjectArrayList) value;
if (!objectArrayList.isEmpty()) {
Object next = objectArrayList.get(0);
if (next instanceof String) {
return ObjectType.StringArrayList;
}
throw new IllegalArgumentException(
"Unsupported type of value: " + next.getClass().getSimpleName());
}
return ObjectType.StringArrayList;
} else if (value instanceof AvgPair) {
return ObjectType.AvgPair;
} else if (value instanceof MinMaxRangePair) {
Expand Down Expand Up @@ -329,6 +354,99 @@ public Double deserialize(ByteBuffer byteBuffer) {
}
};

public static final ObjectSerDe<IntArrayList> INT_ARRAY_LIST_SER_DE = new ObjectSerDe<IntArrayList>() {

@Override
public byte[] serialize(IntArrayList intArrayList) {
int size = intArrayList.size();
byte[] bytes = new byte[Integer.BYTES + size * Integer.BYTES];
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
byteBuffer.putInt(size);
int[] values = intArrayList.elements();
for (int i = 0; i < size; i++) {
byteBuffer.putInt(values[i]);
}
return bytes;
}

@Override
public IntArrayList deserialize(byte[] bytes) {
return deserialize(ByteBuffer.wrap(bytes));
}

@Override
public IntArrayList deserialize(ByteBuffer byteBuffer) {
int numValues = byteBuffer.getInt();
IntArrayList intArrayList = new IntArrayList(numValues);
for (int i = 0; i < numValues; i++) {
intArrayList.add(byteBuffer.getInt());
}
return intArrayList;
}
};

public static final ObjectSerDe<LongArrayList> LONG_ARRAY_LIST_SER_DE = new ObjectSerDe<LongArrayList>() {

@Override
public byte[] serialize(LongArrayList longArrayList) {
int size = longArrayList.size();
byte[] bytes = new byte[Integer.BYTES + size * Long.BYTES];
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
byteBuffer.putInt(size);
long[] values = longArrayList.elements();
for (int i = 0; i < size; i++) {
byteBuffer.putLong(values[i]);
}
return bytes;
}

@Override
public LongArrayList deserialize(byte[] bytes) {
return deserialize(ByteBuffer.wrap(bytes));
}

@Override
public LongArrayList deserialize(ByteBuffer byteBuffer) {
int numValues = byteBuffer.getInt();
LongArrayList longArrayList = new LongArrayList(numValues);
for (int i = 0; i < numValues; i++) {
longArrayList.add(byteBuffer.getLong());
}
return longArrayList;
}
};

public static final ObjectSerDe<FloatArrayList> FLOAT_ARRAY_LIST_SER_DE = new ObjectSerDe<FloatArrayList>() {

@Override
public byte[] serialize(FloatArrayList floatArrayList) {
int size = floatArrayList.size();
byte[] bytes = new byte[Integer.BYTES + size * Float.BYTES];
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
byteBuffer.putInt(size);
float[] values = floatArrayList.elements();
for (int i = 0; i < size; i++) {
byteBuffer.putFloat(values[i]);
}
return bytes;
}

@Override
public FloatArrayList deserialize(byte[] bytes) {
return deserialize(ByteBuffer.wrap(bytes));
}

@Override
public FloatArrayList deserialize(ByteBuffer byteBuffer) {
int numValues = byteBuffer.getInt();
FloatArrayList floatArrayList = new FloatArrayList(numValues);
for (int i = 0; i < numValues; i++) {
floatArrayList.add(byteBuffer.getFloat());
}
return floatArrayList;
}
};

public static final ObjectSerDe<DoubleArrayList> DOUBLE_ARRAY_LIST_SER_DE = new ObjectSerDe<DoubleArrayList>() {

@Override
Expand Down Expand Up @@ -360,6 +478,50 @@ public DoubleArrayList deserialize(ByteBuffer byteBuffer) {
}
};

public static final ObjectSerDe<ObjectArrayList> STRING_ARRAY_LIST_SER_DE =
new ObjectSerDe<ObjectArrayList>() {
@Override
public byte[] serialize(ObjectArrayList stringArrayList) {
int size = stringArrayList.size();
// Besides the value bytes, we store: size, length for each value
long bufferSize = (1 + (long) size) * Integer.BYTES;
byte[][] valueBytesArray = new byte[size][];
for (int index = 0; index < size; index++) {
Object value = stringArrayList.get(index);
byte[] valueBytes = value.toString().getBytes(UTF_8);
bufferSize += valueBytes.length;
valueBytesArray[index] = valueBytes;
}
Preconditions.checkState(bufferSize <= Integer.MAX_VALUE, "Buffer size exceeds 2GB");
byte[] bytes = new byte[(int) bufferSize];
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
byteBuffer.putInt(size);
for (byte[] valueBytes : valueBytesArray) {
byteBuffer.putInt(valueBytes.length);
byteBuffer.put(valueBytes);
}
return bytes;
}

@Override
public ObjectArrayList deserialize(byte[] bytes) {
return deserialize(ByteBuffer.wrap(bytes));
}

@Override
public ObjectArrayList deserialize(ByteBuffer byteBuffer) {
int size = byteBuffer.getInt();
ObjectArrayList stringArrayList = new ObjectArrayList(size);
for (int i = 0; i < size; i++) {
int length = byteBuffer.getInt();
byte[] valueBytes = new byte[length];
byteBuffer.get(valueBytes);
stringArrayList.add(new String(valueBytes, UTF_8));
}
return stringArrayList;
}
};

public static final ObjectSerDe<AvgPair> AVG_PAIR_SER_DE = new ObjectSerDe<AvgPair>() {

@Override
Expand Down Expand Up @@ -1435,6 +1597,10 @@ public LongsSketch deserialize(ByteBuffer byteBuffer) {
FREQUENT_LONGS_SKETCH_SER_DE,
HYPER_LOG_LOG_PLUS_SER_DE,
DATA_SKETCH_CPC_SER_DE,
INT_ARRAY_LIST_SER_DE,
LONG_ARRAY_LIST_SER_DE,
FLOAT_ARRAY_LIST_SER_DE,
STRING_ARRAY_LIST_SER_DE,
};
//@formatter:on

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
package org.apache.pinot.core.operator.blocks.results;

import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
import it.unimi.dsi.fastutil.floats.FloatArrayList;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.Collections;
Expand Down Expand Up @@ -191,12 +194,21 @@ private void setFinalResult(DataTableBuilder dataTableBuilder, ColumnDataType[]
case BYTES:
dataTableBuilder.setColumn(index, (ByteArray) result);
break;
case DOUBLE_ARRAY:
dataTableBuilder.setColumn(index, ((DoubleArrayList) result).elements());
case INT_ARRAY:
dataTableBuilder.setColumn(index, ((IntArrayList) result).elements());
break;
case LONG_ARRAY:
dataTableBuilder.setColumn(index, ((LongArrayList) result).elements());
break;
case FLOAT_ARRAY:
dataTableBuilder.setColumn(index, ((FloatArrayList) result).elements());
break;
case DOUBLE_ARRAY:
dataTableBuilder.setColumn(index, ((DoubleArrayList) result).elements());
break;
case STRING_ARRAY:
dataTableBuilder.setColumn(index, ((ObjectArrayList<String>) result).toArray(new String[0]));
break;
default:
throw new IllegalStateException("Illegal column data type in final result: " + columnDataType);
}
Expand Down
Loading