Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions java/vector/src/main/codegen/templates/UnionVector.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.complex.UnionVector.TransferImpl;
import org.apache.arrow.vector.util.CallBack;
import org.apache.arrow.vector.util.TransferPair;

import java.util.List;

<@pp.dropOutputFile />
Expand Down Expand Up @@ -235,12 +240,17 @@ public Field getField() {

@Override
public TransferPair getTransferPair(BufferAllocator allocator) {
return new TransferImpl(name, allocator);
return getTransferPair(name, allocator);
}

@Override
public TransferPair getTransferPair(String ref, BufferAllocator allocator) {
return new TransferImpl(ref, allocator);
return getTransferPair(ref, allocator, null);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to keep this method?

}

@Override
public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) {
return new org.apache.arrow.vector.complex.UnionVector.TransferImpl(ref, allocator, callBack);
}

@Override
Expand Down Expand Up @@ -280,8 +290,8 @@ private class TransferImpl implements TransferPair {

UnionVector to;

public TransferImpl(String name, BufferAllocator allocator) {
to = new UnionVector(name, allocator, null);
public TransferImpl(String name, BufferAllocator allocator, CallBack callBack) {
to = new UnionVector(name, allocator, callBack);
}

public TransferImpl(UnionVector to) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.arrow.memory.BufferAllocator;

import io.netty.buffer.ArrowBuf;
import org.apache.arrow.vector.util.CallBack;
import org.apache.arrow.vector.util.TransferPair;


public abstract class BaseDataValueVector extends BaseValueVector implements BufferBacked {
Expand Down Expand Up @@ -75,6 +77,11 @@ public void close() {
super.close();
}

@Override
public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) {
return getTransferPair(ref, allocator);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we drop the callback here?

}

@Override
public ArrowBuf[] getBuffers(boolean clear) {
ArrowBuf[] out;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.arrow.vector.complex.reader.FieldReader;
import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.util.CallBack;
import org.apache.arrow.vector.util.TransferPair;

import io.netty.buffer.ArrowBuf;
Expand Down Expand Up @@ -106,6 +107,8 @@ public interface ValueVector extends Closeable, Iterable<ValueVector> {

TransferPair getTransferPair(String ref, BufferAllocator allocator);

TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why don't we just add a parameter to the existing method?
We should avoid duplicating those methods unless necessary.
Please add javadoc to interface/abstract methods


/**
* Returns a new {@link org.apache.arrow.vector.util.TransferPair transfer pair} that is used to transfer underlying
* buffers into the target vector.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.arrow.vector.types.pojo.ArrowType.Null;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.util.CallBack;
import org.apache.arrow.vector.util.TransferPair;

import io.netty.buffer.ArrowBuf;
Expand Down Expand Up @@ -154,6 +155,11 @@ public TransferPair getTransferPair(String ref, BufferAllocator allocator) {
return defaultPair;
}

@Override
public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) {
return defaultPair;
}

@Override
public TransferPair makeTransferPair(ValueVector target) {
return defaultPair;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,12 @@ public FieldVector getDataVector() {

@Override
public TransferPair getTransferPair(String ref, BufferAllocator allocator) {
return new TransferImpl(ref, allocator);
return getTransferPair(ref, allocator, null);
}

@Override
public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) {
return null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

? that's different from the original behavior. What's the intent?

}

@Override
Expand All @@ -148,8 +153,8 @@ private class TransferImpl implements TransferPair {
ListVector to;
TransferPair pairs[] = new TransferPair[3];

public TransferImpl(String name, BufferAllocator allocator) {
this(new ListVector(name, allocator, null));
public TransferImpl(String name, BufferAllocator allocator, CallBack callBack) {
this(new ListVector(name, allocator, callBack));
}

public TransferImpl(ListVector to) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ public int getBufferSizeFor(final int valueCount) {

@Override
public TransferPair getTransferPair(BufferAllocator allocator) {
return getTransferPair(name, allocator, null);
}

@Override
public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) {
return new MapTransferPair(this, new MapVector(name, allocator, callBack), false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public FieldReader getReader() {

@Override
public TransferPair getTransferPair(BufferAllocator allocator) {
return new NullableMapTransferPair(this, new NullableMapVector(name, allocator, callBack), false);
return new NullableMapTransferPair(this, new NullableMapVector(name, allocator, null), false);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could just call the one bellow with name as ref.

}

@Override
Expand All @@ -93,6 +93,11 @@ public TransferPair makeTransferPair(ValueVector to) {

@Override
public TransferPair getTransferPair(String ref, BufferAllocator allocator) {
return new NullableMapTransferPair(this, new NullableMapVector(ref, allocator, null), false);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could just call the other one with null param

}

@Override
public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) {
return new NullableMapTransferPair(this, new NullableMapVector(ref, allocator, callBack), false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.SchemaChangeCallBack;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.complex.UnionVector;
Expand All @@ -45,7 +46,9 @@
import org.apache.arrow.vector.types.pojo.ArrowType.Union;
import org.apache.arrow.vector.types.pojo.ArrowType.Utf8;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.util.CallBack;
import org.apache.arrow.vector.util.Text;
import org.apache.arrow.vector.util.TransferPair;
import org.junit.Assert;
import org.junit.Test;

Expand All @@ -59,7 +62,38 @@ public class TestComplexWriter {

@Test
public void simpleNestedTypes() {
MapVector parent = new MapVector("parent", allocator, null);
MapVector parent = populateMapVector(null);
MapReader rootReader = new SingleMapReaderImpl(parent).reader("root");
for (int i = 0; i < COUNT; i++) {
rootReader.setPosition(i);
Assert.assertEquals(i, rootReader.reader("int").readInteger().intValue());
Assert.assertEquals(i, rootReader.reader("bigInt").readLong().longValue());
}

parent.close();
}

@Test
public void transferPairSchemaChange() {
SchemaChangeCallBack callBack1 = new SchemaChangeCallBack();
SchemaChangeCallBack callBack2 = new SchemaChangeCallBack();
MapVector parent = populateMapVector(callBack1);

TransferPair tp = parent.getTransferPair("newVector", allocator, callBack2);

ComplexWriter writer = new ComplexWriterImpl("newWriter", parent);
MapWriter rootWriter = writer.rootAsMap();
IntWriter intWriter = rootWriter.integer("newInt");
intWriter.writeInt(1);
writer.setValueCount(1);

assertTrue(callBack1.getSchemaChangedAndReset());
// The second vector should not have registered a schema change
assertFalse(callBack1.getSchemaChangedAndReset());
}

private MapVector populateMapVector(CallBack callBack) {
MapVector parent = new MapVector("parent", allocator, callBack);
ComplexWriter writer = new ComplexWriterImpl("root", parent);
MapWriter rootWriter = writer.rootAsMap();
IntWriter intWriter = rootWriter.integer("int");
Expand All @@ -71,14 +105,7 @@ public void simpleNestedTypes() {
bigIntWriter.writeBigInt(i);
}
writer.setValueCount(COUNT);
MapReader rootReader = new SingleMapReaderImpl(parent).reader("root");
for (int i = 0; i < COUNT; i++) {
rootReader.setPosition(i);
Assert.assertEquals(i, rootReader.reader("int").readInteger().intValue());
Assert.assertEquals(i, rootReader.reader("bigInt").readLong().longValue());
}

parent.close();
return parent;
}

@Test
Expand Down