Skip to content

Commit

Permalink
Introduce large dictionary mode in SliceDictionarySelectiveReader
Browse files Browse the repository at this point in the history
Previously we always allocated a dictionary for every rowgroup. When
these dictionaries are humongous, the allocations could cause reliability
and performance issues. This commit materializes the dictionary values
into a plain block when the dictionary is too large: instead of outputting
a DictionaryBlock, the reader outputs a plain VariableWidthBlock when the
dictionary size is above a certain threshold. An experiment on a user-reported
query shows over 10x reduction in allocations and over 2x CPU reduction in scan.
  • Loading branch information
Ying Su authored and mbasmanova committed Sep 15, 2020
1 parent 0006741 commit 219c7d7
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.openjdk.jol.info.ClassLayout;

import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -80,6 +79,11 @@ public class SliceDictionarySelectiveReader
// add one extra entry for null after stripe/rowGroup dictionary
private static final int[] EMPTY_DICTIONARY_OFFSETS = new int[2];

// Each rowgroup has roughly 10K rows, and each batch reads 1K rows. So there are about 10 batches in a rowgroup.
private static final int BATCHES_PER_ROWGROUP = 10;
// MATERIALIZATION_RATIO should be greater than or equal to 1.0f to compensate for the extra CPU cost of materializing blocks.
private static final float MATERIALIZATION_RATIO = 2.0f;

private final TupleDomainFilter filter;
private final boolean nonDeterministicFilter;
private final boolean nullsAllowed;
Expand Down Expand Up @@ -127,6 +131,7 @@ public class SliceDictionarySelectiveReader
private OrcLocalMemoryContext systemMemoryContext;

private int[] values;
private int nullsCount;
private boolean allNulls;
private int[] outputPositions;
private int outputPositionCount;
Expand Down Expand Up @@ -158,6 +163,7 @@ public int read(int offset, int[] positions, int positionCount)
openRowGroup();
}

nullsCount = 0;
allNulls = false;

