Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions java/vector/src/main/codegen/templates/UnionReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ public void copyAsValue(UnionWriter writer) {

</#list>

public int size() {
return getReaderForIndex(idx()).size();
}

<#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first />
<#assign uncappedName = name?uncap_first/>
<#assign boxedType = (minor.boxedType!type.boxedType) />
Expand Down
30 changes: 11 additions & 19 deletions java/vector/src/main/codegen/templates/UnionVector.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import com.google.common.collect.ImmutableList;
import com.google.flatbuffers.FlatBufferBuilder;
import io.netty.buffer.ArrowBuf;
import org.apache.arrow.flatbuf.Field;
import org.apache.arrow.flatbuf.Type;
import org.apache.arrow.flatbuf.Union;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.types.pojo.ArrowType;

import java.util.ArrayList;
import java.util.List;

<@pp.dropOutputFile />
Expand All @@ -39,14 +28,17 @@
<#include "/@includes/vv_imports.ftl" />
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import org.apache.arrow.vector.BaseDataValueVector;
import org.apache.arrow.vector.complex.impl.ComplexCopier;
import org.apache.arrow.vector.util.CallBack;
import org.apache.arrow.vector.schema.ArrowFieldNode;

import static org.apache.arrow.flatbuf.UnionMode.Sparse;



/*
* This class is generated using freemarker and the ${.template_name} template.
*/
Expand Down Expand Up @@ -81,13 +73,15 @@ public class UnionVector implements FieldVector {
private ValueVector singleVector;

private final CallBack callBack;
private final List<BufferBacked> innerVectors;

public UnionVector(String name, BufferAllocator allocator, CallBack callBack) {
this.name = name;
this.allocator = allocator;
this.internalMap = new MapVector("internal", allocator, callBack);
this.typeVector = new UInt1Vector("types", allocator);
this.callBack = callBack;
this.innerVectors = Collections.unmodifiableList(Arrays.<BufferBacked>asList(typeVector));
}

public BufferAllocator getAllocator() {
Expand All @@ -101,30 +95,28 @@ public MinorType getMinorType() {

@Override
public void initializeChildrenFromFields(List<Field> children) {
getMap().initializeChildrenFromFields(children);
internalMap.initializeChildrenFromFields(children);
}

@Override
public List<FieldVector> getChildrenFromFields() {
return getMap().getChildrenFromFields();
return internalMap.getChildrenFromFields();
}

@Override
public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) {
// TODO
throw new UnsupportedOperationException();
BaseDataValueVector.load(getFieldInnerVectors(), ownBuffers);
this.valueCount = fieldNode.getLength();
}

@Override
public List<ArrowBuf> getFieldBuffers() {
// TODO
throw new UnsupportedOperationException();
return BaseDataValueVector.unload(getFieldInnerVectors());
}

@Override
public List<BufferBacked> getFieldInnerVectors() {
// TODO
throw new UnsupportedOperationException();
return this.innerVectors;
}

public NullableMapVector getMap() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ public void load(ArrowRecordBatch recordBatch) {
}

private void loadBuffers(FieldVector vector, Field field, Iterator<ArrowBuf> buffers, Iterator<ArrowFieldNode> nodes) {
checkArgument(nodes.hasNext(),
"no more field nodes for for field " + field + " and vector " + vector);
ArrowFieldNode fieldNode = nodes.next();
List<VectorLayout> typeLayout = field.getTypeLayout().getVectors();
List<ArrowBuf> ownBuffers = new ArrayList<>(typeLayout.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ public static TypeLayout getTypeLayout(final ArrowType arrowType) {
break;
case UnionMode.Sparse:
vectors = asList(
validityVector(),
typeVector()
typeVector() // type of the value at the index or 0 if null
);
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ private void validateComplexContent(int count, NullableMapVector parent) {
Assert.assertEquals(i % 3, rootReader.reader("list").size());
NullableTimeStampHolder h = new NullableTimeStampHolder();
rootReader.reader("map").reader("timestamp").read(h);
Assert.assertEquals(i, h.value % COUNT);
Assert.assertEquals(i, h.value);
}
}

Expand Down Expand Up @@ -339,4 +339,112 @@ public void testWriteReadMultipleRBs() throws IOException {
}
}

@Test
public void testWriteReadUnion() throws IOException {
File file = new File("target/mytest_write_union.arrow");
int count = COUNT;
try (
BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
NullableMapVector parent = new NullableMapVector("parent", vectorAllocator, null)) {

writeUnionData(count, parent);

printVectors(parent.getChildrenFromFields());

validateUnionData(count, parent);

write(parent.getChild("root"), file);
}
// read
try (
BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
FileInputStream fileInputStream = new FileInputStream(file);
ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), readerAllocator);
BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
NullableMapVector parent = new NullableMapVector("parent", vectorAllocator, null)
) {
ArrowFooter footer = arrowReader.readFooter();
Schema schema = footer.getSchema();
LOGGER.debug("reading schema: " + schema);

// initialize vectors

NullableMapVector root = parent.addOrGet("root", MinorType.MAP, NullableMapVector.class);
VectorLoader vectorLoader = new VectorLoader(schema, root);

List<ArrowBlock> recordBatches = footer.getRecordBatches();
for (ArrowBlock rbBlock : recordBatches) {
try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
vectorLoader.load(recordBatch);
}
validateUnionData(count, parent);
}
}
}

public void validateUnionData(int count, MapVector parent) {
MapReader rootReader = new SingleMapReaderImpl(parent).reader("root");
for (int i = 0; i < count; i++) {
rootReader.setPosition(i);
switch (i % 4) {
case 0:
Assert.assertEquals(i, rootReader.reader("union").readInteger().intValue());
break;
case 1:
Assert.assertEquals(i, rootReader.reader("union").readLong().longValue());
break;
case 2:
Assert.assertEquals(i % 3, rootReader.reader("union").size());
break;
case 3:
NullableTimeStampHolder h = new NullableTimeStampHolder();
rootReader.reader("union").reader("timestamp").read(h);
Assert.assertEquals(i, h.value);
break;
}
}
}

public void writeUnionData(int count, NullableMapVector parent) {
ArrowBuf varchar = allocator.buffer(3);
varchar.readerIndex(0);
varchar.setByte(0, 'a');
varchar.setByte(1, 'b');
varchar.setByte(2, 'c');
varchar.writerIndex(3);
ComplexWriter writer = new ComplexWriterImpl("root", parent);
MapWriter rootWriter = writer.rootAsMap();
IntWriter intWriter = rootWriter.integer("union");
BigIntWriter bigIntWriter = rootWriter.bigInt("union");
ListWriter listWriter = rootWriter.list("union");
MapWriter mapWriter = rootWriter.map("union");
for (int i = 0; i < count; i++) {
switch (i % 4) {
case 0:
intWriter.setPosition(i);
intWriter.writeInt(i);
break;
case 1:
bigIntWriter.setPosition(i);
bigIntWriter.writeBigInt(i);
break;
case 2:
listWriter.setPosition(i);
listWriter.startList();
for (int j = 0; j < i % 3; j++) {
listWriter.varChar().writeVarChar(0, 3, varchar);
}
listWriter.endList();
break;
case 3:
mapWriter.setPosition(i);
mapWriter.start();
mapWriter.timeStamp("timestamp").writeTimeStamp(i);
mapWriter.end();
break;
}
}
writer.setValueCount(count);
varchar.release();
}
}