Skip to content

Commit d06c491

Browse files
julienledemwesm
authored andcommitted
ARROW-399: ListVector.loadFieldBuffers ignores the ArrowFieldNode len…
…gth metadata Author: Julien Le Dem <julien@dremio.com> Closes #227 from julienledem/arrow_399 and squashes the following commits: 93a77cb [Julien Le Dem] set padding; add test 462a36c [Julien Le Dem] ARROW-399: ListVector.loadFieldBuffers ignores the ArrowFieldNode length metadata
1 parent a5362c2 commit d06c491

File tree

8 files changed

+136
-45
lines changed

8 files changed

+136
-45
lines changed

java/vector/src/main/codegen/templates/FixedValueVectors.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
public final class ${minor.class}Vector extends BaseDataValueVector implements FixedWidthVector{
4646
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(${minor.class}Vector.class);
4747
48+
public static final int TYPE_WIDTH = ${type.width};
49+
4850
private final Accessor accessor = new Accessor();
4951
private final Mutator mutator = new Mutator();
5052

java/vector/src/main/codegen/templates/NullableValueVectors.java

Lines changed: 24 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
import org.apache.arrow.flatbuf.Precision;
3838

3939
/**
40-
* Nullable${minor.class} implements a vector of values which could be null. Elements in the vector
40+
* ${className} implements a vector of values which could be null. Elements in the vector
4141
* are first checked against a fixed length vector of boolean values. Then the element is retrieved
4242
* from the base class (if not null).
4343
*
@@ -47,7 +47,7 @@
4747
public final class ${className} extends BaseDataValueVector implements <#if type.major == "VarLen">VariableWidth<#else>FixedWidth</#if>Vector, NullableVector, FieldVector {
4848
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(${className}.class);
4949

50-
private final FieldReader reader = new ${minor.class}ReaderImpl(Nullable${minor.class}Vector.this);
50+
private final FieldReader reader = new ${minor.class}ReaderImpl(${className}.this);
5151

5252
private final String bitsField = "$bits$";
5353
private final String valuesField = "$values$";
@@ -67,7 +67,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
6767

6868
public ${className}(String name, BufferAllocator allocator, int precision, int scale) {
6969
super(name, allocator);
70-
values = new ${minor.class}Vector(valuesField, allocator, precision, scale);
70+
values = new ${valuesName}(valuesField, allocator, precision, scale);
7171
this.precision = precision;
7272
this.scale = scale;
7373
mutator = new Mutator();
@@ -81,7 +81,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
8181
<#else>
8282
public ${className}(String name, BufferAllocator allocator) {
8383
super(name, allocator);
84-
values = new ${minor.class}Vector(valuesField, allocator);
84+
values = new ${valuesName}(valuesField, allocator);
8585
mutator = new Mutator();
8686
accessor = new Accessor();
8787
<#if minor.class == "TinyInt" ||
@@ -144,6 +144,13 @@ public List<FieldVector> getChildrenFromFields() {
144144

145145
@Override
146146
public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) {
147+
<#if type.major = "VarLen">
148+
// variable width values: truncate offset vector buffer to size (#1)
149+
org.apache.arrow.vector.BaseDataValueVector.truncateBufferBasedOnSize(ownBuffers, 1, values.offsetVector.getBufferSizeFor(fieldNode.getLength() + 1));
150+
<#else>
151+
// fixed width values truncate value vector to size (#1)
152+
org.apache.arrow.vector.BaseDataValueVector.truncateBufferBasedOnSize(ownBuffers, 1, values.getBufferSizeFor(fieldNode.getLength()));
153+
</#if>
147154
org.apache.arrow.vector.BaseDataValueVector.load(fieldNode, getFieldInnerVectors(), ownBuffers);
148155
bits.valueCount = fieldNode.getLength();
149156
}
@@ -229,13 +236,6 @@ public void setInitialCapacity(int numRecords) {
229236
values.setInitialCapacity(numRecords);
230237
}
231238

232-
// @Override
233-
// public SerializedField.Builder getMetadataBuilder() {
234-
// return super.getMetadataBuilder()
235-
// .addChild(bits.getMetadata())
236-
// .addChild(values.getMetadata());
237-
// }
238-
239239
@Override
240240
public void allocateNew() {
241241
if(!allocateNewSafe()){
@@ -329,20 +329,6 @@ public void zeroVector() {
329329
}
330330
</#if>
331331
332-
333-
// @Override
334-
// public void load(SerializedField metadata, ArrowBuf buffer) {
335-
// clear();
336-
// the bits vector is the first child (the order in which the children are added in getMetadataBuilder is significant)
337-
// final SerializedField bitsField = metadata.getChild(0);
338-
// bits.load(bitsField, buffer);
339-
//
340-
// final int capacity = buffer.capacity();
341-
// final int bitsLength = bitsField.getBufferLength();
342-
// final SerializedField valuesField = metadata.getChild(1);
343-
// values.load(valuesField, buffer.slice(bitsLength, capacity - bitsLength));
344-
// }
345-
346332
@Override
347333
public TransferPair getTransferPair(BufferAllocator allocator){
348334
return new TransferImpl(name, allocator);
@@ -356,10 +342,10 @@ public TransferPair getTransferPair(String ref, BufferAllocator allocator){
356342
357343
@Override
358344
public TransferPair makeTransferPair(ValueVector to) {
359-
return new TransferImpl((Nullable${minor.class}Vector) to);
345+
return new TransferImpl((${className}) to);
360346
}
361347
362-
public void transferTo(Nullable${minor.class}Vector target){
348+
public void transferTo(${className} target){
363349
bits.transferTo(target.bits);
364350
values.transferTo(target.values);
365351
<#if type.major == "VarLen">
@@ -368,7 +354,7 @@ public void transferTo(Nullable${minor.class}Vector target){
368354
clear();
369355
}
370356
371-
public void splitAndTransferTo(int startIndex, int length, Nullable${minor.class}Vector target) {
357+
public void splitAndTransferTo(int startIndex, int length, ${className} target) {
372358
bits.splitAndTransferTo(startIndex, length, target.bits);
373359
values.splitAndTransferTo(startIndex, length, target.values);
374360
<#if type.major == "VarLen">
@@ -377,22 +363,22 @@ public void splitAndTransferTo(int startIndex, int length, Nullable${minor.class
377363
}
378364
379365
private class TransferImpl implements TransferPair {
380-
Nullable${minor.class}Vector to;
366+
${className} to;
381367
382368
public TransferImpl(String name, BufferAllocator allocator){
383369
<#if minor.class == "Decimal">
384-
to = new Nullable${minor.class}Vector(name, allocator, precision, scale);
370+
to = new ${className}(name, allocator, precision, scale);
385371
<#else>
386-
to = new Nullable${minor.class}Vector(name, allocator);
372+
to = new ${className}(name, allocator);
387373
</#if>
388374
}
389375
390-
public TransferImpl(Nullable${minor.class}Vector to){
376+
public TransferImpl(${className} to){
391377
this.to = to;
392378
}
393379
394380
@Override
395-
public Nullable${minor.class}Vector getTo(){
381+
public ${className} getTo(){
396382
return to;
397383
}
398384
@@ -408,7 +394,7 @@ public void splitAndTransfer(int startIndex, int length) {
408394
409395
@Override
410396
public void copyValueSafe(int fromIndex, int toIndex) {
411-
to.copyFromSafe(fromIndex, toIndex, Nullable${minor.class}Vector.this);
397+
to.copyFromSafe(fromIndex, toIndex, ${className}.this);
412398
}
413399
}
414400
@@ -422,22 +408,22 @@ public Mutator getMutator(){
422408
return mutator;
423409
}
424410
425-
public void copyFrom(int fromIndex, int thisIndex, Nullable${minor.class}Vector from){
411+
public void copyFrom(int fromIndex, int thisIndex, ${className} from){
426412
final Accessor fromAccessor = from.getAccessor();
427413
if (!fromAccessor.isNull(fromIndex)) {
428414
mutator.set(thisIndex, fromAccessor.get(fromIndex));
429415
}
430416
}
431417
432-
public void copyFromSafe(int fromIndex, int thisIndex, ${minor.class}Vector from){
418+
public void copyFromSafe(int fromIndex, int thisIndex, ${valuesName} from){
433419
<#if type.major == "VarLen">
434420
mutator.fillEmpties(thisIndex);
435421
</#if>
436422
values.copyFromSafe(fromIndex, thisIndex, from);
437423
bits.getMutator().setSafe(thisIndex, 1);
438424
}
439425
440-
public void copyFromSafe(int fromIndex, int thisIndex, Nullable${minor.class}Vector from){
426+
public void copyFromSafe(int fromIndex, int thisIndex, ${className} from){
441427
<#if type.major == "VarLen">
442428
mutator.fillEmpties(thisIndex);
443429
</#if>
@@ -640,7 +626,7 @@ public void set(int index, ${minor.class}Holder holder){
640626
}
641627
642628
public boolean isSafe(int outIndex) {
643-
return outIndex < Nullable${minor.class}Vector.this.getValueCapacity();
629+
return outIndex < ${className}.this.getValueCapacity();
644630
}
645631
646632
<#assign fields = minor.fields!type.fields />

java/vector/src/main/codegen/templates/UnionVector.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,8 @@ public List<FieldVector> getChildrenFromFields() {
103103

104104
@Override
105105
public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) {
106+
// truncate types vector buffer to size (#0)
107+
org.apache.arrow.vector.BaseDataValueVector.truncateBufferBasedOnSize(ownBuffers, 0, typeVector.getBufferSizeFor(fieldNode.getLength()));
106108
BaseDataValueVector.load(fieldNode, getFieldInnerVectors(), ownBuffers);
107109
this.valueCount = fieldNode.getLength();
108110
}

java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ public abstract class BaseDataValueVector extends BaseValueVector implements Buf
3030

3131
protected final static byte[] emptyByteArray = new byte[]{}; // Nullable vectors use this
3232

33+
/** maximum extra size at the end of the buffer */
34+
private static final int MAX_BUFFER_PADDING = 64;
35+
3336
public static void load(ArrowFieldNode fieldNode, List<BufferBacked> vectors, List<ArrowBuf> buffers) {
3437
int expectedSize = vectors.size();
3538
if (buffers.size() != expectedSize) {
@@ -40,6 +43,20 @@ public static void load(ArrowFieldNode fieldNode, List<BufferBacked> vectors, Li
4043
}
4144
}
4245

46+
public static void truncateBufferBasedOnSize(List<ArrowBuf> buffers, int bufferIndex, int byteSize) {
47+
if (bufferIndex >= buffers.size()) {
48+
throw new IllegalArgumentException("no buffer at index " + bufferIndex + ": " + buffers);
49+
}
50+
ArrowBuf buffer = buffers.get(bufferIndex);
51+
if (buffer.writerIndex() < byteSize) {
52+
throw new IllegalArgumentException("can not truncate buffer to a larger size " + byteSize + ": " + buffer.writerIndex());
53+
}
54+
if (buffer.writerIndex() - byteSize > MAX_BUFFER_PADDING) {
55+
throw new IllegalArgumentException("Buffer too large to resize to " + byteSize + ": " + buffer.writerIndex());
56+
}
57+
buffer.writerIndex(byteSize);
58+
}
59+
4360
public static List<ArrowBuf> unload(List<BufferBacked> vectors) {
4461
List<ArrowBuf> result = new ArrayList<>(vectors.size());
4562
for (BufferBacked vector : vectors) {

java/vector/src/main/java/org/apache/arrow/vector/BitVector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public void load(ArrowFieldNode fieldNode, ArrowBuf data) {
6868
int remainder = count % 8;
6969
// set remaining bits
7070
if (remainder > 0) {
71-
byte bitMask = (byte) (0xFFL >>> ((8 - remainder) & 7));;
71+
byte bitMask = (byte) (0xFFL >>> ((8 - remainder) & 7));
7272
this.data.setByte(fullBytesCount, bitMask);
7373
}
7474
} else if (fieldNode.getNullCount() == fieldNode.getLength()) {

java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ private void loadBuffers(FieldVector vector, Field field, Iterator<ArrowBuf> buf
8282
vector.loadFieldBuffers(fieldNode, ownBuffers);
8383
} catch (RuntimeException e) {
8484
throw new IllegalArgumentException("Could not load buffers for field " +
85-
field + " error message" + e.getMessage(), e);
85+
field + ". error message: " + e.getMessage(), e);
8686
}
8787
List<Field> children = field.getChildren();
8888
if (children.size() > 0) {

java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ public List<FieldVector> getChildrenFromFields() {
9393

9494
@Override
9595
public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) {
96+
// variable width values: truncate offset vector buffer to size (#1)
97+
org.apache.arrow.vector.BaseDataValueVector.truncateBufferBasedOnSize(ownBuffers, 1, offsets.getBufferSizeFor(fieldNode.getLength() + 1));
9698
BaseDataValueVector.load(fieldNode, getFieldInnerVectors(), ownBuffers);
9799
}
98100

java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java

Lines changed: 87 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static org.junit.Assert.assertTrue;
2424

2525
import java.io.IOException;
26+
import java.util.ArrayList;
2627
import java.util.Collections;
2728
import java.util.List;
2829

@@ -32,6 +33,7 @@
3233
import org.apache.arrow.vector.complex.impl.ComplexWriterImpl;
3334
import org.apache.arrow.vector.complex.reader.FieldReader;
3435
import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter;
36+
import org.apache.arrow.vector.complex.writer.BaseWriter.ListWriter;
3537
import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter;
3638
import org.apache.arrow.vector.complex.writer.BigIntWriter;
3739
import org.apache.arrow.vector.complex.writer.IntWriter;
@@ -99,6 +101,79 @@ public void testUnloadLoad() throws IOException {
99101
}
100102
}
101103

104+
@Test
105+
public void testUnloadLoadAddPadding() throws IOException {
106+
int count = 10000;
107+
Schema schema;
108+
try (
109+
BufferAllocator originalVectorsAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
110+
MapVector parent = new MapVector("parent", originalVectorsAllocator, null)) {
111+
112+
// write some data
113+
ComplexWriter writer = new ComplexWriterImpl("root", parent);
114+
MapWriter rootWriter = writer.rootAsMap();
115+
ListWriter list = rootWriter.list("list");
116+
IntWriter intWriter = list.integer();
117+
for (int i = 0; i < count; i++) {
118+
list.setPosition(i);
119+
list.startList();
120+
for (int j = 0; j < i % 4 + 1; j++) {
121+
intWriter.writeInt(i);
122+
}
123+
list.endList();
124+
}
125+
writer.setValueCount(count);
126+
127+
// unload it
128+
FieldVector root = parent.getChild("root");
129+
schema = new Schema(root.getField().getChildren());
130+
VectorUnloader vectorUnloader = newVectorUnloader(root);
131+
try (
132+
ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();
133+
BufferAllocator finalVectorsAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
134+
VectorSchemaRoot newRoot = new VectorSchemaRoot(schema, finalVectorsAllocator);
135+
) {
136+
List<ArrowBuf> oldBuffers = recordBatch.getBuffers();
137+
List<ArrowBuf> newBuffers = new ArrayList<>();
138+
for (ArrowBuf oldBuffer : oldBuffers) {
139+
int l = oldBuffer.readableBytes();
140+
if (l % 64 != 0) {
141+
// pad
142+
l = l + 64 - l % 64;
143+
}
144+
ArrowBuf newBuffer = allocator.buffer(l);
145+
for (int i = oldBuffer.readerIndex(); i < oldBuffer.writerIndex(); i++) {
146+
newBuffer.setByte(i - oldBuffer.readerIndex(), oldBuffer.getByte(i));
147+
}
148+
newBuffer.readerIndex(0);
149+
newBuffer.writerIndex(l);
150+
newBuffers.add(newBuffer);
151+
}
152+
153+
try (ArrowRecordBatch newBatch = new ArrowRecordBatch(recordBatch.getLength(), recordBatch.getNodes(), newBuffers);) {
154+
// load it
155+
VectorLoader vectorLoader = new VectorLoader(newRoot);
156+
157+
vectorLoader.load(newBatch);
158+
159+
FieldReader reader = newRoot.getVector("list").getReader();
160+
for (int i = 0; i < count; i++) {
161+
reader.setPosition(i);
162+
List<Integer> expected = new ArrayList<>();
163+
for (int j = 0; j < i % 4 + 1; j++) {
164+
expected.add(i);
165+
}
166+
Assert.assertEquals(expected, reader.readObject());
167+
}
168+
}
169+
170+
for (ArrowBuf newBuf : newBuffers) {
171+
newBuf.release();
172+
}
173+
}
174+
}
175+
}
176+
102177
/**
103178
* The validity buffer can be empty if:
104179
* - all values are defined
@@ -113,12 +188,17 @@ public void testLoadEmptyValidityBuffer() throws IOException {
113188
));
114189
int count = 10;
115190
ArrowBuf validity = allocator.getEmpty();
116-
ArrowBuf values = allocator.buffer(count * 4); // integers
117-
for (int i = 0; i < count; i++) {
118-
values.setInt(i * 4, i);
191+
ArrowBuf[] values = new ArrowBuf[2];
192+
for (int i = 0; i < values.length; i++) {
193+
ArrowBuf arrowBuf = allocator.buffer(count * 4); // integers
194+
values[i] = arrowBuf;
195+
for (int j = 0; j < count; j++) {
196+
arrowBuf.setInt(j * 4, j);
197+
}
198+
arrowBuf.writerIndex(count * 4);
119199
}
120200
try (
121-
ArrowRecordBatch recordBatch = new ArrowRecordBatch(count, asList(new ArrowFieldNode(count, 0), new ArrowFieldNode(count, count)), asList(validity, values, validity, values));
201+
ArrowRecordBatch recordBatch = new ArrowRecordBatch(count, asList(new ArrowFieldNode(count, 0), new ArrowFieldNode(count, count)), asList(validity, values[0], validity, values[1]));
122202
BufferAllocator finalVectorsAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
123203
VectorSchemaRoot newRoot = new VectorSchemaRoot(schema, finalVectorsAllocator);
124204
) {
@@ -153,7 +233,9 @@ public void testLoadEmptyValidityBuffer() throws IOException {
153233
assertFalse(intDefinedVector.getAccessor().isNull(count + 10));
154234
assertEquals(1234, intDefinedVector.getAccessor().get(count + 10));
155235
} finally {
156-
values.release();
236+
for (ArrowBuf arrowBuf : values) {
237+
arrowBuf.release();
238+
}
157239
}
158240
}
159241

0 commit comments

Comments
 (0)