UserException.java
@@ -50,7 +50,16 @@ public class UserException extends DrillRuntimeException {
*/
public static Builder memoryError(final Throwable cause) {
return UserException.resourceError(cause)
.message(MEMORY_ERROR_MSG);
.message(MEMORY_ERROR_MSG).addContext(cause.getMessage());
}

public static Builder memoryError(final String format, final Object... args) {
Builder builder = UserException.resourceError();
builder.message(MEMORY_ERROR_MSG);
if (!format.isEmpty()) {
builder.addContext(String.format(format, args));
}
return builder;
}

/**
@@ -59,7 +68,7 @@ public static Builder memoryError(final Throwable cause) {
* @return resource error builder
*/
public static Builder memoryError() {
return memoryError(null);
return memoryError("");
}

/**
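For reference, a call site for the new varargs builder might look like the sketch below. This is hypothetical (the class and logger names are made up; the real call site added by this PR is in ExternalSortBatch.newSV2() further down), and it only uses builder methods that appear elsewhere in this diff.

import org.apache.drill.common.exceptions.UserException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical call-site sketch for the new memoryError(format, args) builder.
public class MemoryErrorUsageSketch {
  private static final Logger logger = LoggerFactory.getLogger(MemoryErrorUsageSketch.class);

  static void failAllocation(int recordCount, long allocated, long limit) {
    throw UserException.memoryError("Unable to allocate sv2 for %d records.", recordCount)
        .addContext("allocated memory", allocated)
        .addContext("allocator limit", limit)
        .build(logger);
  }
}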
ExternalSortBatch.java
@@ -17,8 +17,6 @@
*/
package org.apache.drill.exec.physical.impl.xsort;

import io.netty.buffer.DrillBuf;

import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
@@ -95,7 +93,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
private final int SPILL_THRESHOLD;
private final Iterator<String> dirs;
private final RecordBatch incoming;
private BufferAllocator copierAllocator;
private final BufferAllocator oAllocator;
private final BufferAllocator copierAllocator;

private BatchSchema schema;
private SingleBatchSorter sorter;
@@ -114,12 +113,9 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
private int spillCount = 0;
private int batchesSinceLastSpill = 0;
private boolean first = true;
private long totalSizeInMemory = 0;
private long highWaterMark = Long.MAX_VALUE;
private int targetRecordCount;
private final String fileName;
private int firstSpillBatchCount = 0;
private long peakSizeInMemory = -1;
private int peakNumBatches = -1;

/**
@@ -157,7 +153,8 @@ public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, Record
SPILL_BATCH_GROUP_SIZE = config.getInt(ExecConstants.EXTERNAL_SORT_SPILL_GROUP_SIZE);
SPILL_THRESHOLD = config.getInt(ExecConstants.EXTERNAL_SORT_SPILL_THRESHOLD);
dirs = Iterators.cycle(config.getStringList(ExecConstants.EXTERNAL_SORT_SPILL_DIRS));
copierAllocator = oContext.getAllocator().newChildAllocator(oContext.getAllocator().getName() + ":copier",
oAllocator = oContext.getAllocator();
copierAllocator = oAllocator.newChildAllocator(oAllocator.getName() + ":copier",
PriorityQueueCopier.INITIAL_ALLOCATION, PriorityQueueCopier.MAX_ALLOCATION);
FragmentHandle handle = context.getHandle();
fileName = String.format("%s/major_fragment_%s/minor_fragment_%s/operator_%s", QueryIdHelper.getQueryId(handle.getQueryId()),
@@ -356,17 +353,12 @@ public IterOutcome innerNext() {
throw new OutOfMemoryException(e);
}
}
totalSizeInMemory += getBufferSize(convertedBatch);
if (peakSizeInMemory < totalSizeInMemory) {
peakSizeInMemory = totalSizeInMemory;
stats.setLongStat(Metric.PEAK_SIZE_IN_MEMORY, peakSizeInMemory);
}

int count = sv2.getCount();
totalCount += count;
sorter.setup(context, sv2, convertedBatch);
sorter.sort(sv2);
RecordBatchData rbd = new RecordBatchData(convertedBatch, oContext.getAllocator());
RecordBatchData rbd = new RecordBatchData(convertedBatch, oAllocator);
boolean success = false;
try {
rbd.setSv2(sv2);
@@ -377,16 +369,12 @@ public IterOutcome innerNext() {
}

batchesSinceLastSpill++;
if (// We have spilled at least once and the current memory used is more than the 75% of peak memory used.
(spillCount > 0 && totalSizeInMemory > .75 * highWaterMark) ||
// If we haven't spilled so far, do we have enough memory for MSorter if this turns out to be the last incoming batch?
if (// If we haven't spilled so far, do we have enough memory for MSorter if this turns out to be the last incoming batch?
(spillCount == 0 && !hasMemoryForInMemorySort(totalCount)) ||
// If we haven't spilled so far, make sure we don't exceed the maximum number of batches SV4 can address
(spillCount == 0 && totalBatches > Character.MAX_VALUE) ||
// current memory used is more than 95% of memory usage limit of this operator
(totalSizeInMemory > .95 * popConfig.getMaxAllocation()) ||
// current memory used is more than 95% of memory usage limit of this fragment
(totalSizeInMemory > .95 * oContext.getAllocator().getLimit()) ||
(oAllocator.getAllocatedMemory() > .95 * oAllocator.getLimit()) ||
// Number of incoming batches (BatchGroups) exceed the limit and number of incoming batches accumulated
// since the last spill exceed the defined limit
(batchGroups.size() > SPILL_THRESHOLD && batchesSinceLastSpill >= SPILL_BATCH_GROUP_SIZE)) {
@@ -417,7 +405,6 @@ public IterOutcome innerNext() {
break;
case OUT_OF_MEMORY:
logger.debug("received OUT_OF_MEMORY, trying to spill");
highWaterMark = totalSizeInMemory;
if (batchesSinceLastSpill > 2) {
final BatchGroup merged = mergeAndSpill(batchGroups);
if (merged != null) {
@@ -443,18 +430,18 @@ public IterOutcome innerNext() {
builder.clear();
builder.close();
}
builder = new SortRecordBatchBuilder(oContext.getAllocator());
builder = new SortRecordBatchBuilder(oAllocator);

for (BatchGroup group : batchGroups) {
RecordBatchData rbd = new RecordBatchData(group.getContainer(), oContext.getAllocator());
RecordBatchData rbd = new RecordBatchData(group.getContainer(), oAllocator);
rbd.setSv2(group.getSv2());
builder.add(rbd);
}

builder.build(context, container);
sv4 = builder.getSv4();
mSorter = createNewMSorter();
mSorter.setup(context, oContext.getAllocator(), getSelectionVector4(), this.container);
mSorter.setup(context, oAllocator, getSelectionVector4(), this.container);

// For testing memory-leak purpose, inject exception after mSorter finishes setup
injector.injectUnchecked(context.getExecutionControls(), INTERRUPTION_AFTER_SETUP);
@@ -478,7 +465,7 @@ public IterOutcome innerNext() {
batchGroups.addAll(spilledBatchGroups);
spilledBatchGroups = null; // no need to clean up spilledBatchGroups, all its batches are in batchGroups now

logger.warn("Starting to merge. {} batch groups. Current allocated memory: {}", batchGroups.size(), oContext.getAllocator().getAllocatedMemory());
logger.warn("Starting to merge. {} batch groups. Current allocated memory: {}", batchGroups.size(), oAllocator.getAllocatedMemory());
VectorContainer hyperBatch = constructHyperBatch(batchGroups);
createCopier(hyperBatch, batchGroups, container, false);

@@ -513,7 +500,7 @@ public IterOutcome innerNext() {
}

private boolean hasMemoryForInMemorySort(int currentRecordCount) {
long currentlyAvailable = popConfig.getMaxAllocation() - oContext.getAllocator().getAllocatedMemory();
long currentlyAvailable = popConfig.getMaxAllocation() - oAllocator.getAllocatedMemory();

long neededForInMemorySort = SortRecordBatchBuilder.memoryNeeded(currentRecordCount) +
MSortTemplate.memoryNeeded(currentRecordCount);
@@ -523,7 +510,7 @@ private boolean hasMemoryForInMemorySort(int currentRecordCount) {

public BatchGroup mergeAndSpill(LinkedList<BatchGroup> batchGroups) throws SchemaChangeException {
logger.debug("Copier allocator current allocation {}", copierAllocator.getAllocatedMemory());
logger.debug("mergeAndSpill: starting totalSizeInMemory = {}", totalSizeInMemory);
logger.debug("mergeAndSpill: starting total size in memory = {}", oAllocator.getAllocatedMemory());
VectorContainer outputContainer = new VectorContainer();
List<BatchGroup> batchGroupList = Lists.newArrayList();
int batchCount = batchGroups.size();
@@ -534,11 +521,7 @@ public BatchGroup mergeAndSpill(LinkedList<BatchGroup> batchGroups) throws Schem
BatchGroup batch = batchGroups.pollLast();
assert batch != null : "Encountered a null batch during merge and spill operation";
batchGroupList.add(batch);
long bufferSize = getBufferSize(batch);
logger.debug("mergeAndSpill: buffer size for batch {} = {}", i, bufferSize);
totalSizeInMemory -= bufferSize;
}
logger.debug("mergeAndSpill: intermediate estimated total size in memory = {}", totalSizeInMemory);

if (batchGroupList.size() == 0) {
return null;
@@ -588,36 +571,26 @@ public BatchGroup mergeAndSpill(LinkedList<BatchGroup> batchGroups) throws Schem
} finally {
hyperBatch.clear();
}
long bufSize = getBufferSize(c1);
totalSizeInMemory += bufSize;
logger.debug("mergeAndSpill: final total size in memory = {}", totalSizeInMemory);
logger.debug("mergeAndSpill: final total size in memory = {}", oAllocator.getAllocatedMemory());
logger.info("Completed spilling to {}", outputFile);
return newGroup;
}

private long getBufferSize(VectorAccessible batch) {
long size = 0;
for (VectorWrapper<?> w : batch) {
DrillBuf[] bufs = w.getValueVector().getBuffers(false);
for (DrillBuf buf : bufs) {
size += buf.getPossibleMemoryConsumed();
}
}
return size;
}

private SelectionVector2 newSV2() throws OutOfMemoryException, InterruptedException {
SelectionVector2 sv2 = new SelectionVector2(oContext.getAllocator());
SelectionVector2 sv2 = new SelectionVector2(oAllocator);
if (!sv2.allocateNewSafe(incoming.getRecordCount())) {
try {
// Not being able to allocate sv2 means this operator's allocator reached its maximum capacity.
// Spilling this.batchGroups won't help here as they are owned by upstream operator,
// but spilling spilledBatchGroups may free enough memory
final BatchGroup merged = mergeAndSpill(spilledBatchGroups);
final BatchGroup merged = mergeAndSpill(batchGroups);
if (merged != null) {
spilledBatchGroups.addFirst(merged);
spilledBatchGroups.add(merged);
Contributor: Out of curiosity, what is this change? It seems strange.

Contributor Author: I'm not sure, actually. We use addFirst in 2 places only and add in 3 other places. Maybe I should just change all of them to use add instead.

Contributor: The ordering in the queue only matters for determining which BatchGroups will be merged the next time mergeAndSpill is called. Since mergeAndSpill takes batches off the end of the queue, we want to put the outcome at the front of the queue, since the outcome BatchGroup has a much larger spill size than the ones at the end of the queue and is thus more expensive to merge.

So I think you should revert this change and leave it the way it is. It might be a good idea to take a more extensive look at the spilling/merging strategy, but that can be a different task.
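The deque discipline described above is easy to see in a tiny standalone sketch (plain Java, with strings standing in for BatchGroups; the queue name and values are illustrative, not Drill's):

import java.util.LinkedList;

// Illustrative sketch only: shows why the merged result is parked at the head
// while merge-and-spill consumes candidates from the tail.
public class SpillQueueSketch {
  public static void main(String[] args) {
    LinkedList<String> spilledGroups = new LinkedList<>();
    spilledGroups.add("spill-1");
    spilledGroups.add("spill-2");
    spilledGroups.add("spill-3");

    // mergeAndSpill-style consumption: pull candidates from the tail.
    String merged = spilledGroups.pollLast() + "+" + spilledGroups.pollLast();

    // The merged result is much larger and more expensive to merge again,
    // so it goes to the front; the next pollLast() still prefers the tail.
    spilledGroups.addFirst(merged);

    System.out.println(spilledGroups);  // [spill-3+spill-2, spill-1]
  }
}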

Contributor Author: Will revert the change.

I'm confused, though: why do we sometimes (in 3 different places) add the outcome of mergeAndSpill at the end of the queue?

Contributor: It depends on which queue we are merging from. When batches first come into the ExternalSort, they are added to the batchGroup queue. Usually when spilling, we spill some number of batches from the batchGroup queue and then put the result at the end of the spilledBatchGroup queue. In the cases where we need to re-merge the batchGroups, we take from the end of the spilledBatchGroup queue and add the result to the front of that queue.

If you are seeing a place where we mergeAndSpill the spilledBatchGroup queue and add to the end of the queue, that is a mistake and should be fixed.
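A rough sketch of that two-queue flow, under the same caveat (plain Java, illustrative names, strings in place of real BatchGroups):

import java.util.LinkedList;

// Illustrative sketch only. Fresh batches land in batchGroups; a normal spill
// drains batchGroups from the tail and appends the result to spilledBatchGroups;
// a re-merge of already-spilled groups drains spilledBatchGroups from the tail
// and pushes the (larger) result to its front.
public class TwoQueueFlowSketch {
  public static void main(String[] args) {
    LinkedList<String> batchGroups = new LinkedList<>();
    LinkedList<String> spilledBatchGroups = new LinkedList<>();

    for (int i = 1; i <= 4; i++) {
      batchGroups.add("in-" + i);  // incoming batches accumulate here
    }

    // Normal spill: merge from the tail of batchGroups, append to spilledBatchGroups.
    spilledBatchGroups.add(batchGroups.pollLast() + "+" + batchGroups.pollLast());

    // Re-merge of spilled groups: drain from the tail, put the result at the front.
    spilledBatchGroups.add("older-spill");
    String reMerged = spilledBatchGroups.pollLast() + "+" + spilledBatchGroups.pollLast();
    spilledBatchGroups.addFirst(reMerged);

    System.out.println("batchGroups        = " + batchGroups);         // [in-1, in-2]
    System.out.println("spilledBatchGroups = " + spilledBatchGroups);  // [older-spill+in-4+in-3]
  }
}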

Contributor Author: Thanks, Steven, I'm less confused now! The change is valid, because we are merging from the batchGroups queue.

Contributor: Yeah, since you also changed it so that we are spilling the batchGroup queue, this change does make sense.

} else {
throw new OutOfMemoryException("Unable to allocate sv2, and not enough batchGroups to spill");
throw UserException.memoryError("Unable to allocate sv2 for %d records, and not enough batchGroups to spill.",
incoming.getRecordCount())
.addContext("batchGroups.size", batchGroups.size())
.addContext("spilledBatchGroups.size", spilledBatchGroups.size())
.addContext("allocated memory", oAllocator.getAllocatedMemory())
.addContext("allocator limit", oAllocator.getLimit())
.build(logger);
}
} catch (SchemaChangeException e) {
throw new RuntimeException(e);
@@ -771,7 +744,7 @@ private void createCopier(VectorAccessible batch, List<BatchGroup> batchGroupLis
copier.close();
}

BufferAllocator allocator = spilling ? copierAllocator : oContext.getAllocator();
BufferAllocator allocator = spilling ? copierAllocator : oAllocator;
for (VectorWrapper<?> i : batch) {
ValueVector v = TypeHelper.getNewVector(i.getField(), allocator);
outputContainer.add(v);