Skip to content

Commit

Permalink
Revert "Remove stripeDictionaryData buffer in SliceDictionarySelectiv…
Browse files Browse the repository at this point in the history
…eReader"

This reverts commit 0006741.
  • Loading branch information
kewang1024 authored and rongrong committed Oct 8, 2020
1 parent b8e72e8 commit 8c60697
Showing 1 changed file with 51 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import java.util.Optional;

import static com.facebook.presto.orc.array.Arrays.ExpansionFactor.MEDIUM;
import static com.facebook.presto.orc.array.Arrays.ExpansionOption.PRESERVE;
import static com.facebook.presto.orc.array.Arrays.ExpansionOption.NONE;
import static com.facebook.presto.orc.array.Arrays.ensureCapacity;
import static com.facebook.presto.orc.metadata.OrcType.OrcTypeKind.CHAR;
import static com.facebook.presto.orc.metadata.Stream.StreamKind.DATA;
Expand Down Expand Up @@ -89,8 +89,12 @@ public class SliceDictionarySelectiveReader
private final int maxCodePointCount;
private final boolean isCharType;

private byte[] dictionaryData = EMPTY_DICTIONARY_DATA;
private int[] dictionaryOffsetVector = EMPTY_DICTIONARY_OFFSETS;
private byte[] stripeDictionaryData = EMPTY_DICTIONARY_DATA;
private int[] stripeDictionaryOffsetVector = EMPTY_DICTIONARY_OFFSETS;
private byte[] rowGroupDictionaryData = EMPTY_DICTIONARY_DATA;
private int[] rowGroupDictionaryOffsetVector = EMPTY_DICTIONARY_OFFSETS;
private byte[] currentDictionaryData = EMPTY_DICTIONARY_DATA;
private int[] currentDictionaryOffsetVector;
private int[] stripeDictionaryLength = new int[0];
private int[] rowGroupDictionaryLength = new int[0];
private byte[] evaluationStatus;
Expand Down Expand Up @@ -298,15 +302,15 @@ private byte evaluateFilter(int position, int index, int length)
return FILTER_FAILED;
}

int currentLength = dictionaryOffsetVector[index + 1] - dictionaryOffsetVector[index];
int currentLength = currentDictionaryOffsetVector[index + 1] - currentDictionaryOffsetVector[index];
if (isCharType && length != currentLength) {
System.arraycopy(dictionaryData, dictionaryOffsetVector[index], valueWithPadding, 0, currentLength);
System.arraycopy(currentDictionaryData, currentDictionaryOffsetVector[index], valueWithPadding, 0, currentLength);
Arrays.fill(valueWithPadding, currentLength, length, (byte) ' ');
if (!filter.testBytes(valueWithPadding, 0, length)) {
return FILTER_FAILED;
}
}
else if (!filter.testBytes(dictionaryData, dictionaryOffsetVector[index], length)) {
else if (!filter.testBytes(currentDictionaryData, currentDictionaryOffsetVector[index], length)) {
return FILTER_FAILED;
}

Expand Down Expand Up @@ -440,8 +444,8 @@ private void wrapDictionaryIfNecessary()
boolean[] isNullVector = new boolean[currentDictionarySize];
isNullVector[currentDictionarySize - 1] = true;

byte[] dictionaryDataCopy = Arrays.copyOf(dictionaryData, dictionaryOffsetVector[currentDictionarySize]);
int[] dictionaryOffsetVectorCopy = Arrays.copyOf(dictionaryOffsetVector, currentDictionarySize + 1);
byte[] dictionaryDataCopy = Arrays.copyOf(currentDictionaryData, currentDictionaryOffsetVector[currentDictionarySize]);
int[] dictionaryOffsetVectorCopy = Arrays.copyOf(currentDictionaryOffsetVector, currentDictionarySize + 1);
dictionary = new VariableWidthBlock(currentDictionarySize, wrappedBuffer(dictionaryDataCopy), dictionaryOffsetVectorCopy, Optional.of(isNullVector));

dictionaryWrapped = true;
Expand Down Expand Up @@ -499,16 +503,16 @@ private void openRowGroup()
dataLength += stripeDictionaryLength[i];
}

dictionaryData = ensureCapacity(dictionaryData, toIntExact(dataLength));
dictionaryOffsetVector = ensureCapacity(dictionaryOffsetVector, stripeDictionarySize + 2);
stripeDictionaryData = ensureCapacity(stripeDictionaryData, toIntExact(dataLength));
stripeDictionaryOffsetVector = ensureCapacity(stripeDictionaryOffsetVector, stripeDictionarySize + 2);

// read dictionary values
ByteArrayInputStream dictionaryDataStream = stripeDictionaryDataStreamSource.openStream();
readDictionary(dictionaryDataStream, stripeDictionarySize, stripeDictionaryLength, 0, dictionaryData, dictionaryOffsetVector, maxCodePointCount, isCharType);
readDictionary(dictionaryDataStream, stripeDictionarySize, stripeDictionaryLength, 0, stripeDictionaryData, stripeDictionaryOffsetVector, maxCodePointCount, isCharType);
}
else {
dictionaryData = EMPTY_DICTIONARY_DATA;
dictionaryOffsetVector = EMPTY_DICTIONARY_OFFSETS;
stripeDictionaryData = EMPTY_DICTIONARY_DATA;
stripeDictionaryOffsetVector = EMPTY_DICTIONARY_OFFSETS;
}

