Reuse serializedRowSizes buffer between different PartitionBuffers
Ying Su authored and mbasmanova committed Mar 23, 2020
1 parent 1bbd131 commit d3f9cfc
Showing 2 changed files with 69 additions and 29 deletions.
28 changes: 28 additions & 0 deletions presto-array/src/main/java/com/facebook/presto/array/Arrays.java
@@ -13,6 +13,8 @@
*/
package com.facebook.presto.array;

import com.facebook.presto.spi.block.ArrayAllocator;

import static com.facebook.presto.array.Arrays.ExpansionFactor.SMALL;
import static com.facebook.presto.array.Arrays.ExpansionOption.INITIALIZE;
import static com.facebook.presto.array.Arrays.ExpansionOption.NONE;
@@ -49,6 +51,32 @@ else if (expansionOption == INITIALIZE) {
return buffer;
}

    public static int[] ensureCapacity(int[] buffer, int capacity, ExpansionFactor expansionFactor, ExpansionOption expansionOption, ArrayAllocator allocator)
    {
        int newCapacity = (int) (capacity * expansionFactor.expansionFactor);

        int[] newBuffer;
        if (buffer == null) {
            newBuffer = allocator.borrowIntArray(newCapacity);
        }
        else if (buffer.length < capacity) {
            newBuffer = allocator.borrowIntArray(newCapacity);
            if (expansionOption == PRESERVE) {
                System.arraycopy(buffer, 0, newBuffer, 0, buffer.length);
            }
            allocator.returnArray(buffer);
        }
        else {
            newBuffer = buffer;
        }

        if (expansionOption == INITIALIZE) {
            java.util.Arrays.fill(newBuffer, 0);
        }

        return newBuffer;
    }

public static long[] ensureCapacity(long[] buffer, int capacity)
{
if (buffer == null || buffer.length < capacity) {
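For orientation, here is a minimal usage sketch of the new allocator-backed overload. The surrounding class and method are hypothetical and not part of the commit; the imports and call signatures follow the file above.

import com.facebook.presto.spi.block.ArrayAllocator;

import static com.facebook.presto.array.Arrays.ExpansionFactor.SMALL;
import static com.facebook.presto.array.Arrays.ExpansionOption.INITIALIZE;
import static com.facebook.presto.array.Arrays.ExpansionOption.PRESERVE;
import static com.facebook.presto.array.Arrays.ensureCapacity;

public class EnsureCapacitySketch
{
    public static void sketch(ArrayAllocator allocator)
    {
        // Passing null borrows a zero-filled array large enough for 100 positions from the allocator.
        int[] buffer = ensureCapacity(null, 100, SMALL, INITIALIZE, allocator);

        // If the current array is too small for 1_000 positions, a larger array is borrowed,
        // the old contents are copied because PRESERVE is requested, and the old array is
        // returned to the allocator.
        buffer = ensureCapacity(buffer, 1_000, SMALL, PRESERVE, allocator);

        // The caller is responsible for eventually handing the array back.
        allocator.returnArray(buffer);
    }
}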
Second changed file:
@@ -360,11 +360,15 @@ private static class PagePartitioner
private final AtomicLong rowsAdded = new AtomicLong();
private final AtomicLong pagesAdded = new AtomicLong();

// The ArrayAllocator used by BlockFlattener for decoding blocks.
// There could be queries that shuffle data with up to 1000 columns, so we need to set maxOutstandingArrays to a high number.
-        private final ArrayAllocator arrayAllocator = new SimpleArrayAllocator(5000);
-        private final BlockFlattener flattener = new BlockFlattener(arrayAllocator);
+        private final ArrayAllocator blockDecodingAllocator = new SimpleArrayAllocator(5_000);
+        private final BlockFlattener flattener = new BlockFlattener(blockDecodingAllocator);
private final Closer blockLeaseCloser = Closer.create();

+        // The ArrayAllocator for the buffers used in repartitioning, e.g. PartitionBuffer#serializedRowSizes, BlockEncodingBuffer#mappedPositions.
+        private final ArrayAllocator bufferAllocator = new SimpleArrayAllocator(10_000);

private final PartitionBuffer[] partitionBuffers;
private final List<Type> sourceTypes;
private final List<Integer> variableWidthChannels;
@@ -401,7 +405,7 @@ public PagePartitioner(

partitionBuffers = new PartitionBuffer[partitionCount];
for (int i = 0; i < partitionCount; i++) {
-                partitionBuffers[i] = new PartitionBuffer(i, sourceTypes.size(), pageSize, pagesAdded, rowsAdded, serde, lifespan);
+                partitionBuffers[i] = new PartitionBuffer(i, sourceTypes.size(), pageSize, pagesAdded, rowsAdded, serde, lifespan, bufferAllocator);
}

this.sourceTypes = sourceTypes;
@@ -490,11 +494,13 @@ public void flush()

public long getRetainedSizeInBytes()
{
-            // When called by the operator constructor, the arrayAllocator was empty at the moment.
-            // When called in addInput(), the arrays have been returned to the arrayAllocator already,
+            // When called by the operator constructor, the blockDecodingAllocator was empty at the moment.
+            // When called in addInput(), the arrays have been returned to the blockDecodingAllocator already,
            // but they're still owned by the decodedBlock which will be counted as part of the decodedBlock.
-            // In both cases, the arrayAllocator doesn't need to be counted.
-            long size = 0;
+            // In both cases, the blockDecodingAllocator doesn't need to be counted. But we need to count
+            // bufferAllocator which contains buffers used during partitioning, e.g. serializedRowSizes,
+            // mappedPositions, etc.
+            long size = bufferAllocator.getEstimatedSizeInBytes();

for (int i = 0; i < partitionBuffers.length; i++) {
size += partitionBuffers[i].getRetainedSizeInBytes();
@@ -544,16 +550,16 @@ private static class PartitionBuffer
private final Lifespan lifespan;
private final int capacity;
private final int channelCount;
+        private final ArrayAllocator bufferAllocator;

private int[] positions; // the default positions array for top level BlockEncodingBuffer
-        private int[] serializedRowSizes; // The sizes of the rows in bytes if they were serialized
private int positionCount; // number of positions to be copied for this partition
private BlockEncodingBuffer[] blockEncodingBuffers;

private int bufferedRowCount;
private boolean bufferFull;

-        PartitionBuffer(int partition, int channelCount, int capacity, AtomicLong pagesAdded, AtomicLong rowsAdded, PagesSerde serde, Lifespan lifespan)
+        PartitionBuffer(int partition, int channelCount, int capacity, AtomicLong pagesAdded, AtomicLong rowsAdded, PagesSerde serde, Lifespan lifespan, ArrayAllocator bufferAllocator)
{
this.partition = partition;
this.channelCount = channelCount;
@@ -562,6 +568,7 @@ private static class PartitionBuffer
this.rowsAdded = requireNonNull(rowsAdded, "rowsAdded is null");
this.serde = requireNonNull(serde, "serde is null");
this.lifespan = requireNonNull(lifespan, "lifespan is null");
+            this.bufferAllocator = requireNonNull(bufferAllocator, "bufferAllocator is null");
}

private void resetPositions(int estimatedPositionCount)
@@ -597,27 +604,34 @@ private void appendData(DecodedBlockNode[] decodedBlocks, int fixedWidthRowSize,
blockEncodingBuffers[i].setupDecodedBlocksAndPositions(decodedBlocks[i], positions, positionCount);
}

-            populateSerializedRowSizes(fixedWidthRowSize, variableWidthChannels);
-
-            // Due to the limitation of buffer size, we append the data batch by batch
-            int offset = 0;
-            do {
-                int batchSize = calculateNextBatchSize(fixedWidthRowSize, variableWidthChannels, offset);
-
-                for (int i = 0; i < channelCount; i++) {
-                    blockEncodingBuffers[i].setNextBatch(offset, batchSize);
-                    blockEncodingBuffers[i].appendDataInBatch();
-                }
-
-                bufferedRowCount += batchSize;
-                offset += batchSize;
-
-                if (bufferFull) {
-                    flush(outputBuffer);
-                    bufferFull = false;
-                }
-            }
-            while (offset < positionCount);
+            int[] serializedRowSizes = ensureCapacity(null, positionCount, SMALL, INITIALIZE, bufferAllocator);
+            try {
+                populateSerializedRowSizes(fixedWidthRowSize, variableWidthChannels, serializedRowSizes);
+
+                // Due to the limitation of buffer size, we append the data batch by batch
+                int offset = 0;
+                do {
+                    int batchSize = calculateNextBatchSize(fixedWidthRowSize, variableWidthChannels, offset, serializedRowSizes);
+
+                    for (int i = 0; i < channelCount; i++) {
+                        blockEncodingBuffers[i].setNextBatch(offset, batchSize);
+                        blockEncodingBuffers[i].appendDataInBatch();
+                    }
+
+                    bufferedRowCount += batchSize;
+                    offset += batchSize;
+
+                    if (bufferFull) {
+                        flush(outputBuffer);
+                        bufferFull = false;
+                    }
+                }
+                while (offset < positionCount);
+            }
+            finally {
+                // Return the borrowed array for serializedRowSizes when the current page for the current partition is finished.
+                bufferAllocator.returnArray(serializedRowSizes);
+            }
        }

private void initializeBlockEncodingBuffers(DecodedBlockNode[] decodedBlocks)
@@ -635,14 +649,12 @@ private void initializeBlockEncodingBuffers(DecodedBlockNode[] decodedBlocks)
/**
* Calculate the row sizes in bytes and write them to serializedRowSizes.
*/
-        private void populateSerializedRowSizes(int fixedWidthRowSize, List<Integer> variableWidthChannels)
+        private void populateSerializedRowSizes(int fixedWidthRowSize, List<Integer> variableWidthChannels, int[] serializedRowSizes)
{
if (variableWidthChannels.isEmpty()) {
return;
}

-            serializedRowSizes = ensureCapacity(serializedRowSizes, positionCount, SMALL, INITIALIZE);

for (int i : variableWidthChannels) {
blockEncodingBuffers[i].accumulateSerializedRowSizes(serializedRowSizes);
}
@@ -652,7 +664,7 @@ private void populateSerializedRowSizes(int fixedWidthRowSize, List<Integer> var
}
}

-        private int calculateNextBatchSize(int fixedWidthRowSize, List<Integer> variableWidthChannels, int startPosition)
+        private int calculateNextBatchSize(int fixedWidthRowSize, List<Integer> variableWidthChannels, int startPosition, int[] serializedRowSizes)
{
int bytesRemaining = capacity - getSerializedBuffersSizeInBytes();

@@ -702,7 +714,7 @@ private void flush(OutputBuffer outputBuffer)

private long getRetainedSizeInBytes()
{
-            long size = INSTANCE_SIZE + sizeOf(positions) + sizeOf(serializedRowSizes);
+            long size = INSTANCE_SIZE + sizeOf(positions);

// Some destination partitions might get 0 rows. In that case the BlockEncodingBuffer won't be created.
if (blockEncodingBuffers != null) {
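Taken together, the second file stops keeping a serializedRowSizes field on each PartitionBuffer and instead borrows the array from the shared bufferAllocator for the duration of one appendData call, returning it in a finally block so the next partition can reuse the same memory. A self-contained sketch of that lifecycle follows; the class and method here are illustrative and not part of the commit.

import com.facebook.presto.spi.block.ArrayAllocator;

import static com.facebook.presto.array.Arrays.ExpansionFactor.SMALL;
import static com.facebook.presto.array.Arrays.ExpansionOption.INITIALIZE;
import static com.facebook.presto.array.Arrays.ensureCapacity;

final class SerializedRowSizesReuseSketch
{
    // One allocator shared by every partition buffer, mirroring PagePartitioner#bufferAllocator.
    private final ArrayAllocator bufferAllocator;

    SerializedRowSizesReuseSketch(ArrayAllocator bufferAllocator)
    {
        this.bufferAllocator = bufferAllocator;
    }

    void appendToPartitions(int partitionCount, int positionCount)
    {
        for (int partition = 0; partition < partitionCount; partition++) {
            // Borrow a zero-filled row-size buffer for this partition's page.
            int[] serializedRowSizes = ensureCapacity(null, positionCount, SMALL, INITIALIZE, bufferAllocator);
            try {
                // ... populate serializedRowSizes and append the page data in batches ...
            }
            finally {
                // Returning the array lets the allocator hand the same memory to the next partition.
                bufferAllocator.returnArray(serializedRowSizes);
            }
        }
    }
}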
