Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@
*/
package org.apache.pinot.core.common;

import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nonnull;
import org.apache.pinot.core.plan.DocIdSetPlanNode;
import org.apache.pinot.segment.spi.evaluator.TransformEvaluator;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.utils.EqualityUtils;


/**
Expand All @@ -39,12 +40,12 @@ public class DataBlockCache {

// Mark whether data have been fetched, need to be cleared in initNewBlock()
private final Set<String> _columnDictIdLoaded = new HashSet<>();
// Keyed on (column, type) so the same column fetched as different value types is tracked separately
private final Set<ColumnTypePair> _columnValueLoaded = new HashSet<>();
private final Set<String> _columnNumValuesLoaded = new HashSet<>();

// Buffer for data; value buffers are allocated once per (column, type) and reused across blocks
private final Map<String, Object> _dictIdsMap = new HashMap<>();
private final Map<ColumnTypePair, Object> _valuesMap = new HashMap<>();
private final Map<String, int[]> _numValuesMap = new HashMap<>();

private int[] _docIds;
Expand All @@ -64,10 +65,9 @@ public DataBlockCache(DataFetcher dataFetcher) {
/**
 * Initializes the cache for a new block of documents. Clears all "loaded" markers so
 * that stale values from the previous block are re-fetched on demand, while keeping the
 * already-allocated value buffers (in the buffer maps) for reuse.
 *
 * @param docIds Array of document ids for the new block
 * @param length Number of valid entries in {@code docIds}
 */
public void initNewBlock(int[] docIds, int length) {
  _docIds = docIds;
  _length = length;

  _columnDictIdLoaded.clear();
  _columnValueLoaded.clear();
  _columnNumValuesLoaded.clear();
}

