Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand Down Expand Up @@ -59,6 +59,10 @@
public class HBaseRecordReader extends AbstractRecordReader implements DrillHBaseConstants {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseRecordReader.class);

// batch should not exceed this value to avoid OOM on a busy system
private static final int MAX_ALLOCATED_MEMORY_PER_BATCH = 64 * 1024 * 1024; // 64 mb in bytes

// the number of records in a single batch should not exceed this maximum
private static final int TARGET_RECORD_COUNT = 4000;

private OutputMutator outputMutator;
Expand Down Expand Up @@ -134,7 +138,7 @@ protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> columns
public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
this.operatorContext = context;
this.outputMutator = output;
familyVectorMap = new HashMap<String, MapVector>();
familyVectorMap = new HashMap<>();

try {
hTable = connection.getTable(hbaseTableName);
Expand Down Expand Up @@ -187,8 +191,8 @@ public int next() {
}

int rowCount = 0;
done:
for (; rowCount < TARGET_RECORD_COUNT; rowCount++) {
// if allocated memory for the first row is larger than allowed max in batch, it will be added anyway
do {
Result result = null;
final OperatorStats operatorStats = operatorContext == null ? null : operatorContext.getStats();
try {
Expand All @@ -206,13 +210,17 @@ public int next() {
throw new DrillRuntimeException(e);
}
if (result == null) {
break done;
break;
}

// parse the result and populate the value vectors
Cell[] cells = result.rawCells();
if (rowKeyVector != null) {
rowKeyVector.getMutator().setSafe(rowCount, cells[0].getRowArray(), cells[0].getRowOffset(), cells[0].getRowLength());
rowKeyVector.getMutator().setSafe(
rowCount,
cells[0].getRowArray(),
cells[0].getRowOffset(),
cells[0].getRowLength());
}
if (!rowKeyOnly) {
for (final Cell cell : cells) {
Expand All @@ -224,15 +232,17 @@ public int next() {
final int qualifierOffset = cell.getQualifierOffset();
final int qualifierLength = cell.getQualifierLength();
final byte[] qualifierArray = cell.getQualifierArray();
final NullableVarBinaryVector v = getOrCreateColumnVector(mv, new String(qualifierArray, qualifierOffset, qualifierLength));
final NullableVarBinaryVector v = getOrCreateColumnVector(mv,
new String(qualifierArray, qualifierOffset, qualifierLength));

final int valueOffset = cell.getValueOffset();
final int valueLength = cell.getValueLength();
final byte[] valueArray = cell.getValueArray();
v.getMutator().setSafe(rowCount, valueArray, valueOffset, valueLength);
}
}
}
rowCount++;
} while (canAddNewRow(rowCount));

setOutputRowCount(rowCount);
logger.debug("Took {} ms to get {} records", watch.elapsed(TimeUnit.MILLISECONDS), rowCount);
Expand Down Expand Up @@ -289,4 +299,19 @@ private void setOutputRowCount(int count) {
rowKeyVector.getMutator().setValueCount(count);
}
}

/**
 * Decides whether one more row may be appended to the batch currently
 * being built. A row is accepted only while both limits hold:
 * <ul>
 * <li>the batch contains fewer than the maximum allowed number of rows</li>
 * <li>the operator's allocated memory is still below the per-batch cap</li>
 * </ul>
 *
 * @param rowCount number of rows already placed in the batch
 * @return true when both the row-count and memory limits permit another row,
 *         false otherwise
 */
private boolean canAddNewRow(int rowCount) {
  // Check the cheap row-count cap first; only consult the allocator
  // when the count limit has not been hit (preserves short-circuit order).
  if (rowCount >= TARGET_RECORD_COUNT) {
    return false;
  }
  return operatorContext.getAllocator().getAllocatedMemory() < MAX_ALLOCATED_MEMORY_PER_BATCH;
}
}