Skip to content

Commit

Permalink
Defer the creation of dictionary in SliceDictionarySelectiveReader
Browse files Browse the repository at this point in the history
Previously the dictionary data byte arrays were allocated as soon as a
stripe or a rowgroup was opened during read(). This commit persists them
as local buffers, and copies them to create the dictionary only when
getBlock() is called. This way the expensive allocations can be avoided
if a lazyBlock doesn't need to be read.

Moreover, this commit avoids creating Slices when evaluating the
filters, which was another source of large memory allocations.
  • Loading branch information
Ying Su authored and mbasmanova committed Sep 15, 2020
1 parent 93d1b04 commit 7994754
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.facebook.presto.common.block.DictionaryBlock;
import com.facebook.presto.common.block.RunLengthEncodedBlock;
import com.facebook.presto.common.block.VariableWidthBlock;
import com.facebook.presto.common.type.Chars;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.orc.OrcCorruptionException;
import com.facebook.presto.orc.OrcLocalMemoryContext;
Expand All @@ -37,12 +36,15 @@
import org.openjdk.jol.info.ClassLayout;

import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;

import static com.facebook.presto.orc.array.Arrays.ExpansionFactor.MEDIUM;
import static com.facebook.presto.orc.array.Arrays.ExpansionOption.NONE;
import static com.facebook.presto.orc.array.Arrays.ensureCapacity;
import static com.facebook.presto.orc.metadata.OrcType.OrcTypeKind.CHAR;
import static com.facebook.presto.orc.metadata.Stream.StreamKind.DATA;
Expand Down Expand Up @@ -89,10 +91,14 @@ public class SliceDictionarySelectiveReader

private byte[] stripeDictionaryData = EMPTY_DICTIONARY_DATA;
private int[] stripeDictionaryOffsetVector = EMPTY_DICTIONARY_OFFSETS;
private byte[] rowGroupDictionaryData = EMPTY_DICTIONARY_DATA;
private int[] rowGroupDictionaryOffsetVector = EMPTY_DICTIONARY_OFFSETS;
private byte[] currentDictionaryData = EMPTY_DICTIONARY_DATA;
private int[] currentDictionaryOffsetVector;
private int[] stripeDictionaryLength = new int[0];
private int[] rowGroupDictionaryLength = new int[0];
private byte[] evaluationStatus;
private byte[] valueWithPadding;

private int readOffset;

Expand All @@ -106,7 +112,13 @@ public class SliceDictionarySelectiveReader
private InputStreamSource<ByteArrayInputStream> stripeDictionaryDataStreamSource = missingStreamSource(ByteArrayInputStream.class);
private InputStreamSource<LongInputStream> stripeDictionaryLengthStreamSource = missingStreamSource(LongInputStream.class);
private boolean stripeDictionaryOpen;
// The dictionary is wrapped into a block lazily, in getBlock() or getBlockView(). This flag is set to false when a new dictionary is opened (be it a stripe dictionary or a
// rowgroup dictionary). When there is only a stripe dictionary and no rowgroup dictionaries, the flag is reset only when the stripe dictionary is opened, not for every rowgroup.
// It is set to true once the dictionary has been wrapped in wrapDictionaryIfNecessary().
private boolean dictionaryWrapped;

private int stripeDictionarySize;
private int currentDictionarySize;

private InputStreamSource<ByteArrayInputStream> rowGroupDictionaryDataStreamSource = missingStreamSource(ByteArrayInputStream.class);
private InputStreamSource<BooleanInputStream> inDictionaryStreamSource = missingStreamSource(BooleanInputStream.class);
Expand Down Expand Up @@ -134,6 +146,7 @@ public SliceDictionarySelectiveReader(StreamDescriptor streamDescriptor, Optiona
this.outputType = requireNonNull(outputType, "outputType is null").orElse(null);
OrcType orcType = streamDescriptor.getOrcType();
this.maxCodePointCount = orcType == null ? 0 : orcType.getLength().orElse(-1);
this.valueWithPadding = maxCodePointCount < 0 ? null : new byte[maxCodePointCount];
this.isCharType = orcType.getOrcTypeKind() == CHAR;
this.outputRequired = outputType.isPresent();
checkArgument(filter.isPresent() || outputRequired, "filter must be present if outputRequired is false");
Expand Down Expand Up @@ -193,7 +206,7 @@ private int readNoFilter(int[] positions, int positionCount)
}

