Skip to content

Commit

Permalink
GH-44065: [Java] Implement C Data Interface for RunEndEncodedVector (#…
Browse files Browse the repository at this point in the history
…44241)

* GitHub Issue: #44065

Lead-authored-by: chenweiguo.vc <chenweiguo.vc@bytedance.com>
Co-authored-by: ViggoC <viggoc96@gmail.com>
Co-authored-by: Vibhatha Lakmal Abeykoon <vibhatha@users.noreply.github.com>
Signed-off-by: David Li <li.davidm96@gmail.com>
  • Loading branch information
3 people authored Oct 17, 2024
1 parent 16ad979 commit b175463
Show file tree
Hide file tree
Showing 10 changed files with 354 additions and 38 deletions.
1 change: 0 additions & 1 deletion dev/archery/archery/integration/datagen.py
Original file line number Diff line number Diff line change
Expand Up @@ -1974,7 +1974,6 @@ def _temp_path():

generate_run_end_encoded_case()
.skip_tester('C#')
.skip_tester('Java')
.skip_tester('JS')
# TODO(https://github.com/apache/arrow-nanoarrow/issues/618)
.skip_tester('nanoarrow')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public List<ArrowBuf> visit(ArrowType.Union type) {

@Override
public List<ArrowBuf> visit(ArrowType.RunEndEncoded type) {
throw new UnsupportedOperationException("Importing buffers for type: " + type);
return List.of();
}

@Override
Expand Down
4 changes: 4 additions & 0 deletions java/c/src/main/java/org/apache/arrow/c/Format.java
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ static String asString(ArrowType arrowType) {
return "+vl";
case LargeListView:
return "+vL";
case RunEndEncoded:
return "+r";
case NONE:
throw new IllegalArgumentException("Arrow type ID is NONE");
default:
Expand Down Expand Up @@ -321,6 +323,8 @@ static ArrowType asType(String format, long flags)
return new ArrowType.ListView();
case "+vL":
return new ArrowType.LargeListView();
case "+r":
return new ArrowType.RunEndEncoded();
default:
String[] parts = format.split(":", 2);
if (parts.length == 2) {
Expand Down
17 changes: 17 additions & 0 deletions java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.ListViewVector;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.complex.RunEndEncodedVector;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.arrow.vector.complex.UnionVector;
import org.apache.arrow.vector.complex.impl.UnionMapWriter;
Expand Down Expand Up @@ -770,6 +771,22 @@ public void testStructVector() {
}
}

@Test
public void testRunEndEncodedVector() {
try (final RunEndEncodedVector vector = RunEndEncodedVector.empty("v", allocator)) {
setVector(vector, List.of(1, 3), List.of(1, 2));
assertTrue(roundtrip(vector, RunEndEncodedVector.class));
}
}

@Test
public void testEmptyRunEndEncodedVector() {
try (final RunEndEncodedVector vector = RunEndEncodedVector.empty("v", allocator)) {
setVector(vector, List.of(), List.of());
assertTrue(roundtrip(vector, RunEndEncodedVector.class));
}
}

@Test
public void testExtensionTypeVector() {
ExtensionTypeRegistry.register(new UuidType());
Expand Down
14 changes: 14 additions & 0 deletions java/c/src/test/python/integration_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,20 @@ def recreate_batch():
return reader.read_next_batch()

self.round_trip_record_batch(recreate_batch)

def test_runendencoded_array(self):
# empty vector
self.round_trip_array(lambda: pa.RunEndEncodedArray.from_arrays([], [], pa.run_end_encoded(pa.int64(), pa.int64())))

# constant null vector
self.round_trip_array(lambda: pa.RunEndEncodedArray.from_arrays([10], [None]))
# constant int vector
self.round_trip_array(lambda: pa.RunEndEncodedArray.from_arrays([10], [10]))

# run end int vector
self.round_trip_array(lambda: pa.RunEndEncodedArray.from_arrays([3, 5, 10, 12, 19], [1, 2, 1, None, 3]))
# run end string vector
self.round_trip_array(lambda: pa.RunEndEncodedArray.from_arrays([3, 5, 10, 12, 19], ["1", "2", "1", None, "3"]))

if __name__ == '__main__':
unittest.main(verbosity=2)
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@
import org.apache.arrow.memory.util.hash.ArrowBufHasher;
import org.apache.arrow.vector.BaseIntVector;
import org.apache.arrow.vector.BaseValueVector;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BufferBacked;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.ZeroVector;
import org.apache.arrow.vector.compare.VectorVisitor;
Expand All @@ -50,6 +53,7 @@
* values vector of any type. There are no buffers associated with the parent vector.
*/
public class RunEndEncodedVector extends BaseValueVector implements FieldVector {

public static final FieldVector DEFAULT_VALUE_VECTOR = ZeroVector.INSTANCE;
public static final FieldVector DEFAULT_RUN_END_VECTOR = ZeroVector.INSTANCE;

Expand Down Expand Up @@ -203,6 +207,7 @@ public void clear() {
for (FieldVector v : getChildrenFromFields()) {
v.clear();
}
this.valueCount = 0;
}

/**
Expand Down Expand Up @@ -234,19 +239,6 @@ public MinorType getMinorType() {
return MinorType.RUNENDENCODED;
}

/**
* To transfer quota responsibility.
*
* @param allocator the target allocator
* @return a {@link org.apache.arrow.vector.util.TransferPair transfer pair}, creating a new
* target vector of the same type.
*/
@Override
public TransferPair getTransferPair(BufferAllocator allocator) {
throw new UnsupportedOperationException(
"RunEndEncodedVector does not support getTransferPair(BufferAllocator)");
}

/**
* To transfer quota responsibility.
*
Expand Down Expand Up @@ -284,8 +276,7 @@ public TransferPair getTransferPair(Field field, BufferAllocator allocator) {
*/
@Override
public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) {
throw new UnsupportedOperationException(
"RunEndEncodedVector does not support getTransferPair(String, BufferAllocator, CallBack)");
return new TransferImpl(ref, allocator, callBack);
}

/**
Expand All @@ -299,8 +290,7 @@ public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallB
*/
@Override
public TransferPair getTransferPair(Field field, BufferAllocator allocator, CallBack callBack) {
throw new UnsupportedOperationException(
"RunEndEncodedVector does not support getTransferPair(Field, BufferAllocator, CallBack)");
return new TransferImpl(field, allocator, callBack);
}

/**
Expand All @@ -312,8 +302,156 @@ public TransferPair getTransferPair(Field field, BufferAllocator allocator, Call
*/
@Override
public TransferPair makeTransferPair(ValueVector target) {
throw new UnsupportedOperationException(
"RunEndEncodedVector does not support makeTransferPair(ValueVector)");
return new TransferImpl((RunEndEncodedVector) target);
}

private class TransferImpl implements TransferPair {

RunEndEncodedVector to;
TransferPair dataTransferPair;
TransferPair reeTransferPair;

public TransferImpl(String name, BufferAllocator allocator, CallBack callBack) {
this(new RunEndEncodedVector(name, allocator, field.getFieldType(), callBack));
}

public TransferImpl(Field field, BufferAllocator allocator, CallBack callBack) {
this(new RunEndEncodedVector(field, allocator, callBack));
}

public TransferImpl(RunEndEncodedVector to) {
this.to = to;
if (to.getRunEndsVector() instanceof ZeroVector) {
to.initializeChildrenFromFields(field.getChildren());
}
reeTransferPair = getRunEndsVector().makeTransferPair(to.getRunEndsVector());
dataTransferPair = getValuesVector().makeTransferPair(to.getValuesVector());
}

/**
* Transfer the vector data to another vector. The memory associated with this vector is
* transferred to the allocator of target vector for accounting and management purposes.
*/
@Override
public void transfer() {
to.clear();
dataTransferPair.transfer();
reeTransferPair.transfer();
if (valueCount > 0) {
to.setValueCount(valueCount);
}
clear();
}

/**
* Slice this vector at the desired index and length, then transfer the corresponding data to
* the target vector.
*
* @param startIndex start position of the split in source vector.
* @param length length of the split.
*/
@Override
public void splitAndTransfer(int startIndex, int length) {
to.clear();
if (length <= 0) {
return;
}

int physicalStartIndex = getPhysicalIndex(startIndex);
int physicalEndIndex = getPhysicalIndex(startIndex + length - 1);
int physicalLength = physicalEndIndex - physicalStartIndex + 1;
dataTransferPair.splitAndTransfer(physicalStartIndex, physicalLength);
FieldVector toRunEndsVector = to.runEndsVector;
if (startIndex == 0) {
if (((BaseIntVector) runEndsVector).getValueAsLong(physicalEndIndex) == length) {
reeTransferPair.splitAndTransfer(physicalStartIndex, physicalLength);
} else {
reeTransferPair.splitAndTransfer(physicalStartIndex, physicalLength - 1);
toRunEndsVector.setValueCount(physicalLength);
if (toRunEndsVector instanceof SmallIntVector) {
((SmallIntVector) toRunEndsVector).set(physicalEndIndex, length);
} else if (toRunEndsVector instanceof IntVector) {
((IntVector) toRunEndsVector).set(physicalEndIndex, length);
} else if (toRunEndsVector instanceof BigIntVector) {
((BigIntVector) toRunEndsVector).set(physicalEndIndex, length);
} else {
throw new IllegalArgumentException(
"Run-end vector and must be of type int with size 16, 32, or 64 bits.");
}
}
} else {
shiftRunEndsVector(
toRunEndsVector,
startIndex,
length,
physicalStartIndex,
physicalEndIndex,
physicalLength);
}
getTo().setValueCount(length);
}

private void shiftRunEndsVector(
ValueVector toRunEndVector,
int startIndex,
int length,
int physicalStartIndex,
int physicalEndIndex,
int physicalLength) {
toRunEndVector.setValueCount(physicalLength);
toRunEndVector.getValidityBuffer().setOne(0, toRunEndVector.getValidityBuffer().capacity());
ArrowBuf fromRunEndBuffer = runEndsVector.getDataBuffer();
ArrowBuf toRunEndBuffer = toRunEndVector.getDataBuffer();
int physicalLastIndex = physicalLength - 1;
if (toRunEndVector instanceof SmallIntVector) {
byte typeWidth = SmallIntVector.TYPE_WIDTH;
for (int i = 0; i < physicalLastIndex; i++) {
toRunEndBuffer.setShort(
(long) i * typeWidth,
fromRunEndBuffer.getShort((long) (i + physicalStartIndex) * typeWidth) - startIndex);
}
int lastEnd =
Math.min(
fromRunEndBuffer.getShort((long) physicalEndIndex * typeWidth) - startIndex,
length);
toRunEndBuffer.setShort((long) physicalLastIndex * typeWidth, lastEnd);
} else if (toRunEndVector instanceof IntVector) {
byte typeWidth = IntVector.TYPE_WIDTH;
for (int i = 0; i < physicalLastIndex; i++) {
toRunEndBuffer.setInt(
(long) i * typeWidth,
fromRunEndBuffer.getInt((long) (i + physicalStartIndex) * typeWidth) - startIndex);
}
int lastEnd =
Math.min(
fromRunEndBuffer.getInt((long) physicalEndIndex * typeWidth) - startIndex, length);
toRunEndBuffer.setInt((long) physicalLastIndex * typeWidth, lastEnd);
} else if (toRunEndVector instanceof BigIntVector) {
byte typeWidth = BigIntVector.TYPE_WIDTH;
for (int i = 0; i < physicalLastIndex; i++) {
toRunEndBuffer.setLong(
(long) i * typeWidth,
fromRunEndBuffer.getLong((long) (i + physicalStartIndex) * typeWidth) - startIndex);
}
long lastEnd =
Math.min(
fromRunEndBuffer.getLong((long) physicalEndIndex * typeWidth) - startIndex, length);
toRunEndBuffer.setLong((long) physicalLastIndex * typeWidth, lastEnd);
} else {
throw new IllegalArgumentException(
"Run-end vector and must be of type int with size 16, 32, or 64 bits.");
}
}

@Override
public ValueVector getTo() {
return to;
}

@Override
public void copyValueSafe(int from, int to) {
this.to.copyFrom(from, to, RunEndEncodedVector.this);
}
}

/**
Expand Down Expand Up @@ -568,6 +706,7 @@ public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers
throw new UnsupportedOperationException(
"Run-end encoded vectors do not have any associated buffers.");
}
this.valueCount = fieldNode.getLength();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -909,10 +909,12 @@ private void readFromJsonIntoVector(Field field, FieldVector vector) throws IOEx
variadicBufferIndices));
}

int nullCount = 0;
if (type instanceof ArrowType.Null) {
int nullCount;
if (type instanceof ArrowType.RunEndEncoded || type instanceof Union) {
nullCount = 0;
} else if (type instanceof ArrowType.Null) {
nullCount = valueCount;
} else if (!(type instanceof Union)) {
} else {
nullCount = BitVectorHelper.getNullCount(vectorBuffers.get(0), valueCount);
}
final ArrowFieldNode fieldNode = new ArrowFieldNode(valueCount, nullCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ public Void visit(RunEndEncodedVector vector, Void value) {
if (runCount == 0) {
validateOrThrow(valueCount == 0, "Run end vector does not contain enough elements");
} else if (runCount > 0) {
double lastEnd = ((BaseIntVector) runEndsVector).getValueAsLong(runCount - 1);
long lastEnd = ((BaseIntVector) runEndsVector).getValueAsLong(runCount - 1);
validateOrThrow(
valueCount == lastEnd,
"Vector logic length not equal to the last end in run ends vector. Logical length %s, last end %s",
Expand Down
Loading

0 comments on commit b175463

Please sign in to comment.