Skip to content

Commit bfe6865

Browse files
Pindikura Ravindrasiddharthteotia
authored andcommitted
ARROW-4147: [Java] reduce heap usage for varwidth vectors (#3298)
* ARROW-4147: reduce heap usage for varwidth vectors - some code reorg to avoid duplication - changed the default initial alloc from 4096 to 3970 * ARROW-4147: [Java] Address review comments * ARROW-4147: remove check on width to be <= 16: * ARROW-4147: allow initial valueCount to be 0. * ARROW-4147: Fix incorrect comment on initial alloc
1 parent 420c949 commit bfe6865

File tree

9 files changed

+799
-648
lines changed

9 files changed

+799
-648
lines changed

java/vector/src/main/java/org/apache/arrow/vector/BaseFixedWidthVector.java

Lines changed: 33 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.util.Collections;
2323
import java.util.List;
2424

25-
import org.apache.arrow.memory.BaseAllocator;
2625
import org.apache.arrow.memory.BufferAllocator;
2726
import org.apache.arrow.memory.OutOfMemoryException;
2827
import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
@@ -43,8 +42,7 @@ public abstract class BaseFixedWidthVector extends BaseValueVector
4342
implements FixedWidthVector, FieldVector, VectorDefinitionSetter {
4443
private final int typeWidth;
4544

46-
protected int valueAllocationSizeInBytes;
47-
protected int validityAllocationSizeInBytes;
45+
protected int initialValueAllocation;
4846

4947
protected final Field field;
5048
private int allocationMonitor;
@@ -61,14 +59,7 @@ public BaseFixedWidthVector(final String name, final BufferAllocator allocator,
6159
allocationMonitor = 0;
6260
validityBuffer = allocator.getEmpty();
6361
valueBuffer = allocator.getEmpty();
64-
if (typeWidth > 0) {
65-
valueAllocationSizeInBytes = INITIAL_VALUE_ALLOCATION * typeWidth;
66-
validityAllocationSizeInBytes = getValidityBufferSizeFromCount(INITIAL_VALUE_ALLOCATION);
67-
} else {
68-
/* specialized handling for BitVector */
69-
valueAllocationSizeInBytes = getValidityBufferSizeFromCount(INITIAL_VALUE_ALLOCATION);
70-
validityAllocationSizeInBytes = valueAllocationSizeInBytes;
71-
}
62+
initialValueAllocation = INITIAL_VALUE_ALLOCATION;
7263
}
7364

7465

@@ -159,12 +150,8 @@ public ArrowBuf getOffsetBuffer() {
159150
*/
160151
@Override
161152
public void setInitialCapacity(int valueCount) {
162-
final long size = (long) valueCount * typeWidth;
163-
if (size > MAX_ALLOCATION_SIZE) {
164-
throw new OversizedAllocationException("Requested amount of memory is more than max allowed");
165-
}
166-
valueAllocationSizeInBytes = (int) size;
167-
validityAllocationSizeInBytes = getValidityBufferSizeFromCount(valueCount);
153+
computeAndCheckBufferSize(valueCount);
154+
initialValueAllocation = valueCount;
168155
}
169156

170157
/**
@@ -267,18 +254,13 @@ public void allocateNew() {
267254
*/
268255
@Override
269256
public boolean allocateNewSafe() {
270-
long curAllocationSizeValue = valueAllocationSizeInBytes;
271-
long curAllocationSizeValidity = validityAllocationSizeInBytes;
272-
273-
if (align(curAllocationSizeValue) + curAllocationSizeValidity > MAX_ALLOCATION_SIZE) {
274-
throw new OversizedAllocationException("Requested amount of memory exceeds limit");
275-
}
257+
computeAndCheckBufferSize(initialValueAllocation);
276258

277259
/* we are doing a new allocation -- release the current buffers */
278260
clear();
279261

280262
try {
281-
allocateBytes(curAllocationSizeValue, curAllocationSizeValidity);
263+
allocateBytes(initialValueAllocation);
282264
} catch (Exception e) {
283265
clear();
284266
return false;
@@ -295,33 +277,30 @@ public boolean allocateNewSafe() {
295277
* @throws org.apache.arrow.memory.OutOfMemoryException on error
296278
*/
297279
public void allocateNew(int valueCount) {
298-
long valueBufferSize = valueCount * typeWidth;
299-
long validityBufferSize = getValidityBufferSizeFromCount(valueCount);
300-
if (typeWidth == 0) {
301-
/* specialized handling for BitVector */
302-
valueBufferSize = validityBufferSize;
303-
}
304-
305-
if (align(valueBufferSize) + validityBufferSize > MAX_ALLOCATION_SIZE) {
306-
throw new OversizedAllocationException("Requested amount of memory is more than max allowed");
307-
}
280+
computeAndCheckBufferSize(valueCount);
308281

309282
/* we are doing a new allocation -- release the current buffers */
310283
clear();
311284

312285
try {
313-
allocateBytes(valueBufferSize, validityBufferSize);
286+
allocateBytes(valueCount);
314287
} catch (Exception e) {
315288
clear();
316289
throw e;
317290
}
318291
}
319292

320293
/*
321-
* align to a 8-byte value.
294+
* Compute the buffer size required for 'valueCount', and check if it's within bounds.
322295
*/
323-
private long align(long size) {
324-
return ((size + 7) / 8) * 8;
296+
private long computeAndCheckBufferSize(int valueCount) {
297+
final long size = computeCombinedBufferSize(valueCount, typeWidth);
298+
if (size > MAX_ALLOCATION_SIZE) {
299+
throw new OversizedAllocationException("Memory required for vector capacity " +
300+
valueCount +
301+
" is (" + size + "), which is more than max allowed (" + MAX_ALLOCATION_SIZE + ")");
302+
}
303+
return size;
325304
}
326305

327306
/**
@@ -333,25 +312,11 @@ private long align(long size) {
333312
* within the bounds of max allocation allowed and any other error
334313
* conditions.
335314
*/
336-
private void allocateBytes(final long valueBufferSize, final long validityBufferSize) {
337-
int valueBufferSlice = (int)align(valueBufferSize);
338-
int validityBufferSlice = (int)validityBufferSize;
339-
340-
/* allocate combined buffer */
341-
ArrowBuf buffer = allocator.buffer(valueBufferSlice + validityBufferSlice);
342-
343-
valueAllocationSizeInBytes = valueBufferSlice;
344-
valueBuffer = buffer.slice(0, valueBufferSlice);
345-
valueBuffer.retain();
346-
valueBuffer.readerIndex(0);
347-
348-
validityAllocationSizeInBytes = validityBufferSlice;
349-
validityBuffer = buffer.slice(valueBufferSlice, validityBufferSlice);
350-
validityBuffer.retain();
351-
validityBuffer.readerIndex(0);
315+
private void allocateBytes(int valueCount) {
316+
DataAndValidityBuffers buffers = allocFixedDataAndValidityBufs(valueCount, typeWidth);
317+
valueBuffer = buffers.getDataBuf();
318+
validityBuffer = buffers.getValidityBuf();
352319
zeroVector();
353-
354-
buffer.release();
355320
}
356321

357322
/**
@@ -363,7 +328,6 @@ private void allocateBytes(final long valueBufferSize, final long validityBuffer
363328
private void allocateValidityBuffer(final int validityBufferSize) {
364329
validityBuffer = allocator.buffer(validityBufferSize);
365330
validityBuffer.readerIndex(0);
366-
validityAllocationSizeInBytes = validityBufferSize;
367331
}
368332

369333
/**
@@ -439,50 +403,28 @@ public ArrowBuf[] getBuffers(boolean clear) {
439403
*/
440404
@Override
441405
public void reAlloc() {
442-
int valueBaseSize = Integer.max(valueBuffer.capacity(), valueAllocationSizeInBytes);
443-
long newValueBufferSlice = align(valueBaseSize * 2L);
444-
long newValidityBufferSlice;
445-
if (typeWidth > 0) {
446-
long targetValueBufferSize = align(BaseAllocator.nextPowerOfTwo(newValueBufferSlice));
447-
long targetValueCount = targetValueBufferSize / typeWidth;
448-
targetValueBufferSize -= getValidityBufferSizeFromCount((int) targetValueCount);
449-
if (newValueBufferSlice < targetValueBufferSize) {
450-
newValueBufferSlice = targetValueBufferSize;
406+
int targetValueCount = getValueCapacity() * 2;
407+
if (targetValueCount == 0) {
408+
if (initialValueAllocation > 0) {
409+
targetValueCount = initialValueAllocation * 2;
410+
} else {
411+
targetValueCount = INITIAL_VALUE_ALLOCATION * 2;
451412
}
452-
453-
newValidityBufferSlice = getValidityBufferSizeFromCount((int)(newValueBufferSlice / typeWidth));
454-
} else {
455-
newValidityBufferSlice = newValueBufferSlice;
456-
}
457-
458-
long newAllocationSize = newValueBufferSlice + newValidityBufferSlice;
459-
assert newAllocationSize >= 1;
460-
461-
if (newAllocationSize > MAX_ALLOCATION_SIZE) {
462-
throw new OversizedAllocationException("Unable to expand the buffer");
463413
}
414+
computeAndCheckBufferSize(targetValueCount);
464415

465-
final ArrowBuf newBuffer = allocator.buffer((int) newAllocationSize);
466-
final ArrowBuf newValueBuffer = newBuffer.slice(0, (int)newValueBufferSlice);
416+
DataAndValidityBuffers buffers = allocFixedDataAndValidityBufs(targetValueCount, typeWidth);
417+
final ArrowBuf newValueBuffer = buffers.getDataBuf();
467418
newValueBuffer.setBytes(0, valueBuffer, 0, valueBuffer.capacity());
468-
newValueBuffer.setZero(valueBuffer.capacity(), (int)newValueBufferSlice - valueBuffer.capacity());
469-
newValueBuffer.retain();
470-
newValueBuffer.readerIndex(0);
419+
newValueBuffer.setZero(valueBuffer.capacity(), newValueBuffer.capacity() - valueBuffer.capacity());
471420
valueBuffer.release();
472421
valueBuffer = newValueBuffer;
473-
valueAllocationSizeInBytes = (int)newValueBufferSlice;
474422

475-
final ArrowBuf newValidityBuffer = newBuffer.slice((int)newValueBufferSlice,
476-
(int)newValidityBufferSlice);
423+
final ArrowBuf newValidityBuffer = buffers.getValidityBuf();
477424
newValidityBuffer.setBytes(0, validityBuffer, 0, validityBuffer.capacity());
478-
newValidityBuffer.setZero(validityBuffer.capacity(), (int)newValidityBufferSlice - validityBuffer.capacity());
479-
newValidityBuffer.retain();
480-
newValidityBuffer.readerIndex(0);
425+
newValidityBuffer.setZero(validityBuffer.capacity(), newValidityBuffer.capacity() - validityBuffer.capacity());
481426
validityBuffer.release();
482427
validityBuffer = newValidityBuffer;
483-
validityAllocationSizeInBytes = (int)newValidityBufferSlice;
484-
485-
newBuffer.release();
486428
}
487429

488430
@Override
@@ -535,9 +477,6 @@ public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers
535477
valueBuffer = dataBuffer.retain(allocator);
536478

537479
valueCount = fieldNode.getLength();
538-
539-
valueAllocationSizeInBytes = valueBuffer.capacity();
540-
validityAllocationSizeInBytes = validityBuffer.capacity();
541480
}
542481

543482
/**

java/vector/src/main/java/org/apache/arrow/vector/BaseValueVector.java

Lines changed: 98 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.Collections;
2121
import java.util.Iterator;
2222

23+
import org.apache.arrow.memory.BaseAllocator;
2324
import org.apache.arrow.memory.BufferAllocator;
2425
import org.apache.arrow.util.Preconditions;
2526
import org.apache.arrow.vector.util.TransferPair;
@@ -33,7 +34,14 @@ public abstract class BaseValueVector implements ValueVector {
3334

3435
public static final String MAX_ALLOCATION_SIZE_PROPERTY = "arrow.vector.max_allocation_bytes";
3536
public static final int MAX_ALLOCATION_SIZE = Integer.getInteger(MAX_ALLOCATION_SIZE_PROPERTY, Integer.MAX_VALUE);
36-
public static final int INITIAL_VALUE_ALLOCATION = 4096;
37+
/*
38+
* For all fixed width vectors, the value and validity buffers are sliced from a single buffer.
39+
* Similarly, for variable width vectors, the offsets and validity buffers are sliced from a
40+
* single buffer. To ensure the single buffer is power-of-2 size, the initial value allocation
41+
* should be less than power-of-2. For IntVectors, this comes to 3970*4 (15880) for the data
42+
* buffer and 504 bytes for the validity buffer, totalling to 16384 (2^16).
43+
*/
44+
public static final int INITIAL_VALUE_ALLOCATION = 3970;
3745

3846
protected final BufferAllocator allocator;
3947
protected final String name;
@@ -98,5 +106,94 @@ protected ArrowBuf releaseBuffer(ArrowBuf buffer) {
98106
protected static int getValidityBufferSizeFromCount(final int valueCount) {
99107
return (int) Math.ceil(valueCount / 8.0);
100108
}
109+
110+
/* round up to the next multiple of 8 */
111+
private static long roundUp8(long size) {
112+
return ((size + 7) / 8) * 8;
113+
}
114+
115+
protected long computeCombinedBufferSize(int valueCount, int typeWidth) {
116+
Preconditions.checkArgument(valueCount >= 0, "valueCount must be >= 0");
117+
Preconditions.checkArgument(typeWidth >= 0, "typeWidth must be >= 0");
118+
119+
// compute size of validity buffer.
120+
long bufferSize = roundUp8(getValidityBufferSizeFromCount(valueCount));
121+
122+
// add the size of the value buffer.
123+
if (typeWidth == 0) {
124+
// for boolean type, value-buffer and validity-buffer are of same size.
125+
bufferSize *= 2;
126+
} else {
127+
bufferSize += roundUp8(valueCount * typeWidth);
128+
}
129+
return BaseAllocator.nextPowerOfTwo(bufferSize);
130+
}
131+
132+
class DataAndValidityBuffers {
133+
private ArrowBuf dataBuf;
134+
private ArrowBuf validityBuf;
135+
136+
DataAndValidityBuffers(ArrowBuf dataBuf, ArrowBuf validityBuf) {
137+
this.dataBuf = dataBuf;
138+
this.validityBuf = validityBuf;
139+
}
140+
141+
public ArrowBuf getDataBuf() {
142+
return dataBuf;
143+
}
144+
145+
public ArrowBuf getValidityBuf() {
146+
return validityBuf;
147+
}
148+
149+
}
150+
151+
protected DataAndValidityBuffers allocFixedDataAndValidityBufs(int valueCount, int typeWidth) {
152+
long bufferSize = computeCombinedBufferSize(valueCount, typeWidth);
153+
assert bufferSize < MAX_ALLOCATION_SIZE;
154+
155+
int validityBufferSize;
156+
int dataBufferSize;
157+
if (typeWidth == 0) {
158+
validityBufferSize = dataBufferSize = (int) (bufferSize / 2);
159+
} else {
160+
// Due to roundup to power-of-2 allocation, the bufferSize could be greater than the
161+
// requested size. Utilize the allocated buffer fully.;
162+
int actualCount = (int) ((bufferSize * 8.0) / (8 * typeWidth + 1));
163+
do {
164+
validityBufferSize = (int) roundUp8(getValidityBufferSizeFromCount(actualCount));
165+
dataBufferSize = (int) roundUp8(actualCount * typeWidth);
166+
if (validityBufferSize + dataBufferSize <= bufferSize) {
167+
break;
168+
}
169+
--actualCount;
170+
} while (true);
171+
}
172+
173+
174+
/* allocate combined buffer */
175+
ArrowBuf combinedBuffer = allocator.buffer((int) bufferSize);
176+
177+
/* slice into requested lengths */
178+
ArrowBuf dataBuf = null;
179+
ArrowBuf validityBuf = null;
180+
int bufferOffset = 0;
181+
for (int numBuffers = 0; numBuffers < 2; ++numBuffers) {
182+
int len = (numBuffers == 0 ? dataBufferSize : validityBufferSize);
183+
ArrowBuf buf = combinedBuffer.slice(bufferOffset, len);
184+
buf.retain();
185+
buf.readerIndex(0);
186+
buf.writerIndex(0);
187+
188+
bufferOffset += len;
189+
if (numBuffers == 0) {
190+
dataBuf = buf;
191+
} else {
192+
validityBuf = buf;
193+
}
194+
}
195+
combinedBuffer.release();
196+
return new DataAndValidityBuffers(dataBuf, validityBuf);
197+
}
101198
}
102199

0 commit comments

Comments
 (0)