if (presentStream != null && !presentStream.nextBit()) {
values[i] = dictionary.getPositionCount() - 1;
values[i] = currentDictionarySize - 1;
}
else {
boolean isInRowDictionary = inDictionaryStream != null && !inDictionaryStream.nextBit();
Expand All @@ -220,7 +233,7 @@ private int readWithFilter(int[] positions, int positionCount)
if (presentStream != null && !presentStream.nextBit()) {
if ((nonDeterministicFilter && filter.testNull()) || nullsAllowed) {
if (outputRequired) {
values[outputPositionCount] = dictionary.getPositionCount() - 1;
values[outputPositionCount] = currentDictionarySize - 1;
}
outputPositions[outputPositionCount] = position;
outputPositionCount++;
Expand Down Expand Up @@ -285,22 +298,28 @@ private int readWithFilter(int[] positions, int positionCount)

private byte evaluateFilter(int position, int index, int length)
{
if (filter.testLength(length)) {
int currentLength = dictionary.getSliceLength(index);
Slice data = dictionary.getSlice(index, 0, currentLength);
if (isCharType && length != currentLength) {
data = Chars.padSpaces(data, maxCodePointCount);
}
if (filter.testBytes(data.getBytes(), 0, length)) {
if (outputRequired) {
values[outputPositionCount] = index;
}
outputPositions[outputPositionCount] = position;
outputPositionCount++;
return FILTER_PASSED;
if (!filter.testLength(length)) {
return FILTER_FAILED;
}

int currentLength = currentDictionaryOffsetVector[index + 1] - currentDictionaryOffsetVector[index];
if (isCharType && length != currentLength) {
System.arraycopy(currentDictionaryData, currentDictionaryOffsetVector[index], valueWithPadding, 0, currentLength);
Arrays.fill(valueWithPadding, currentLength, length, (byte) ' ');
if (!filter.testBytes(valueWithPadding, 0, length)) {
return FILTER_FAILED;
}
}
return FILTER_FAILED;
else if (!filter.testBytes(currentDictionaryData, currentDictionaryOffsetVector[index], length)) {
return FILTER_FAILED;
}

if (outputRequired) {
values[outputPositionCount] = index;
}
outputPositions[outputPositionCount] = position;
outputPositionCount++;
return FILTER_PASSED;
}

private int readAllNulls(int[] positions, int positionCount)
Expand Down Expand Up @@ -369,8 +388,11 @@ public Block getBlock(int[] positions, int positionCount)
return new RunLengthEncodedBlock(outputType.createBlockBuilder(null, 1).appendNull().build(), positionCount);
}

wrapDictionaryIfNecessary();

if (positionCount == outputPositionCount) {
DictionaryBlock block = new DictionaryBlock(positionCount, dictionary, values);

values = null;
return block;
}
Expand Down Expand Up @@ -409,9 +431,26 @@ public BlockLease getBlockView(int[] positions, int positionCount)
if (positionCount < outputPositionCount) {
compactValues(positions, positionCount);
}
wrapDictionaryIfNecessary();
return newLease(new DictionaryBlock(positionCount, dictionary, values));
}

/**
 * Builds {@code dictionary} from the current dictionary buffers unless it has
 * already been built since the flag {@code dictionaryWrapped} was last cleared.
 *
 * The block is created from copies of {@code currentDictionaryData} and
 * {@code currentDictionaryOffsetVector}, not from the buffers themselves.
 * NOTE(review): presumably this is because the buffers are reused while
 * previously returned blocks may still be referenced -- confirm against callers.
 */
private void wrapDictionaryIfNecessary()
{
    if (!dictionaryWrapped) {
        final int entryCount = currentDictionarySize;

        // Only the last dictionary entry is flagged as null.
        boolean[] nullFlags = new boolean[entryCount];
        nullFlags[entryCount - 1] = true;

        // The offset stored at index entryCount is the number of data bytes to copy.
        int dataLength = currentDictionaryOffsetVector[entryCount];
        byte[] dataSnapshot = Arrays.copyOf(currentDictionaryData, dataLength);
        int[] offsetSnapshot = Arrays.copyOf(currentDictionaryOffsetVector, entryCount + 1);

        dictionary = new VariableWidthBlock(
                entryCount,
                wrappedBuffer(dataSnapshot),
                offsetSnapshot,
                Optional.of(nullFlags));

        dictionaryWrapped = true;
    }
}

private void compactValues(int[] positions, int positionCount)
{
int positionIndex = 0;
Expand Down Expand Up @@ -464,10 +503,8 @@ private void openRowGroup()
dataLength += stripeDictionaryLength[i];
}

// we must always create a new dictionary array because the previous dictionary may still be referenced
stripeDictionaryData = new byte[toIntExact(dataLength)];
// add one extra entry for null
stripeDictionaryOffsetVector = new int[stripeDictionarySize + 2];
stripeDictionaryData = ensureCapacity(stripeDictionaryData, toIntExact(dataLength));
stripeDictionaryOffsetVector = ensureCapacity(stripeDictionaryOffsetVector, stripeDictionarySize + 2);

// read dictionary values
ByteArrayInputStream dictionaryDataStream = stripeDictionaryDataStreamSource.openStream();
Expand All @@ -477,8 +514,10 @@ private void openRowGroup()
stripeDictionaryData = EMPTY_DICTIONARY_DATA;
stripeDictionaryOffsetVector = EMPTY_DICTIONARY_OFFSETS;
}

// If there is no rowgroup dictionary, we only need to wrap the stripe dictionary once per stripe because wrapping dictionary is very expensive.
dictionaryWrapped = false;
}
stripeDictionaryOpen = true;

// read row group dictionary
RowGroupDictionaryLengthInputStream dictionaryLengthStream = rowGroupDictionaryLengthStreamSource.openStream();
Expand All @@ -497,10 +536,22 @@ private void openRowGroup()
dataLength += rowGroupDictionaryLength[i];
}