if (outputRequired) {
Expand Down Expand Up @@ -203,6 +209,7 @@ private int readNoFilter(int[] positions, int positionCount)

if (presentStream != null && !presentStream.nextBit()) {
values[i] = currentDictionarySize - 1;
nullsCount++;
}
else {
boolean isInRowDictionary = inDictionaryStream != null && !inDictionaryStream.nextBit();
Expand Down Expand Up @@ -230,6 +237,7 @@ private int readWithFilter(int[] positions, int positionCount)
if ((nonDeterministicFilter && filter.testNull()) || nullsAllowed) {
if (outputRequired) {
values[outputPositionCount] = currentDictionarySize - 1;
nullsCount++;
}
outputPositions[outputPositionCount] = position;
outputPositionCount++;
Expand Down Expand Up @@ -380,36 +388,45 @@ public Block getBlock(int[] positions, int positionCount)
checkState(positionCount <= outputPositionCount, "Not enough values");
checkState(!valuesInUse, "BlockLease hasn't been closed yet");

if (allNulls) {
if (allNulls || nullsCount == outputPositionCount) {
return new RunLengthEncodedBlock(outputType.createBlockBuilder(null, 1).appendNull().build(), positionCount);
}

wrapDictionaryIfNecessary();
// compact values(ids) array, and calculate 1) the slice sizeInBytes if materialized, and 2) number of nulls
long blockSizeInBytes = 0;
int nullsCount = 0; // the nulls count for selected positions
int i = 0;
int j = 0;
while (i < positionCount && j < outputPositionCount) {
if (positions[i] != outputPositions[j]) {
j++;
continue;
}

int id = this.values[j];
values[i] = id;

if (positionCount == outputPositionCount) {
DictionaryBlock block = new DictionaryBlock(positionCount, dictionary, values);
blockSizeInBytes += dictionaryOffsetVector[id + 1] - dictionaryOffsetVector[id];
nullsCount += (id == currentDictionarySize - 1 ? 1 : 0);

values = null;
return block;
i++;
j++;
}

int[] valuesCopy = new int[positionCount];
// If all selected positions are null, just return RLE block.
if (nullsCount == outputPositionCount) {
return new RunLengthEncodedBlock(outputType.createBlockBuilder(null, 1).appendNull().build(), positionCount);
}

int positionIndex = 0;
int nextPosition = positions[positionIndex];
for (int i = 0; i < outputPositionCount; i++) {
if (outputPositions[i] < nextPosition) {
continue;
}
assert outputPositions[i] == nextPosition;
valuesCopy[positionIndex] = this.values[i];
positionIndex++;
if (positionIndex >= positionCount) {
break;
}
nextPosition = positions[positionIndex];
// If the expected materialized size of the output block is smaller than a certain ratio of the dictionary size, we will materialize the values
int dictionarySizeInBytes = dictionaryOffsetVector[currentDictionarySize - 1];
if (blockSizeInBytes * BATCHES_PER_ROWGROUP < dictionarySizeInBytes / MATERIALIZATION_RATIO) {
return getMaterializedBlock(positionCount, blockSizeInBytes, nullsCount);
}

wrapDictionaryIfNecessary();

int[] valuesCopy = Arrays.copyOf(values, positionCount);
return new DictionaryBlock(positionCount, dictionary, valuesCopy);
}

Expand Down Expand Up @@ -691,4 +708,33 @@ private BlockLease newLease(Block block)
valuesInUse = true;
return ClosingBlockLease.newLease(block, () -> valuesInUse = false);
}

/**
 * Builds a plain {@link VariableWidthBlock} by copying the dictionary entry for each
 * selected id in {@code values} into one contiguous byte buffer, avoiding a
 * DictionaryBlock when the dictionary itself is too large to be worth wrapping.
 *
 * @param positionCount number of leading entries of {@code values} to materialize
 * @param blockSizeInBytes exact total byte size of the materialized data (precomputed by the caller)
 * @param nullsCount number of null entries among the selected positions; when zero the
 *        null vector is omitted entirely
 */
private Block getMaterializedBlock(int positionCount, long blockSizeInBytes, int nullsCount)
{
    byte[] data = new byte[toIntExact(blockSizeInBytes)];
    int[] offsets = new int[positionCount + 1];
    int writeOffset = 0;
    for (int position = 0; position < positionCount; position++) {
        int id = values[position];
        int start = dictionaryOffsetVector[id];
        int length = dictionaryOffsetVector[id + 1] - start;
        System.arraycopy(dictionaryData, start, data, writeOffset, length);
        writeOffset += length;
        offsets[position + 1] = writeOffset;
    }

    // The null entry occupies the last dictionary slot; only build the null
    // vector when the caller observed at least one null among the positions.
    Optional<boolean[]> isNull = Optional.empty();
    if (nullsCount > 0) {
        boolean[] nulls = new boolean[positionCount];
        int nullId = currentDictionarySize - 1;
        for (int position = 0; position < positionCount; position++) {
            nulls[position] = (values[position] == nullId);
        }
        isNull = Optional.of(nulls);
    }
    return new VariableWidthBlock(positionCount, wrappedBuffer(data), offsets, isNull);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -763,11 +763,11 @@ public void testVarchars()
1, stringIn(true, "10", "11"),
2, stringIn(true, "def", "abc"))));

// dictionary
// direct and dictionary
tester.testRoundTrip(VARCHAR, newArrayList(limit(cycle(ImmutableList.of("apple", "apple pie", "apple\uD835\uDC03", "apple\uFFFD")), NUM_ROWS)),
stringIn(false, "apple", "apple pie"));

// direct
// direct and dictionary materialized
tester.testRoundTrip(VARCHAR,
intsBetween(0, NUM_ROWS).stream().map(Object::toString).collect(toList()),
stringIn(false, "10", "11"),
Expand Down Expand Up @@ -812,7 +812,7 @@ public void testVarchars()
.map(Object::toString)
.collect(toList()));

// presentStream is null in some row groups
// presentStream is null in some row groups & dictionary materialized
Function<Integer, String> randomStrings = i -> String.valueOf(random.nextInt(NUM_ROWS));
tester.testRoundTripTypes(
ImmutableList.of(INTEGER, VARCHAR),
Expand Down

0 comments on commit 219c7d7

Please sign in to comment.