// If there is no rowgroup dictionary, we only need to wrap the stripe dictionary once per stripe because wrapping dictionary is very expensive.
Expand All @@ -520,7 +524,10 @@ private void openRowGroup()
if (dictionaryLengthStream != null) {
int rowGroupDictionarySize = dictionaryLengthStream.getEntryCount();

rowGroupDictionaryLength = ensureCapacity(rowGroupDictionaryLength, rowGroupDictionarySize);
// resize the dictionary lengths array if necessary
if (rowGroupDictionaryLength.length < rowGroupDictionarySize) {
rowGroupDictionaryLength = new int[rowGroupDictionarySize];
}

// read the lengths
dictionaryLengthStream.nextIntVector(rowGroupDictionarySize, rowGroupDictionaryLength, 0);
Expand All @@ -529,33 +536,33 @@ private void openRowGroup()
dataLength += rowGroupDictionaryLength[i];
}

dictionaryData = ensureCapacity(
dictionaryData,
dictionaryOffsetVector[stripeDictionarySize] + toIntExact(dataLength),
rowGroupDictionaryData = ensureCapacity(
rowGroupDictionaryData,
stripeDictionaryOffsetVector[stripeDictionarySize] + toIntExact(dataLength),
MEDIUM,
PRESERVE);
NONE);

dictionaryOffsetVector = ensureCapacity(dictionaryOffsetVector,
rowGroupDictionaryOffsetVector = ensureCapacity(rowGroupDictionaryOffsetVector,
stripeDictionarySize + rowGroupDictionarySize + 2,
MEDIUM,
PRESERVE);
NONE);

if (!stripeDictionaryOpen) {
System.arraycopy(stripeDictionaryData, 0, rowGroupDictionaryData, 0, stripeDictionaryOffsetVector[stripeDictionarySize]);
System.arraycopy(stripeDictionaryOffsetVector, 0, rowGroupDictionaryOffsetVector, 0, stripeDictionarySize + 2);
}
dictionaryWrapped = false;

// read dictionary values
ByteArrayInputStream dictionaryDataStream = rowGroupDictionaryDataStreamSource.openStream();
readDictionary(dictionaryDataStream, rowGroupDictionarySize, rowGroupDictionaryLength, stripeDictionarySize, dictionaryData, dictionaryOffsetVector, maxCodePointCount, isCharType);
currentDictionarySize = stripeDictionarySize + rowGroupDictionarySize + 1;

initiateEvaluationStatus(stripeDictionarySize + rowGroupDictionarySize + 1);
readDictionary(dictionaryDataStream, rowGroupDictionarySize, rowGroupDictionaryLength, stripeDictionarySize, rowGroupDictionaryData, rowGroupDictionaryOffsetVector, maxCodePointCount, isCharType);
setDictionaryBlockData(rowGroupDictionaryData, rowGroupDictionaryOffsetVector, stripeDictionarySize + rowGroupDictionarySize + 1);
}
else {
// there is no row group dictionary so use the stripe dictionary
currentDictionarySize = stripeDictionarySize + 1;
initiateEvaluationStatus(stripeDictionarySize + 1);
setDictionaryBlockData(stripeDictionaryData, stripeDictionaryOffsetVector, stripeDictionarySize + 1);
}

dictionaryOffsetVector[currentDictionarySize] = dictionaryOffsetVector[currentDictionarySize - 1];
stripeDictionaryOpen = true;
presentStream = presentStreamSource.openStream();
inDictionaryStream = inDictionaryStreamSource.openStream();
Expand Down Expand Up @@ -655,9 +662,10 @@ public void startRowGroup(InputStreamSources dataStreamSources)
public void close()
{
dictionary = null;
dictionaryData = null;
dictionaryOffsetVector = null;
currentDictionaryData = null;
rowGroupDictionaryData = null;
rowGroupDictionaryLength = null;
stripeDictionaryData = null;
stripeDictionaryLength = null;
values = null;
outputPositions = null;
Expand All @@ -669,18 +677,29 @@ public long getRetainedSizeInBytes()
{
return INSTANCE_SIZE +
sizeOf(values) +
sizeOf(dictionaryData) +
sizeOf(dictionaryOffsetVector) +
sizeOf(stripeDictionaryData) +
sizeOf(stripeDictionaryOffsetVector) +
sizeOf(stripeDictionaryLength) +
sizeOf(rowGroupDictionaryData) +
sizeOf(rowGroupDictionaryOffsetVector) +
sizeOf(rowGroupDictionaryLength) +
sizeOf(evaluationStatus) +
sizeOf(valueWithPadding) +
dictionary.getRetainedSizeInBytes();
}

private void initiateEvaluationStatus(int positionCount)
private void setDictionaryBlockData(byte[] dictionaryData, int[] dictionaryOffsets, int positionCount)
{
verify(positionCount > 0);
// only update the block if the array changed to prevent creation of new Block objects, since
// the engine currently uses identity equality to test if dictionaries are the same
if (currentDictionaryData != dictionaryData) {
currentDictionaryData = dictionaryData;
}
currentDictionaryOffsetVector = dictionaryOffsets;
currentDictionarySize = positionCount;
// The last element in the dictionary is null.
currentDictionaryOffsetVector[currentDictionarySize] = currentDictionaryOffsetVector[currentDictionarySize - 1];

evaluationStatus = ensureCapacity(evaluationStatus, positionCount - 1);
fill(evaluationStatus, 0, evaluationStatus.length, FILTER_NOT_EVALUATED);
Expand Down

0 comments on commit 8c60697

Please sign in to comment.