Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ private ExecConstants() {
public static final String SPILL_FILESYSTEM = "drill.exec.spill.fs";
public static final String SPILL_DIRS = "drill.exec.spill.directories";

public static final String OUTPUT_BATCH_SIZE = "drill.exec.memory.operator.output_batch_size";
public static final LongValidator OUTPUT_BATCH_SIZE_VALIDATOR = new RangeLongValidator(OUTPUT_BATCH_SIZE, 1024, 512 * 1024 * 1024);

// External Sort Boot configuration

public static final String EXTERNAL_SORT_TARGET_SPILL_BATCH_SIZE = "drill.exec.sort.external.spill.batch.size";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.data.NamedExpression;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
Expand All @@ -40,6 +41,7 @@
import org.apache.drill.exec.expr.ValueVectorWriteExpression;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.FlattenPOP;
import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer;
import org.apache.drill.exec.record.AbstractSingleRecordBatch;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.MaterializedField;
Expand Down Expand Up @@ -68,6 +70,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
private boolean hasRemainder = false;
private int remainderIndex = 0;
private int recordCount;
private long outputBatchSize;

private final Flattener.Monitor monitor = new Flattener.Monitor() {
@Override
Expand All @@ -94,8 +97,57 @@ private void clear() {
}
}

private class FlattenMemoryManager {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The memory manager is an excellent idea. But, it is private. This means we are passing up one of the key benefits of this kind of manager: the ability to unit test it separately without needing actual memory or batches. (See the external sort for an example.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The memory manager actually requires an incoming batch. All the tests in testOutputBatch exercise this code. I went down the path of writing tests just for the memory manager, but they are redundant — they do the same thing as the other tests. Also, I did not see much point in validating the output row count, since we are interested in batch size more than row count. Please let me know if you feel otherwise and I can include those tests as well.

private final int outputRowCount;
private static final int OFFSET_VECTOR_WIDTH = 4;
private static final int WORST_CASE_FRAGMENTATION_FACTOR = 2;
private static final int MAX_NUM_ROWS = ValueVector.MAX_ROW_COUNT;
private static final int MIN_NUM_ROWS = 1;

/**
 * Sizes the outgoing batch for flatten. Uses {@link RecordBatchSizer} data for
 * the incoming batch to estimate the average width of an output row (the
 * incoming row minus the flatten column, plus a single flattened element) and
 * derives the number of rows that fit within {@code outputBatchSize}.
 *
 * @param incoming the upstream batch to be flattened; must have sizing info available
 * @param outputBatchSize configured output batch size budget, in bytes
 * @param flattenColumn path of the repeated column being flattened
 */
private FlattenMemoryManager(RecordBatch incoming, long outputBatchSize, SchemaPath flattenColumn) {
// Get sizing information for the batch.
RecordBatchSizer sizer = new RecordBatchSizer(incoming);

// Resolve the flatten column to its materialized field in the incoming schema.
final TypedFieldId typedFieldId = incoming.getValueVectorId(flattenColumn);
final MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]);

// Get column size of flatten column.
RecordBatchSizer.ColumnSize columnSize = RecordBatchSizer.getColumn(incoming.getValueAccessorById(field.getValueClass(),
typedFieldId.getFieldIds()).getValueVector(), field.getName());

// Average rowWidth of flatten column (net bytes per incoming row).
final int avgRowWidthFlattenColumn = RecordBatchSizer.safeDivide(columnSize.netSize, incoming.getRecordCount());

// Average rowWidth excluding the flatten column.
final int avgRowWidthWithOutFlattenColumn = sizer.netRowWidth() - avgRowWidthFlattenColumn;

// Average rowWidth of single element in the flatten list.
// subtract the offset vector size from column data size.
final int avgRowWidthSingleFlattenEntry =
RecordBatchSizer.safeDivide(columnSize.netSize - (OFFSET_VECTOR_WIDTH * columnSize.valueCount), columnSize.elementCount);

// Average rowWidth of outgoing batch = all other columns plus one flattened element.
final int avgOutgoingRowWidth = avgRowWidthWithOutFlattenColumn + avgRowWidthSingleFlattenEntry;

// Number of rows in outgoing batch. The budget is halved by
// WORST_CASE_FRAGMENTATION_FACTOR to allow for allocator/vector fragmentation,
// then clamped to [MIN_NUM_ROWS, MAX_NUM_ROWS].
outputRowCount = Math.max(MIN_NUM_ROWS, Math.min(MAX_NUM_ROWS,
RecordBatchSizer.safeDivide((outputBatchSize/WORST_CASE_FRAGMENTATION_FACTOR), avgOutgoingRowWidth)));

logger.debug("flatten incoming batch sizer : {}, outputBatchSize : {}," +
"avgOutgoingRowWidth : {}, outputRowCount : {}", sizer, outputBatchSize, avgOutgoingRowWidth, outputRowCount);
}

/**
 * @return the number of rows the outgoing batch should contain, as
 *         computed by the constructor from the output batch size budget
 */
public int getOutputRowCount() {
return outputRowCount;
}
}


