Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge common APIs for Dictionary #6176

Merged
merged 2 commits into from
Oct 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import org.apache.pinot.core.realtime.impl.RealtimeSegmentConfig;
import org.apache.pinot.core.realtime.impl.RealtimeSegmentStatsHistory;
import org.apache.pinot.core.realtime.impl.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.core.realtime.impl.dictionary.BaseMutableDictionary;
import org.apache.pinot.core.segment.index.readers.MutableDictionary;
import org.apache.pinot.core.realtime.impl.dictionary.BaseOffHeapMutableDictionary;
import org.apache.pinot.core.realtime.impl.dictionary.MutableDictionaryFactory;
import org.apache.pinot.core.realtime.impl.forward.FixedByteMVMutableForwardIndex;
Expand Down Expand Up @@ -247,7 +247,7 @@ public long getLatestIngestionTimestamp() {
DataType dataType = fieldSpec.getDataType();
boolean isFixedWidthColumn = dataType.isFixedWidth();
MutableForwardIndex forwardIndex;
BaseMutableDictionary dictionary;
MutableDictionary dictionary;
if (isNoDictionaryColumn(noDictionaryColumns, invertedIndexColumns, fieldSpec, column)) {
// No dictionary column (always single-valued)
assert fieldSpec.isSingleValueField();
Expand Down Expand Up @@ -492,7 +492,7 @@ private void updateDictionary(GenericRow row) {
String column = entry.getKey();
IndexContainer indexContainer = entry.getValue();
Object value = row.getValue(column);
BaseMutableDictionary dictionary = indexContainer._dictionary;
MutableDictionary dictionary = indexContainer._dictionary;
if (dictionary != null) {
if (indexContainer._fieldSpec.isSingleValueField()) {
indexContainer._dictId = dictionary.index(value);
Expand Down Expand Up @@ -744,7 +744,7 @@ public GenericRow getRecord(int docId, GenericRow reuse) {
* Helper method to read the value for the given document id.
*/
private static Object getValue(int docId, MutableForwardIndex forwardIndex,
@Nullable BaseMutableDictionary dictionary, int maxNumMultiValues) {
@Nullable MutableDictionary dictionary, int maxNumMultiValues) {
if (dictionary != null) {
// Dictionary based
if (forwardIndex.isSingleValue()) {
Expand Down Expand Up @@ -856,7 +856,7 @@ public void destroy() {
*/
public int[] getSortedDocIdIterationOrderWithSortedColumn(String column) {
IndexContainer indexContainer = _indexContainerMap.get(column);
BaseMutableDictionary dictionary = indexContainer._dictionary;
MutableDictionary dictionary = indexContainer._dictionary;

// Sort all values in the dictionary
int numValues = dictionary.length();
Expand Down Expand Up @@ -1032,7 +1032,7 @@ private class IndexContainer implements Closeable {
final Set<Integer> _partitions;
final NumValuesInfo _numValuesInfo;
final MutableForwardIndex _forwardIndex;
final BaseMutableDictionary _dictionary;
final MutableDictionary _dictionary;
final RealtimeInvertedIndexReader _invertedIndex;
final InvertedIndexReader _rangeIndex;
final RealtimeLuceneTextIndexReader _textIndex;
Expand All @@ -1048,7 +1048,7 @@ private class IndexContainer implements Closeable {

IndexContainer(FieldSpec fieldSpec, @Nullable PartitionFunction partitionFunction,
@Nullable Set<Integer> partitions, NumValuesInfo numValuesInfo, MutableForwardIndex forwardIndex,
@Nullable BaseMutableDictionary dictionary, @Nullable RealtimeInvertedIndexReader invertedIndex,
@Nullable MutableDictionary dictionary, @Nullable RealtimeInvertedIndexReader invertedIndex,
@Nullable InvertedIndexReader rangeIndex, @Nullable RealtimeLuceneTextIndexReader textIndex,
@Nullable BloomFilterReader bloomFilter, @Nullable MutableNullValueVector nullValueVector) {
_fieldSpec = fieldSpec;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.core.common.DataSource;
import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
import org.apache.pinot.core.operator.filter.predicate.RangePredicateEvaluatorFactory.OfflineDictionaryBasedRangePredicateEvaluator;
import org.apache.pinot.core.query.request.context.predicate.Predicate;


Expand Down Expand Up @@ -54,8 +53,7 @@ public static BaseFilterOperator getLeafFilterOperator(PredicateEvaluator predic
return new SortedIndexBasedFilterOperator(predicateEvaluator, dataSource, numDocs);
}
if (dataSource.getRangeIndex() != null) {
return new RangeIndexBasedFilterOperator((OfflineDictionaryBasedRangePredicateEvaluator) predicateEvaluator,
dataSource, numDocs);
return new RangeIndexBasedFilterOperator(predicateEvaluator, dataSource, numDocs);
}
return new ScanBasedFilterOperator(predicateEvaluator, dataSource, numDocs);
} else if (predicateType == Predicate.Type.REGEXP_LIKE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.pinot.core.operator.filter.predicate.RangePredicateEvaluatorFactory.FloatRawValueBasedRangePredicateEvaluator;
import org.apache.pinot.core.operator.filter.predicate.RangePredicateEvaluatorFactory.IntRawValueBasedRangePredicateEvaluator;
import org.apache.pinot.core.operator.filter.predicate.RangePredicateEvaluatorFactory.LongRawValueBasedRangePredicateEvaluator;
import org.apache.pinot.core.operator.filter.predicate.RangePredicateEvaluatorFactory.OfflineDictionaryBasedRangePredicateEvaluator;
import org.apache.pinot.core.operator.filter.predicate.RangePredicateEvaluatorFactory.SortedDictionaryBasedRangePredicateEvaluator;
import org.apache.pinot.core.segment.index.readers.RangeIndexReader;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
Expand Down Expand Up @@ -56,12 +56,12 @@ protected FilterBlock getNextBlock() {

int firstRangeId;
int lastRangeId;
if (_rangePredicateEvaluator instanceof OfflineDictionaryBasedRangePredicateEvaluator) {
if (_rangePredicateEvaluator instanceof SortedDictionaryBasedRangePredicateEvaluator) {
Copy link
Contributor

@chenboat chenboat Oct 22, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change related to this API refactoring? I did not see any change to OfflineDictionaryBasedRangePredicateEvaluator in this PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. We should choose the evaluator based on whether the dictionary is sorted, rather than on the concrete class of the dictionary.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean I did not see any change to either OfflineDictionaryBasedRangePredicateEvaluator or SortedDictionaryBasedRangePredicateEvaluator in this PR. Is this an existing bug for range predicate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, that is under the RangeIndexBasedFilterOperator

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah. Got it. You made changes in RangePredicateEvaluatorFactory.java.

firstRangeId = rangeIndexReader
.findRangeId(((OfflineDictionaryBasedRangePredicateEvaluator) _rangePredicateEvaluator).getStartDictId());
.findRangeId(((SortedDictionaryBasedRangePredicateEvaluator) _rangePredicateEvaluator).getStartDictId());
// NOTE: End dictionary id is exclusive in SortedDictionaryBasedRangePredicateEvaluator.
lastRangeId = rangeIndexReader
.findRangeId(((OfflineDictionaryBasedRangePredicateEvaluator) _rangePredicateEvaluator).getEndDictId() - 1);
.findRangeId(((SortedDictionaryBasedRangePredicateEvaluator) _rangePredicateEvaluator).getEndDictId() - 1);
} else {
switch (_rangePredicateEvaluator.getDataType()) {
case INT:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
import java.util.List;
import org.apache.pinot.common.utils.Pairs.IntPair;
import org.apache.pinot.core.common.DataSource;
import org.apache.pinot.core.segment.index.readers.SortedIndexReader;
import org.apache.pinot.core.operator.blocks.FilterBlock;
import org.apache.pinot.core.operator.docidsets.SortedDocIdSet;
import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
import org.apache.pinot.core.operator.filter.predicate.RangePredicateEvaluatorFactory.OfflineDictionaryBasedRangePredicateEvaluator;
import org.apache.pinot.core.operator.filter.predicate.RangePredicateEvaluatorFactory.SortedDictionaryBasedRangePredicateEvaluator;
import org.apache.pinot.core.segment.index.readers.SortedIndexReader;


public class SortedIndexBasedFilterOperator extends BaseFilterOperator {
Expand All @@ -54,10 +54,10 @@ protected FilterBlock getNextBlock() {
// - "Subtractive" operators (NEQ, NOT IN): Build up a list of non-matching docIdRanges with adjacent ones merged,
// then subtract them from the range of [0, numDocs) to get a list of matching docIdRanges.

if (_predicateEvaluator instanceof OfflineDictionaryBasedRangePredicateEvaluator) {
if (_predicateEvaluator instanceof SortedDictionaryBasedRangePredicateEvaluator) {
// For RANGE predicate, use start/end document id to construct a new document id range
OfflineDictionaryBasedRangePredicateEvaluator rangePredicateEvaluator =
(OfflineDictionaryBasedRangePredicateEvaluator) _predicateEvaluator;
SortedDictionaryBasedRangePredicateEvaluator rangePredicateEvaluator =
(SortedDictionaryBasedRangePredicateEvaluator) _predicateEvaluator;
int startDocId = _sortedIndexReader.getDocIds(rangePredicateEvaluator.getStartDictId()).getLeft();
// NOTE: End dictionary id is exclusive in SortedDictionaryBasedRangePredicateEvaluator.
int endDocId = _sortedIndexReader.getDocIds(rangePredicateEvaluator.getEndDictId() - 1).getRight();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import it.unimi.dsi.fastutil.ints.IntSet;
import org.apache.pinot.core.query.request.context.predicate.Predicate;
import org.apache.pinot.core.query.request.context.predicate.RangePredicate;
import org.apache.pinot.core.realtime.impl.dictionary.BaseMutableDictionary;
import org.apache.pinot.core.segment.index.readers.BaseImmutableDictionary;
import org.apache.pinot.core.segment.index.readers.Dictionary;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.utils.ByteArray;
Expand All @@ -46,11 +44,10 @@ private RangePredicateEvaluatorFactory() {
*/
public static BaseDictionaryBasedPredicateEvaluator newDictionaryBasedEvaluator(RangePredicate rangePredicate,
Dictionary dictionary, DataType dataType) {
if (dictionary instanceof BaseImmutableDictionary) {
return new OfflineDictionaryBasedRangePredicateEvaluator(rangePredicate, (BaseImmutableDictionary) dictionary);
if (dictionary.isSorted()) {
return new SortedDictionaryBasedRangePredicateEvaluator(rangePredicate, dictionary);
} else {
return new RealtimeDictionaryBasedRangePredicateEvaluator(rangePredicate, (BaseMutableDictionary) dictionary,
dataType);
return new UnsortedDictionaryBasedRangePredicateEvaluator(rangePredicate, dictionary, dataType);
}
}

Expand Down Expand Up @@ -81,14 +78,14 @@ public static BaseRawValueBasedPredicateEvaluator newRawValueBasedEvaluator(Rang
}
}

public static final class OfflineDictionaryBasedRangePredicateEvaluator extends BaseDictionaryBasedPredicateEvaluator {
public static final class SortedDictionaryBasedRangePredicateEvaluator extends BaseDictionaryBasedPredicateEvaluator {
final int _startDictId;
// Exclusive
final int _endDictId;
final int _numMatchingDictIds;
int[] _matchingDictIds;

OfflineDictionaryBasedRangePredicateEvaluator(RangePredicate rangePredicate, BaseImmutableDictionary dictionary) {
SortedDictionaryBasedRangePredicateEvaluator(RangePredicate rangePredicate, Dictionary dictionary) {
String lowerBound = rangePredicate.getLowerBound();
String upperBound = rangePredicate.getUpperBound();
boolean lowerInclusive = rangePredicate.isLowerInclusive();
Expand Down Expand Up @@ -170,19 +167,19 @@ public int[] getMatchingDictIds() {
}
}

private static final class RealtimeDictionaryBasedRangePredicateEvaluator extends BaseDictionaryBasedPredicateEvaluator {
private static final class UnsortedDictionaryBasedRangePredicateEvaluator extends BaseDictionaryBasedPredicateEvaluator {
// When the cardinality of the column is lower than this threshold, pre-calculate the matching dictionary ids;
// otherwise, fetch the value when evaluating each dictionary id.
// TODO: Tune this threshold
private static final int DICT_ID_SET_BASED_CARDINALITY_THRESHOLD = 1000;

final BaseMutableDictionary _dictionary;
final Dictionary _dictionary;
final DataType _dataType;
final boolean _dictIdSetBased;
final IntSet _matchingDictIdSet;
final BaseRawValueBasedPredicateEvaluator _rawValueBasedEvaluator;

RealtimeDictionaryBasedRangePredicateEvaluator(RangePredicate rangePredicate, BaseMutableDictionary dictionary,
UnsortedDictionaryBasedRangePredicateEvaluator(RangePredicate rangePredicate, Dictionary dictionary,
DataType dataType) {
_dictionary = dictionary;
_dataType = dataType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.apache.pinot.core.common.DataSource;
import org.apache.pinot.core.common.DataSourceMetadata;
import org.apache.pinot.core.data.partition.PartitionFunction;
import org.apache.pinot.core.realtime.impl.dictionary.BaseMutableDictionary;
import org.apache.pinot.core.segment.creator.ColumnStatistics;
import org.apache.pinot.core.segment.index.readers.Dictionary;
import org.apache.pinot.core.segment.index.readers.MutableForwardIndex;
import org.apache.pinot.spi.data.FieldSpec;

Expand All @@ -36,32 +36,35 @@
public class RealtimeColumnStatistics implements ColumnStatistics {
private final DataSource _dataSource;
private final int[] _sortedDocIdIterationOrder;
private final BaseMutableDictionary _mutableDictionary;

// NOTE: For columns newly added during ingestion, this will be a constant value dictionary instead of a mutable
// dictionary.
private final Dictionary _dictionary;

public RealtimeColumnStatistics(DataSource dataSource, int[] sortedDocIdIterationOrder) {
_dataSource = dataSource;
_sortedDocIdIterationOrder = sortedDocIdIterationOrder;
_mutableDictionary = (BaseMutableDictionary) dataSource.getDictionary();
_dictionary = dataSource.getDictionary();
}

@Override
public Object getMinValue() {
return _mutableDictionary.getMinVal();
return _dictionary.getMinVal();
}

@Override
public Object getMaxValue() {
return _mutableDictionary.getMaxVal();
return _dictionary.getMaxVal();
}

@Override
public Object getUniqueValuesSet() {
return _mutableDictionary.getSortedValues();
return _dictionary.getSortedValues();
}

@Override
public int getCardinality() {
return _mutableDictionary.length();
return _dictionary.length();
}

@Override
Expand All @@ -71,15 +74,15 @@ public int getLengthOfShortestElement() {

// If this column is a string/bytes column, iterate over the dictionary to find the minimum length
FieldSpec.DataType dataType = _dataSource.getDataSourceMetadata().getDataType();
int length = _mutableDictionary.length();
int length = _dictionary.length();

if (dataType.equals(FieldSpec.DataType.STRING)) {
for (int i = 0; i < length; i++) {
minStringLength = Math.min(_mutableDictionary.getStringValue(i).length(), minStringLength);
minStringLength = Math.min(_dictionary.getStringValue(i).length(), minStringLength);
}
} else if (dataType.equals(FieldSpec.DataType.BYTES)) {
for (int i = 0; i < length; i++) {
minStringLength = Math.min(_mutableDictionary.getBytesValue(i).length, minStringLength);
minStringLength = Math.min(_dictionary.getBytesValue(i).length, minStringLength);
}
}

Expand All @@ -93,15 +96,15 @@ public int getLengthOfLargestElement() {

// If this column is a string/bytes column, iterate over the dictionary to find the maximum length
FieldSpec.DataType dataType = _dataSource.getDataSourceMetadata().getDataType();
int length = _mutableDictionary.length();
int length = _dictionary.length();

if (dataType.equals(FieldSpec.DataType.STRING)) {
for (int i = 0; i < length; i++) {
maximumStringLength = Math.max(_mutableDictionary.getStringValue(i).length(), maximumStringLength);
maximumStringLength = Math.max(_dictionary.getStringValue(i).length(), maximumStringLength);
}
} else if (dataType.equals(FieldSpec.DataType.BYTES)) {
for (int i = 0; i < length; i++) {
maximumStringLength = Math.max(_mutableDictionary.getBytesValue(i).length, maximumStringLength);
maximumStringLength = Math.max(_dictionary.getBytesValue(i).length, maximumStringLength);
}
}

Expand Down Expand Up @@ -130,7 +133,7 @@ public boolean isSorted() {
int previousDictId = mutableForwardIndex.getDictId(_sortedDocIdIterationOrder[0]);
for (int i = 1; i < numDocs; i++) {
int currentDictId = mutableForwardIndex.getDictId(_sortedDocIdIterationOrder[i]);
if (_mutableDictionary.compare(previousDictId, currentDictId) > 0) {
if (_dictionary.compare(previousDictId, currentDictId) > 0) {
return false;
}
previousDictId = currentDictId;
Expand All @@ -139,7 +142,7 @@ public boolean isSorted() {
int previousDictId = mutableForwardIndex.getDictId(0);
for (int i = 1; i < numDocs; i++) {
int currentDictId = mutableForwardIndex.getDictId(i);
if (_mutableDictionary.compare(previousDictId, currentDictId) > 0) {
if (_dictionary.compare(previousDictId, currentDictId) > 0) {
return false;
}
previousDictId = currentDictId;
Expand Down

This file was deleted.

Loading