Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@
*/
package org.apache.pinot.core.common;

import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nonnull;
import org.apache.pinot.core.plan.DocIdSetPlanNode;
import org.apache.pinot.segment.spi.evaluator.TransformEvaluator;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.utils.EqualityUtils;


/**
Expand All @@ -40,12 +39,12 @@ public class DataBlockCache {

// Mark whether data have been fetched, need to be cleared in initNewBlock()
private final Set<String> _columnDictIdLoaded = new HashSet<>();
private final Set<ColumnTypePair> _columnValueLoaded = new HashSet<>();
private final Map<FieldSpec.DataType, Set<String>> _columnValueLoaded = new EnumMap<>(FieldSpec.DataType.class);
private final Set<String> _columnNumValuesLoaded = new HashSet<>();

// Buffer for data
private final Map<String, Object> _dictIdsMap = new HashMap<>();
private final Map<ColumnTypePair, Object> _valuesMap = new HashMap<>();
private final Map<FieldSpec.DataType, Map<String, Object>> _valuesMap = new HashMap<>();
private final Map<String, int[]> _numValuesMap = new HashMap<>();

private int[] _docIds;
Expand All @@ -65,9 +64,10 @@ public DataBlockCache(DataFetcher dataFetcher) {
public void initNewBlock(int[] docIds, int length) {
  _docIds = docIds;
  _length = length;
  // Reset the per-block "already fetched" markers so stale data from the
  // previous block is never served for the new doc-id set.
  _columnDictIdLoaded.clear();
  // The buffers themselves are reused across blocks; only the loaded-column
  // markers are cleared, one set per data type.
  for (Set<String> loadedColumns : _columnValueLoaded.values()) {
    loadedColumns.clear();
  }
  _columnNumValuesLoaded.clear();
}

Expand Down Expand Up @@ -109,12 +109,11 @@ public int[] getDictIdsForSVColumn(String column) {
* @return Array of int values
*/
public int[] getIntValuesForSVColumn(String column) {
ColumnTypePair key = new ColumnTypePair(column, FieldSpec.DataType.INT);
Copy link
Contributor

@siddharthteotia siddharthteotia Feb 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the potential problem being fixed here is the per call creation of ColumnTypePair object and that is leading to some heap/perf overhead ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly, about 8MB of these were allocated in 1s in a benchmark which only allocated ~25MB/s. It's one of the main sources of allocation.

int[] intValues = (int[]) _valuesMap.get(key);
Copy link
Contributor

@siddharthteotia siddharthteotia Feb 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously there was a single level of indirection to get the corresponding values[] for a given ColumnTypePair. Now it's a Map<Type, Map<String, Object>> plus another function call getValues().

So while we avoid the creation of the ColumnTypePair object, is it possible that the new code will add some perf overhead that negates any benefit of this PR?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has all been measured, let me put together benchmark results (you can see the combined effect of the set of changes in #8134)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that an EnumMap lookup is an array access by ordinal, and the array is very small, so the cost of indirection here is very low.

if (_columnValueLoaded.add(key)) {
int[] intValues = getValues(FieldSpec.DataType.INT, column);
if (markLoaded(FieldSpec.DataType.INT, column)) {
if (intValues == null) {
intValues = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL];
_valuesMap.put(key, intValues);
putValues(FieldSpec.DataType.INT, column, intValues);
}
_dataFetcher.fetchIntValues(column, _docIds, _length, intValues);
}
Expand All @@ -139,12 +138,11 @@ public void fillValues(String column, TransformEvaluator evaluator, int[] buffer
* @return Array of long values
*/
public long[] getLongValuesForSVColumn(String column) {
ColumnTypePair key = new ColumnTypePair(column, FieldSpec.DataType.LONG);
long[] longValues = (long[]) _valuesMap.get(key);
if (_columnValueLoaded.add(key)) {
long[] longValues = getValues(FieldSpec.DataType.LONG, column);
if (markLoaded(FieldSpec.DataType.LONG, column)) {
if (longValues == null) {
longValues = new long[DocIdSetPlanNode.MAX_DOC_PER_CALL];
_valuesMap.put(key, longValues);
putValues(FieldSpec.DataType.LONG, column, longValues);
}
_dataFetcher.fetchLongValues(column, _docIds, _length, longValues);
}
Expand All @@ -169,12 +167,11 @@ public void fillValues(String column, TransformEvaluator evaluator, long[] buffe
* @return Array of float values
*/
public float[] getFloatValuesForSVColumn(String column) {
ColumnTypePair key = new ColumnTypePair(column, FieldSpec.DataType.FLOAT);
float[] floatValues = (float[]) _valuesMap.get(key);
if (_columnValueLoaded.add(key)) {
float[] floatValues = getValues(FieldSpec.DataType.FLOAT, column);
if (markLoaded(FieldSpec.DataType.FLOAT, column)) {
if (floatValues == null) {
floatValues = new float[DocIdSetPlanNode.MAX_DOC_PER_CALL];
_valuesMap.put(key, floatValues);
putValues(FieldSpec.DataType.FLOAT, column, floatValues);
}
_dataFetcher.fetchFloatValues(column, _docIds, _length, floatValues);
}
Expand All @@ -199,12 +196,11 @@ public void fillValues(String column, TransformEvaluator evaluator, float[] buff
* @return Array of double values
*/
public double[] getDoubleValuesForSVColumn(String column) {
ColumnTypePair key = new ColumnTypePair(column, FieldSpec.DataType.DOUBLE);
double[] doubleValues = (double[]) _valuesMap.get(key);
if (_columnValueLoaded.add(key)) {
double[] doubleValues = getValues(FieldSpec.DataType.DOUBLE, column);
if (markLoaded(FieldSpec.DataType.DOUBLE, column)) {
if (doubleValues == null) {
doubleValues = new double[DocIdSetPlanNode.MAX_DOC_PER_CALL];
_valuesMap.put(key, doubleValues);
putValues(FieldSpec.DataType.DOUBLE, column, doubleValues);
}
_dataFetcher.fetchDoubleValues(column, _docIds, _length, doubleValues);
}
Expand All @@ -229,12 +225,11 @@ public void fillValues(String column, TransformEvaluator evaluator, double[] buf
* @return Array of string values
*/
public String[] getStringValuesForSVColumn(String column) {
ColumnTypePair key = new ColumnTypePair(column, FieldSpec.DataType.STRING);
String[] stringValues = (String[]) _valuesMap.get(key);
if (_columnValueLoaded.add(key)) {
String[] stringValues = getValues(FieldSpec.DataType.STRING, column);
if (markLoaded(FieldSpec.DataType.STRING, column)) {
if (stringValues == null) {
stringValues = new String[DocIdSetPlanNode.MAX_DOC_PER_CALL];
_valuesMap.put(key, stringValues);
putValues(FieldSpec.DataType.STRING, column, stringValues);
}
_dataFetcher.fetchStringValues(column, _docIds, _length, stringValues);
}
Expand All @@ -259,13 +254,11 @@ public void fillValues(String column, TransformEvaluator evaluator, String[] buf
* @return byte[] for the column
*/
public byte[][] getBytesValuesForSVColumn(String column) {
ColumnTypePair key = new ColumnTypePair(column, FieldSpec.DataType.BYTES);
byte[][] bytesValues = (byte[][]) _valuesMap.get(key);

if (_columnValueLoaded.add(key)) {
byte[][] bytesValues = getValues(FieldSpec.DataType.BYTES, column);
if (markLoaded(FieldSpec.DataType.BYTES, column)) {
if (bytesValues == null) {
bytesValues = new byte[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
_valuesMap.put(key, bytesValues);
putValues(FieldSpec.DataType.BYTES, column, bytesValues);
}
_dataFetcher.fetchBytesValues(column, _docIds, _length, bytesValues);
}
Expand Down Expand Up @@ -301,12 +294,11 @@ public int[][] getDictIdsForMVColumn(String column) {
* @return Array of int values
*/
public int[][] getIntValuesForMVColumn(String column) {
ColumnTypePair key = new ColumnTypePair(column, FieldSpec.DataType.INT);
int[][] intValues = (int[][]) _valuesMap.get(key);
if (_columnValueLoaded.add(key)) {
int[][] intValues = getValues(FieldSpec.DataType.INT, column);
if (markLoaded(FieldSpec.DataType.INT, column)) {
if (intValues == null) {
intValues = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
_valuesMap.put(key, intValues);
putValues(FieldSpec.DataType.INT, column, intValues);
}
_dataFetcher.fetchIntValues(column, _docIds, _length, intValues);
}
Expand All @@ -331,12 +323,11 @@ public void fillValues(String column, TransformEvaluator evaluator, int[][] buff
* @return Array of long values
*/
public long[][] getLongValuesForMVColumn(String column) {
ColumnTypePair key = new ColumnTypePair(column, FieldSpec.DataType.LONG);
long[][] longValues = (long[][]) _valuesMap.get(key);
if (_columnValueLoaded.add(key)) {
long[][] longValues = getValues(FieldSpec.DataType.LONG, column);
if (markLoaded(FieldSpec.DataType.LONG, column)) {
if (longValues == null) {
longValues = new long[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
_valuesMap.put(key, longValues);
putValues(FieldSpec.DataType.LONG, column, longValues);
}
_dataFetcher.fetchLongValues(column, _docIds, _length, longValues);
}
Expand All @@ -361,12 +352,11 @@ public void fillValues(String column, TransformEvaluator evaluator, long[][] buf
* @return Array of float values
*/
public float[][] getFloatValuesForMVColumn(String column) {
ColumnTypePair key = new ColumnTypePair(column, FieldSpec.DataType.FLOAT);
float[][] floatValues = (float[][]) _valuesMap.get(key);
if (_columnValueLoaded.add(key)) {
float[][] floatValues = getValues(FieldSpec.DataType.FLOAT, column);
if (markLoaded(FieldSpec.DataType.FLOAT, column)) {
if (floatValues == null) {
floatValues = new float[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
_valuesMap.put(key, floatValues);
putValues(FieldSpec.DataType.FLOAT, column, floatValues);
}
_dataFetcher.fetchFloatValues(column, _docIds, _length, floatValues);
}
Expand All @@ -391,12 +381,11 @@ public void fillValues(String column, TransformEvaluator evaluator, float[][] bu
* @return Array of double values
*/
public double[][] getDoubleValuesForMVColumn(String column) {
ColumnTypePair key = new ColumnTypePair(column, FieldSpec.DataType.DOUBLE);
double[][] doubleValues = (double[][]) _valuesMap.get(key);
if (_columnValueLoaded.add(key)) {
double[][] doubleValues = getValues(FieldSpec.DataType.DOUBLE, column);
if (markLoaded(FieldSpec.DataType.DOUBLE, column)) {
if (doubleValues == null) {
doubleValues = new double[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
_valuesMap.put(key, doubleValues);
putValues(FieldSpec.DataType.DOUBLE, column, doubleValues);
}
_dataFetcher.fetchDoubleValues(column, _docIds, _length, doubleValues);
}
Expand All @@ -421,12 +410,11 @@ public void fillValues(String column, TransformEvaluator evaluator, double[][] b
* @return Array of string values
*/
public String[][] getStringValuesForMVColumn(String column) {
ColumnTypePair key = new ColumnTypePair(column, FieldSpec.DataType.STRING);
String[][] stringValues = (String[][]) _valuesMap.get(key);
if (_columnValueLoaded.add(key)) {
String[][] stringValues = getValues(FieldSpec.DataType.STRING, column);
if (markLoaded(FieldSpec.DataType.STRING, column)) {
if (stringValues == null) {
stringValues = new String[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
_valuesMap.put(key, stringValues);
putValues(FieldSpec.DataType.STRING, column, stringValues);
}
_dataFetcher.fetchStringValues(column, _docIds, _length, stringValues);
}
Expand Down Expand Up @@ -462,28 +450,16 @@ public int[] getNumValuesForMVColumn(String column) {
return numValues;
}

/**
* Helper class to store pair of column name and data type.
*/
private static class ColumnTypePair {
final String _column;
final FieldSpec.DataType _dataType;

ColumnTypePair(@Nonnull String column, @Nonnull FieldSpec.DataType dataType) {
_column = column;
_dataType = dataType;
}
/**
 * Marks the given column as loaded for the given data type.
 *
 * @return {@code true} if the column was not previously marked as loaded for this
 *         data type (i.e. the caller should fetch the values), {@code false} otherwise
 */
private boolean markLoaded(FieldSpec.DataType dataType, String column) {
  Set<String> loadedColumns = _columnValueLoaded.computeIfAbsent(dataType, type -> new HashSet<>());
  return loadedColumns.add(column);
}

@Override
public int hashCode() {
return EqualityUtils.hashCodeOf(_column.hashCode(), _dataType.hashCode());
}
/**
 * Returns the cached value buffer for the given (data type, column) pair, or
 * {@code null} if no buffer has been stored yet. Creates the per-type inner map
 * on first access for a data type.
 */
// Safe: each (dataType, column) slot only ever holds the array type the matching
// caller stored via putValues, so the cast to T cannot fail at the call sites.
@SuppressWarnings("unchecked")
private <T> T getValues(FieldSpec.DataType dataType, String column) {
  Map<String, Object> valuesForType = _valuesMap.computeIfAbsent(dataType, type -> new HashMap<>());
  return (T) valuesForType.get(column);
}

@SuppressWarnings("EqualsWhichDoesntCheckParameterClass")
@Override
public boolean equals(Object obj) {
ColumnTypePair that = (ColumnTypePair) obj;
return _column.equals(that._column) && _dataType == that._dataType;
}
/**
 * Stores the value buffer for the given (data type, column) pair so subsequent
 * calls for the same block can reuse it.
 */
private void putValues(FieldSpec.DataType dataType, String column, Object values) {
  // Use computeIfAbsent rather than get(): get() would throw an NPE if this helper
  // were ever called before getValues(...) created the per-type inner map. The
  // extra cost is negligible and removes the hidden call-order dependency.
  _valuesMap.computeIfAbsent(dataType, k -> new HashMap<>()).put(column, values);
}
}