/**
 * Creates the flatten operator batch and reads the configured output batch
 * size (drill.exec.memory.operator.output_batch_size) from the fragment's
 * options; the value is used per-call in doWork() to size outgoing batches.
 *
 * @param pop the flatten physical operator definition
 * @param incoming the upstream record batch to flatten
 * @param context fragment context supplying options and allocator
 * @throws OutOfMemoryException if operator memory cannot be allocated
 */
public FlattenRecordBatch(FlattenPOP pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
super(pop, context, incoming);

// get the output batch size from config.
outputBatchSize = context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
}

@Override
Expand Down Expand Up @@ -148,6 +200,9 @@ private void setFlattenVector() {

@Override
protected IterOutcome doWork() {
FlattenMemoryManager flattenMemoryManager = new FlattenMemoryManager(incoming, outputBatchSize, popConfig.getColumn());
flattener.setOutputCount(flattenMemoryManager.getOutputRowCount());

int incomingRecordCount = incoming.getRecordCount();

if (!doAlloc()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,30 +31,28 @@

import com.google.common.collect.ImmutableList;

import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.RepeatedValueVector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class FlattenTemplate implements Flattener {
private static final Logger logger = LoggerFactory.getLogger(FlattenTemplate.class);

private static final int OUTPUT_BATCH_SIZE = 4*1024;
private static final int OUTPUT_MEMORY_LIMIT = 512 * 1024 * 1024;
private static final int OUTPUT_ROW_COUNT = ValueVector.MAX_ROW_COUNT;

private ImmutableList<TransferPair> transfers;
private BufferAllocator outputAllocator;
private SelectionVectorMode svMode;
private RepeatedValueVector fieldToFlatten;
private RepeatedValueVector.RepeatedAccessor accessor;
private int valueIndex;
private boolean bigRecords = false;
private int bigRecordsBufferSize;

/**
* The output batch limit starts at OUTPUT_BATCH_SIZE, but may be decreased
* The output batch limit starts at OUTPUT_ROW_COUNT, but may be decreased
* if records are found to be large.
*/
private int outputLimit = OUTPUT_BATCH_SIZE;
private int outputLimit = OUTPUT_ROW_COUNT;

// this allows for groups to be written between batches if we run out of space, for cases where we have finished
// a batch on the boundary it will be set to 0
Expand All @@ -72,6 +70,11 @@ public RepeatedValueVector getFlattenField() {
return fieldToFlatten;
}

/**
 * Sets the maximum number of records to emit per outgoing batch,
 * replacing the initial default of OUTPUT_ROW_COUNT
 * (ValueVector.MAX_ROW_COUNT). Called by the operator's memory manager
 * before each doWork() pass.
 */
@Override
public void setOutputCount(int outputCount) {
outputLimit = outputCount;
}

@Override
public final int flattenRecords(final int recordCount, final int firstOutputIndex,
final Flattener.Monitor monitor) {
Expand Down Expand Up @@ -101,75 +104,10 @@ public final int flattenRecords(final int recordCount, final int firstOutputInde
for ( ; innerValueIndexLocal < innerValueCount; innerValueIndexLocal++) {
// If we've hit the batch size limit, stop and flush what we've got so far.
if (recordsThisCall == outputLimit) {
if (bigRecords) {
/*
* We got to the limit we used before, but did we go over
* the bigRecordsBufferSize in the second half of the batch? If
* so, we'll need to adjust the batch limits.
*/
adjustBatchLimits(1, monitor, recordsThisCall);
}

// Flush this batch.
break outer;
}

/*
* At the moment, the output record includes the input record, so for very
* large records that we're flattening, we're carrying forward the original
* record as well as the flattened element. We've seen a case where flattening a 4MB
* record with a 20,000 element array causing memory usage to explode. To avoid
* that until we can push down the selected fields to operators like this, we
* also limit the amount of memory in use at one time.
*
* We have to have written at least one record to be able to get a buffer that will
* have a real allocator, so we have to do this lazily. We won't check the limit
* for the first two records, but that keeps this simple.
*/
if (bigRecords) {
/*
* If we're halfway through the outputLimit, check on our memory
* usage so far.
*/
if (recordsThisCall == outputLimit / 2) {
/*
* If we've used more than half the space we've used for big records
* in the past, we've seen even bigger records than before, so stop and
* see if we need to flush here before we go over bigRecordsBufferSize
* memory usage, and reduce the outputLimit further before we continue
* with the next batch.
*/
if (adjustBatchLimits(2, monitor, recordsThisCall)) {
break outer;
}
}
} else {
if (outputAllocator.getAllocatedMemory() > OUTPUT_MEMORY_LIMIT) {
/*
* We're dealing with big records. Reduce the outputLimit to
* the current record count, and take note of how much space the
* vectors report using for that. We'll use those numbers as limits
* going forward in order to avoid allocating more memory.
*/
bigRecords = true;
outputLimit = Math.min(recordsThisCall, outputLimit);
if (outputLimit < 1) {
throw new IllegalStateException("flatten outputLimit (" + outputLimit
+ ") won't make progress");
}

/*
* This will differ from what the allocator reports because of
* overhead. But the allocator check is much cheaper to do, so we
* only compute this at selected times.
*/
bigRecordsBufferSize = monitor.getBufferSizeFor(recordsThisCall);

// Stop and flush.
break outer;
}
}

try {
doEval(valueIndexLocal, outputIndex);
} catch (OversizedAllocationException ex) {
Expand Down Expand Up @@ -211,68 +149,6 @@ public final int flattenRecords(final int recordCount, final int firstOutputInde
}
}

/**
 * Determine if the current batch record limit needs to be adjusted (when handling
 * bigRecord mode). If so, adjust the limit, and return true, otherwise return false.
 *
 * <p>NOTE(review): this heuristic is superseded by the memory-manager-driven
 * {@code setOutputCount()} sizing and is removed by this change.</p>
 *
 * <p>If the limit is adjusted, it will always be adjusted down, because we need to operate
 * based on the largest sized record we've ever seen.</p>
 *
 * <p>If the limit is adjusted, then the current batch should be flushed, because
 * continuing would lead to going over the large memory limit that has already been
 * established.</p>
 *
 * @param multiplier Multiply currently used memory (according to the monitor) before
 * checking against past memory limits. This allows for checking the currently used
 * memory after processing a fraction of the expected batch limit, but using that as
 * a predictor of the full batch's size. For example, if this is checked after half
 * the batch size limit's records are processed, then using a multiplier of two will
 * do the check under the assumption that processing the full batch limit will use
 * twice as much memory.
 * @param monitor the Flattener.Monitor instance to use for the current memory usage check
 * @param recordsThisCall the number of records processed so far during this call to
 * flattenRecords().
 * @return true if the batch size limit was adjusted, false otherwise
 */
private boolean adjustBatchLimits(final int multiplier, final Flattener.Monitor monitor,
final int recordsThisCall) {
assert bigRecords : "adjusting batch limits when no big records";
// Projected memory use for a full batch, extrapolated from usage so far.
final int bufferSize = multiplier * monitor.getBufferSizeFor(recordsThisCall);

/*
 * If the amount of space we've used so far is below the amount that triggered
 * the bigRecords mode, then no adjustment is needed.
 */
if (bufferSize <= bigRecordsBufferSize) {
return false;
}

/*
 * We've used more space than we've used for big records in the past, we've seen
 * even bigger records, so we need to adjust our limits, and flush what we've got so far.
 *
 * We should reduce the outputLimit proportionately to get the predicted
 * amount of memory used back down to bigRecordsBufferSize.
 *
 * The number of records to limit is therefore
 * outputLimit *
 * (1 - (bufferSize - bigRecordsBufferSize) / bigRecordsBufferSize)
 *
 * Doing some algebra on the multiplier:
 * (bigRecordsBufferSize - (bufferSize - bigRecordsBufferSize)) / bigRecordsBufferSize
 * (bigRecordsBufferSize - bufferSize + bigRecordsBufferSize) / bigRecordsBufferSize
 * (2 * bigRecordsBufferSize - bufferSize) / bigRecordsBufferSize
 *
 * If bufferSize has gotten so big that this would be negative, we'll
 * just go down to one record per batch. We need to check for that on
 * outputLimit anyway, in order to make sure that we make progress.
 */
final int newLimit = (int)
(outputLimit * (2.0 * ((double) bigRecordsBufferSize) - bufferSize) / bigRecordsBufferSize);
// Never drop below one record per batch, or flatten would stall.
outputLimit = Math.max(1, newLimit);
return true;
}

@Override
public final void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, List<TransferPair> transfers) throws SchemaChangeException{

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ interface Monitor {
int flattenRecords(int recordCount, int firstOutputIndex, Monitor monitor);

void setFlattenField(RepeatedValueVector repeatedColumn);
void setOutputCount(int outputCount);

RepeatedValueVector getFlattenField();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,18 @@ public static class ColumnSize {

public final int elementCount;

/**
* Size of the top level value vector. For map and repeated list,
* this is just size of offset vector.
*/
public int dataSize;

/**
* Total size of the column includes the sum total of memory for all
* value vectors representing the column.
*/
public int netSize;

/**
* The estimated, average number of elements per parent value.
* Always 1 for a non-repeated type. For a repeated type,
Expand Down Expand Up @@ -131,9 +141,15 @@ public ColumnSize(ValueVector v, String prefix) {
break;
default:
dataSize = v.getPayloadByteCount(valueCount);
stdSize = TypeHelper.getSize(metadata.getType()) * elementCount;
try {
stdSize = TypeHelper.getSize(metadata.getType()) * elementCount;
} catch (Exception e) {
// For unsupported types, just set stdSize to 0.
stdSize = 0;
}
}
estSize = safeDivide(dataSize, valueCount);
netSize = v.getPayloadByteCount(valueCount);
}

@SuppressWarnings("resource")
Expand All @@ -154,8 +170,14 @@ private int buildRepeated(ValueVector v) {
return childCount;
}

@SuppressWarnings("resource")
private void buildList(ValueVector v) {
@SuppressWarnings("resource")
// complex ListVector cannot be casted to RepeatedListVector.
// check the mode.
if (v.getField().getDataMode() != DataMode.REPEATED) {
dataSize = v.getPayloadByteCount(valueCount);
return;
}
UInt4Vector offsetVector = ((RepeatedListVector) v).getOffsetVector();
dataSize = offsetVector.getPayloadByteCount(valueCount);
}
Expand Down Expand Up @@ -232,6 +254,10 @@ else if (width > 0) {
}
}

/**
 * Convenience factory: computes sizing information for a single value vector
 * without constructing a full RecordBatchSizer for the whole batch.
 *
 * @param v the value vector to size
 * @param prefix name prefix used when labeling nested columns
 * @return the computed per-column size metrics
 */
public static ColumnSize getColumn(ValueVector v, String prefix) {
return new ColumnSize(v, prefix);
}

public static final int MAX_VECTOR_SIZE = ValueVector.MAX_BUFFER_SIZE; // 16 MiB

private List<ColumnSize> columnSizes = new ArrayList<>();
Expand Down Expand Up @@ -380,14 +406,18 @@ private void measureColumn(ValueVector v, String prefix) {
// vectors do consume space, so visit columns recursively.

switch (v.getField().getType().getMinorType()) {
case MAP:
expandMap((AbstractMapVector) v, prefix + v.getField().getName() + ".");
break;
case LIST:
expandList((RepeatedListVector) v, prefix + v.getField().getName() + ".");
break;
default:
v.collectLedgers(ledgers);
case MAP:
expandMap((AbstractMapVector) v, prefix + v.getField().getName() + ".");
break;
case LIST:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, check mode. If mode is REPEATED, this is a RepeatedListVector and should go down one path. Otherwise, it is a ListVector (possible list of unions) and should go down another path.

// complex ListVector cannot be casted to RepeatedListVector.
// do not expand the list if it is not repeated mode.
if (v.getField().getDataMode() == DataMode.REPEATED) {
expandList((RepeatedListVector) v, prefix + v.getField().getName() + ".");
}
break;
default:
v.collectLedgers(ledgers);
}

netRowWidth += colSize.estSize;
Expand Down
Loading