Expand Down Expand Up @@ -109,11 +109,12 @@ public int[] getDictIdsForSVColumn(String column) {
* @return Array of int values
*/
public int[] getIntValuesForSVColumn(String column) {
int[] intValues = getValues(FieldSpec.DataType.INT, column);
if (markLoaded(FieldSpec.DataType.INT, column)) {
ColumnTypePair key = new ColumnTypePair(column, FieldSpec.DataType.INT);
int[] intValues = (int[]) _valuesMap.get(key);
if (_columnValueLoaded.add(key)) {
if (intValues == null) {
intValues = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL];
putValues(FieldSpec.DataType.INT, column, intValues);
_valuesMap.put(key, intValues);
}
_dataFetcher.fetchIntValues(column, _docIds, _length, intValues);
}
Expand All @@ -138,11 +139,12 @@ public void fillValues(String column, TransformEvaluator evaluator, int[] buffer
* @return Array of long values
*/
public long[] getLongValuesForSVColumn(String column) {
long[] longValues = getValues(FieldSpec.DataType.LONG, column);
if (markLoaded(FieldSpec.DataType.LONG, column)) {
ColumnTypePair key = new ColumnTypePair(column, FieldSpec.DataType.LONG);
long[] longValues = (long[]) _valuesMap.get(key);
if (_columnValueLoaded.add(key)) {
if (longValues == null) {
longValues = new long[DocIdSetPlanNode.MAX_DOC_PER_CALL];
putValues(FieldSpec.DataType.LONG, column, longValues);
_valuesMap.put(key, longValues);
}
_dataFetcher.fetchLongValues(column, _docIds, _length, longValues);
}
Expand All @@ -167,11 +169,12 @@ public void fillValues(String column, TransformEvaluator evaluator, long[] buffe
* @return Array of float values
*/
public float[] getFloatValuesForSVColumn(String column) {
float[] floatValues = getValues(FieldSpec.DataType.FLOAT, column);
if (markLoaded(FieldSpec.DataType.FLOAT, column)) {
ColumnTypePair key = new ColumnTypePair(column, FieldSpec.DataType.FLOAT);
float[] floatValues = (float[]) _valuesMap.get(key);
if (_columnValueLoaded.add(key)) {
if (floatValues == null) {
floatValues = new float[DocIdSetPlanNode.MAX_DOC_PER_CALL];
putValues(FieldSpec.DataType.FLOAT, column, floatValues);
_valuesMap.put(key, floatValues);
}
_dataFetcher.fetchFloatValues(column, _docIds, _length, floatValues);
}
Expand All @@ -196,11 +199,12 @@ public void fillValues(String column, TransformEvaluator evaluator, float[] buff
* @return Array of double values
*/
public double[] getDoubleValuesForSVColumn(String column) {
double[] doubleValues = getValues(FieldSpec.DataType.DOUBLE, column);
if (markLoaded(FieldSpec.DataType.DOUBLE, column)) {
ColumnTypePair key = new ColumnTypePair(column, FieldSpec.DataType.DOUBLE);
double[] doubleValues = (double[]) _valuesMap.get(key);
if (_columnValueLoaded.add(key)) {
if (doubleValues == null) {
doubleValues = new double[DocIdSetPlanNode.MAX_DOC_PER_CALL];
putValues(FieldSpec.DataType.DOUBLE, column, doubleValues);
_valuesMap.put(key, doubleValues);
}
_dataFetcher.fetchDoubleValues(column, _docIds, _length, doubleValues);
}
Expand All @@ -225,11 +229,12 @@ public void fillValues(String column, TransformEvaluator evaluator, double[] buf
* @return Array of string values
*/
public String[] getStringValuesForSVColumn(String column) {
String[] stringValues = getValues(FieldSpec.DataType.STRING, column);
if (markLoaded(FieldSpec.DataType.STRING, column)) {
ColumnTypePair key = new ColumnTypePair(column, FieldSpec.DataType.STRING);
String[] stringValues = (String[]) _valuesMap.get(key);
if (_columnValueLoaded.add(key)) {
if (stringValues == null) {
stringValues = new String[DocIdSetPlanNode.MAX_DOC_PER_CALL];
putValues(FieldSpec.DataType.STRING, column, stringValues);
_valuesMap.put(key, stringValues);
}
_dataFetcher.fetchStringValues(column, _docIds, _length, stringValues);
}
Expand All @@ -254,11 +259,13 @@ public void fillValues(String column, TransformEvaluator evaluator, String[] buf
* @return byte[] for the column
*/
public byte[][] getBytesValuesForSVColumn(String column) {
byte[][] bytesValues = getValues(FieldSpec.DataType.BYTES, column);
if (markLoaded(FieldSpec.DataType.BYTES, column)) {
ColumnTypePair key = new ColumnTypePair(column, FieldSpec.DataType.BYTES);
byte[][] bytesValues = (byte[][]) _valuesMap.get(key);

if (_columnValueLoaded.add(key)) {
if (bytesValues == null) {
bytesValues = new byte[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
putValues(FieldSpec.DataType.BYTES, column, bytesValues);
_valuesMap.put(key, bytesValues);
}
_dataFetcher.fetchBytesValues(column, _docIds, _length, bytesValues);
}
Expand Down Expand Up @@ -294,11 +301,12 @@ public int[][] getDictIdsForMVColumn(String column) {
* @return Array of int values
*/
public int[][] getIntValuesForMVColumn(String column) {
int[][] intValues = getValues(FieldSpec.DataType.INT, column);
if (markLoaded(FieldSpec.DataType.INT, column)) {
ColumnTypePair key = new ColumnTypePair(column, FieldSpec.DataType.INT);
int[][] intValues = (int[][]) _valuesMap.get(key);
if (_columnValueLoaded.add(key)) {
if (intValues == null) {
intValues = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
putValues(FieldSpec.DataType.INT, column, intValues);
_valuesMap.put(key, intValues);
}
_dataFetcher.fetchIntValues(column, _docIds, _length, intValues);
}
Expand All @@ -323,11 +331,12 @@ public void fillValues(String column, TransformEvaluator evaluator, int[][] buff
* @return Array of long values
*/
public long[][] getLongValuesForMVColumn(String column) {
long[][] longValues = getValues(FieldSpec.DataType.LONG, column);
if (markLoaded(FieldSpec.DataType.LONG, column)) {
ColumnTypePair key = new ColumnTypePair(column, FieldSpec.DataType.LONG);
long[][] longValues = (long[][]) _valuesMap.get(key);
if (_columnValueLoaded.add(key)) {
if (longValues == null) {
longValues = new long[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
putValues(FieldSpec.DataType.LONG, column, longValues);
_valuesMap.put(key, longValues);
}
_dataFetcher.fetchLongValues(column, _docIds, _length, longValues);
}
Expand All @@ -352,11 +361,12 @@ public void fillValues(String column, TransformEvaluator evaluator, long[][] buf
* @return Array of float values
*/
public float[][] getFloatValuesForMVColumn(String column) {
float[][] floatValues = getValues(FieldSpec.DataType.FLOAT, column);
if (markLoaded(FieldSpec.DataType.FLOAT, column)) {
ColumnTypePair key = new ColumnTypePair(column, FieldSpec.DataType.FLOAT);
float[][] floatValues = (float[][]) _valuesMap.get(key);
if (_columnValueLoaded.add(key)) {
if (floatValues == null) {
floatValues = new float[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
putValues(FieldSpec.DataType.FLOAT, column, floatValues);
_valuesMap.put(key, floatValues);
}
_dataFetcher.fetchFloatValues(column, _docIds, _length, floatValues);
}
Expand All @@ -381,11 +391,12 @@ public void fillValues(String column, TransformEvaluator evaluator, float[][] bu
* @return Array of double values
*/
public double[][] getDoubleValuesForMVColumn(String column) {
double[][] doubleValues = getValues(FieldSpec.DataType.DOUBLE, column);
if (markLoaded(FieldSpec.DataType.DOUBLE, column)) {
ColumnTypePair key = new ColumnTypePair(column, FieldSpec.DataType.DOUBLE);
double[][] doubleValues = (double[][]) _valuesMap.get(key);
if (_columnValueLoaded.add(key)) {
if (doubleValues == null) {
doubleValues = new double[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
putValues(FieldSpec.DataType.DOUBLE, column, doubleValues);
_valuesMap.put(key, doubleValues);
}
_dataFetcher.fetchDoubleValues(column, _docIds, _length, doubleValues);
}
Expand All @@ -410,11 +421,12 @@ public void fillValues(String column, TransformEvaluator evaluator, double[][] b
* @return Array of string values
*/
public String[][] getStringValuesForMVColumn(String column) {
String[][] stringValues = getValues(FieldSpec.DataType.STRING, column);
if (markLoaded(FieldSpec.DataType.STRING, column)) {
ColumnTypePair key = new ColumnTypePair(column, FieldSpec.DataType.STRING);
String[][] stringValues = (String[][]) _valuesMap.get(key);
if (_columnValueLoaded.add(key)) {
if (stringValues == null) {
stringValues = new String[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
putValues(FieldSpec.DataType.STRING, column, stringValues);
_valuesMap.put(key, stringValues);
}
_dataFetcher.fetchStringValues(column, _docIds, _length, stringValues);
}
Expand Down Expand Up @@ -450,16 +462,28 @@ public int[] getNumValuesForMVColumn(String column) {
return numValues;
}

private boolean markLoaded(FieldSpec.DataType dataType, String column) {
return _columnValueLoaded.computeIfAbsent(dataType, k -> new HashSet<>()).add(column);
}
/**
* Helper class to store pair of column name and data type.
*/
private static class ColumnTypePair {
final String _column;
final FieldSpec.DataType _dataType;

@SuppressWarnings("unchecked")
private <T> T getValues(FieldSpec.DataType dataType, String column) {
return (T) _valuesMap.computeIfAbsent(dataType, k -> new HashMap<>()).get(column);
}
ColumnTypePair(@Nonnull String column, @Nonnull FieldSpec.DataType dataType) {
_column = column;
_dataType = dataType;
}

private void putValues(FieldSpec.DataType dataType, String column, Object values) {
_valuesMap.get(dataType).put(column, values);
@Override
public int hashCode() {
return EqualityUtils.hashCodeOf(_column.hashCode(), _dataType.hashCode());
}

@SuppressWarnings("EqualsWhichDoesntCheckParameterClass")
@Override
public boolean equals(Object obj) {
ColumnTypePair that = (ColumnTypePair) obj;
return _column.equals(that._column) && _dataType == that._dataType;
}
}
}