// We must always create a new dictionary array because the previous dictionary may still be referenced
// The first elements of the dictionary are from the stripe dictionary, then the row group dictionary elements, and then a null
byte[] rowGroupDictionaryData = java.util.Arrays.copyOf(stripeDictionaryData, stripeDictionaryOffsetVector[stripeDictionarySize] + toIntExact(dataLength));
int[] rowGroupDictionaryOffsetVector = Arrays.copyOf(stripeDictionaryOffsetVector, stripeDictionarySize + rowGroupDictionarySize + 2);
rowGroupDictionaryData = ensureCapacity(
rowGroupDictionaryData,
stripeDictionaryOffsetVector[stripeDictionarySize] + toIntExact(dataLength),
MEDIUM,
NONE);

rowGroupDictionaryOffsetVector = ensureCapacity(rowGroupDictionaryOffsetVector,
stripeDictionarySize + rowGroupDictionarySize + 2,
MEDIUM,
NONE);

if (!stripeDictionaryOpen) {
System.arraycopy(stripeDictionaryData, 0, rowGroupDictionaryData, 0, stripeDictionaryOffsetVector[stripeDictionarySize]);
System.arraycopy(stripeDictionaryOffsetVector, 0, rowGroupDictionaryOffsetVector, 0, stripeDictionarySize + 2);
}
dictionaryWrapped = false;

// read dictionary values
ByteArrayInputStream dictionaryDataStream = rowGroupDictionaryDataStreamSource.openStream();
Expand All @@ -512,6 +563,7 @@ private void openRowGroup()
setDictionaryBlockData(stripeDictionaryData, stripeDictionaryOffsetVector, stripeDictionarySize + 1);
}

stripeDictionaryOpen = true;
presentStream = presentStreamSource.openStream();
inDictionaryStream = inDictionaryStreamSource.openStream();
dataStream = dataStreamSource.openStream();
Expand Down Expand Up @@ -611,6 +663,7 @@ public void close()
{
dictionary = null;
currentDictionaryData = null;
rowGroupDictionaryData = null;
rowGroupDictionaryLength = null;
stripeDictionaryData = null;
stripeDictionaryLength = null;
Expand All @@ -624,15 +677,15 @@ public long getRetainedSizeInBytes()
{
return INSTANCE_SIZE +
sizeOf(values) +
sizeOf(outputPositions) +
// dictionary could be built on stripeDictionaryData or the rowGroupDictionaryData created locally in openRowGroup(). For the first case, currentDictionaryData
// points to stripeDictionaryData and we just need to count the dictionary's retained size. For the second case, we need to count both stripeDictionaryData and dictionary.
dictionary.getRetainedSizeInBytes() +
(stripeDictionaryData == currentDictionaryData ? 0 : sizeOf(stripeDictionaryData)) +
sizeOf(stripeDictionaryData) +
sizeOf(stripeDictionaryOffsetVector) +
sizeOf(stripeDictionaryLength) +
sizeOf(rowGroupDictionaryData) +
sizeOf(rowGroupDictionaryOffsetVector) +
sizeOf(rowGroupDictionaryLength) +
sizeOf(evaluationStatus);
sizeOf(evaluationStatus) +
sizeOf(valueWithPadding) +
dictionary.getRetainedSizeInBytes();
}

private void setDictionaryBlockData(byte[] dictionaryData, int[] dictionaryOffsets, int positionCount)
Expand All @@ -641,14 +694,15 @@ private void setDictionaryBlockData(byte[] dictionaryData, int[] dictionaryOffse
// only update the block if the array changed to prevent creation of new Block objects, since
// the engine currently uses identity equality to test if dictionaries are the same
if (currentDictionaryData != dictionaryData) {
boolean[] isNullVector = new boolean[positionCount];
isNullVector[positionCount - 1] = true;
dictionaryOffsets[positionCount] = dictionaryOffsets[positionCount - 1];
dictionary = new VariableWidthBlock(positionCount, wrappedBuffer(dictionaryData), dictionaryOffsets, Optional.of(isNullVector));
currentDictionaryData = dictionaryData;
evaluationStatus = ensureCapacity(evaluationStatus, positionCount - 1);
fill(evaluationStatus, 0, evaluationStatus.length, FILTER_NOT_EVALUATED);
}
currentDictionaryOffsetVector = dictionaryOffsets;
currentDictionarySize = positionCount;
// The last element in the dictionary is null.
currentDictionaryOffsetVector[currentDictionarySize] = currentDictionaryOffsetVector[currentDictionarySize - 1];

evaluationStatus = ensureCapacity(evaluationStatus, positionCount - 1);
fill(evaluationStatus, 0, evaluationStatus.length, FILTER_NOT_EVALUATED);
}

private BlockLease newLease(Block block)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -907,7 +907,7 @@ public void testVarBinaries()
public void testMemoryTracking()
throws Exception
{
List<Type> types = ImmutableList.of(INTEGER, VARCHAR);
List<Type> types = ImmutableList.of(INTEGER, VARCHAR, VARCHAR);
TempFile tempFile = new TempFile();
List<Integer> intValues = newArrayList(limit(
cycle(concat(
Expand All @@ -916,11 +916,13 @@ public void testMemoryTracking()
ImmutableList.of(3), nCopies(9999, 123),
nCopies(1_000_000, null))),
NUM_ROWS));
List<String> varcharValues = newArrayList(limit(cycle(ImmutableList.of("A", "B", "C")), NUM_ROWS));
List<String> varcharDirectValues = newArrayList(limit(cycle(ImmutableList.of("A", "B", "C")), NUM_ROWS));
List<String> varcharDictionaryValues = newArrayList(limit(cycle(ImmutableList.of("apple", "apple pie", "apple\uD835\uDC03", "apple\uFFFD")), NUM_ROWS));
List<List<?>> values = ImmutableList.of(intValues, varcharDirectValues, varcharDictionaryValues);

writeOrcColumnsPresto(tempFile.getFile(), DWRF, CompressionKind.NONE, Optional.empty(), types, ImmutableList.of(intValues, varcharValues), new OrcWriterStats());
writeOrcColumnsPresto(tempFile.getFile(), DWRF, CompressionKind.NONE, Optional.empty(), types, values, new OrcWriterStats());

OrcPredicate orcPredicate = createOrcPredicate(types, ImmutableList.of(intValues, varcharValues), DWRF, false);
OrcPredicate orcPredicate = createOrcPredicate(types, values, DWRF, false);
Map<Integer, Type> includedColumns = IntStream.range(0, types.size())
.boxed()
.collect(toImmutableMap(Function.identity(), types::get));
Expand Down Expand Up @@ -960,7 +962,7 @@ public void testMemoryTracking()

page.getLoadedPage();

assertBetweenInclusive(systemMemoryUsage.getBytes(), 110000L, 130000L);
assertBetweenInclusive(systemMemoryUsage.getBytes(), 150000L, 160000L);

rowsProcessed += positionCount;
}
Expand Down

0 comments on commit 7994754

Please sign in to comment.