Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-40038: [Java] Export non empty offset buffer for variable-size layout through C Data Interface #40043

Merged
merged 7 commits into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions java/c/src/main/java/org/apache/arrow/c/ArrayExporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,7 @@ void export(ArrowArray array, FieldVector vector, DictionaryProvider dictionaryP
if (buffers != null) {
data.buffers = new ArrayList<>(buffers.size());
data.buffers_ptrs = allocator.buffer((long) buffers.size() * Long.BYTES);
for (ArrowBuf arrowBuf : buffers) {
if (arrowBuf != null) {
arrowBuf.getReferenceManager().retain();
data.buffers_ptrs.writeLong(arrowBuf.memoryAddress());
} else {
data.buffers_ptrs.writeLong(NULL);
}
data.buffers.add(arrowBuf);
}
vector.exportCDataBuffers(data.buffers, data.buffers_ptrs, NULL);
}

if (dictionaryEncoding != null) {
Expand Down
18 changes: 17 additions & 1 deletion java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import org.apache.arrow.memory.ArrowBuf;
Expand Down Expand Up @@ -164,10 +165,25 @@ VectorSchemaRoot vectorSchemaRootRoundtrip(VectorSchemaRoot root) {
}

boolean roundtrip(FieldVector vector, Class<?> clazz) {
List<ArrowBuf> fieldBuffers = vector.getFieldBuffers();
List<Integer> orgRefCnts = fieldBuffers.stream().map(buf -> buf.refCnt()).collect(Collectors.toList());
long orgMemorySize = allocator.getAllocatedMemory();

boolean result = false;
try (ValueVector imported = vectorRoundtrip(vector)) {
assertTrue(clazz.isInstance(imported), String.format("expected %s but was %s", clazz, imported.getClass()));
return VectorEqualsVisitor.vectorEquals(vector, imported);
result = VectorEqualsVisitor.vectorEquals(vector, imported);
}

// Check that the ref counts of the buffers are the same after the roundtrip
IntStream.range(0, orgRefCnts.size()).forEach(i -> {
ArrowBuf buf = fieldBuffers.get(i);
assertEquals(buf.refCnt(), orgRefCnts.get(i));
});

assertEquals(orgMemorySize, allocator.getAllocatedMemory());

return result;
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,34 @@ public List<ArrowBuf> getFieldBuffers() {
return result;
}

/**
* Export the buffers of the fields for C Data Interface. This method traverse the buffers and
* export buffer and buffer's memory address into a list of buffers and a pointer to the list of buffers.
*/
@Override
public void exportCDataBuffers(List<ArrowBuf> buffers, ArrowBuf buffersPtr, long nullValue) {
// before flight/IPC, we must bring the vector to a consistent state.
// this is because, it is possible that the offset buffers of some trailing values
// are not updated. this may cause some data in the data buffer being lost.
// for details, please see TestValueVector#testUnloadVariableWidthVector.
fillHoles(valueCount);

exportBuffer(validityBuffer, buffers, buffersPtr, nullValue, true);

if (offsetBuffer.capacity() == 0) {
// Empty offset buffer is allowed for historical reason.
// To export it through C Data interface, we need to allocate a buffer with one offset.
// We set `retain = false` to explicitly not increase the ref count for the exported buffer.
// The ref count of the newly created buffer (i.e., 1) already represents the usage
// at imported side.
exportBuffer(allocateOffsetBuffer(OFFSET_WIDTH), buffers, buffersPtr, nullValue, false);
lidavidm marked this conversation as resolved.
Show resolved Hide resolved
} else {
exportBuffer(offsetBuffer, buffers, buffersPtr, nullValue, true);
}

exportBuffer(valueBuffer, buffers, buffersPtr, nullValue, true);
}

/**
* Set the reader and writer indexes for the inner buffers.
*/
Expand Down Expand Up @@ -456,10 +484,11 @@ private void allocateBytes(final long valueBufferSize, final int valueCount) {
}

/* allocate offset buffer */
private void allocateOffsetBuffer(final long size) {
offsetBuffer = allocator.buffer(size);
private ArrowBuf allocateOffsetBuffer(final long size) {
ArrowBuf offsetBuffer = allocator.buffer(size);
offsetBuffer.readerIndex(0);
initOffsetBuffer();
return offsetBuffer;
}

/* allocate validity buffer */
Expand Down Expand Up @@ -760,7 +789,7 @@ private void splitAndTransferOffsetBuffer(int startIndex, int length, BaseLargeV
final long start = getStartOffset(startIndex);
final long end = getStartOffset(startIndex + length);
final long dataLength = end - start;
target.allocateOffsetBuffer((long) (length + 1) * OFFSET_WIDTH);
target.offsetBuffer = target.allocateOffsetBuffer((long) (length + 1) * OFFSET_WIDTH);
for (int i = 0; i < length + 1; i++) {
final long relativeSourceOffset = getStartOffset(startIndex + i) - start;
target.offsetBuffer.setLong((long) i * OFFSET_WIDTH, relativeSourceOffset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,34 @@ public List<ArrowBuf> getFieldBuffers() {
return result;
}

/**
* Export the buffers of the fields for C Data Interface. This method traverse the buffers and
* export buffer and buffer's memory address into a list of buffers and a pointer to the list of buffers.
*/
@Override
public void exportCDataBuffers(List<ArrowBuf> buffers, ArrowBuf buffersPtr, long nullValue) {
// before flight/IPC, we must bring the vector to a consistent state.
// this is because, it is possible that the offset buffers of some trailing values
// are not updated. this may cause some data in the data buffer being lost.
// for details, please see TestValueVector#testUnloadVariableWidthVector.
fillHoles(valueCount);

exportBuffer(validityBuffer, buffers, buffersPtr, nullValue, true);

if (offsetBuffer.capacity() == 0) {
// Empty offset buffer is allowed for historical reason.
// To export it through C Data interface, we need to allocate a buffer with one offset.
// We set `retain = false` to explicitly not increase the ref count for the exported buffer.
// The ref count of the newly created buffer (i.e., 1) already represents the usage
// at imported side.
exportBuffer(allocateOffsetBuffer(OFFSET_WIDTH), buffers, buffersPtr, nullValue, false);
} else {
exportBuffer(offsetBuffer, buffers, buffersPtr, nullValue, true);
}

exportBuffer(valueBuffer, buffers, buffersPtr, nullValue, true);
}

/**
* Set the reader and writer indexes for the inner buffers.
*/
Expand Down Expand Up @@ -476,11 +504,12 @@ private void allocateBytes(final long valueBufferSize, final int valueCount) {
}

/* allocate offset buffer */
private void allocateOffsetBuffer(final long size) {
private ArrowBuf allocateOffsetBuffer(final long size) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the intent of this change to avoid changing the existing offset buffer, and only change it for the exported version? Then I must ask

(1) why? If we're allocating the buffer, we may as well use it for the vector as well.
(2) is this safe in terms of leaking memory? since apparently nothing will hold on to the buffer after allocating it above

Copy link
Member Author

@viirya viirya Feb 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For 2, this is only for exporting the vectors through C Data Interface. By its means, it is intended to be released by the importer side. At Java Arrow, I remember ArrayExporter only increases the reference count for the buffers to be exported to prevent them to be released automatically.

And the release callback of exported structure will reduce the reference count to release it eventually. But it is triggered at imported side. I think this is what the C Data Interface defines.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For 1, they are allocated for using only at imported side. I think as per discussed with @pitrou above, the idea is these empty buffers exist internally and it is okay.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think Antoine's idea was that you could statically allocate one such buffer to use in such situations, if you're talking about this

As long as data stays internal to Arrow Java, the missing buffer is ok, but for purposes of exchanging data, a proper offsets buffers should be exported (this one can easily be statically allocated, by the way).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And yes, it seems ArrayExporter will increase the reference count by 1. But since it starts with a reference count of 1, that means these buffers will never be freed.

Copy link
Member Author

@viirya viirya Mar 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vibhatha I've added some more checks (buffer ref count, allocated memory) into the existing roundtrip tests (org.apache.arrow.c.RoundtripTest) of C data interface. The tests are passed with these additional checks. Please take a look. Thanks.

As this issue blocks some important feature in apache/datafusion-comet#95, we really like to move this forward. Thanks a lot. 🙏

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will take a look at this as soon as possible.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @vibhatha, sorry for pinging you. Do you have a chance to look at this recently? One important feature is blocked by the issue and we are looking forward to move this into Java Arrow library. Thank you. 🙏

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viirya I couldn't take a look. But let me see if I can allocate some time to do this probably earlier next week?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vibhatha It will be great! Thank you so much!

final int curSize = (int) size;
offsetBuffer = allocator.buffer(curSize);
ArrowBuf offsetBuffer = allocator.buffer(curSize);
offsetBuffer.readerIndex(0);
initOffsetBuffer();
return offsetBuffer;
}

/* allocate validity buffer */
Expand Down Expand Up @@ -805,7 +834,7 @@ private void splitAndTransferOffsetBuffer(int startIndex, int length, BaseVariab
(1 + length) * ((long) OFFSET_WIDTH));
target.offsetBuffer = transferBuffer(slicedOffsetBuffer, target.allocator);
} else {
target.allocateOffsetBuffer((long) (length + 1) * OFFSET_WIDTH);
target.offsetBuffer = target.allocateOffsetBuffer((long) (length + 1) * OFFSET_WIDTH);
for (int i = 0; i < length + 1; i++) {
final int relativeSourceOffset = getStartOffset(startIndex + i) - start;
target.offsetBuffer.setInt((long) i * OFFSET_WIDTH, relativeSourceOffset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,47 @@ public interface FieldVector extends ValueVector {
*/
List<ArrowBuf> getFieldBuffers();

/**
* Export a given buffer and its memory address into a list of buffers and a pointer to the list of buffers.
*
* @param buffer the buffer to export
* @param buffers the list of buffers
* @param buffersPtr the pointer to the list of buffers
* @param nullValue the null value to use for null buffer
* @param retain whether to retain the buffer when exporting
*/
default void exportBuffer(
ArrowBuf buffer,
List<ArrowBuf> buffers,
ArrowBuf buffersPtr,
long nullValue,
boolean retain) {
if (buffer != null) {
if (retain) {
buffer.getReferenceManager().retain();
}
buffersPtr.writeLong(buffer.memoryAddress());
} else {
buffersPtr.writeLong(nullValue);
}
buffers.add(buffer);
}

/**
* Export the buffers of the fields for C Data Interface. This method traverse the buffers and
* export buffer and buffer's memory address into a list of buffers and a pointer to the list of buffers.
*
* By default, when exporting a buffer, it will increase ref count for exported buffer that counts
* the usage at imported side.
*/
default void exportCDataBuffers(List<ArrowBuf> buffers, ArrowBuf buffersPtr, long nullValue) {
List<ArrowBuf> fieldBuffers = getFieldBuffers();

for (ArrowBuf arrowBuf : fieldBuffers) {
exportBuffer(arrowBuf, buffers, buffersPtr, nullValue, true);
}
}

/**
* Get the inner vectors.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public String getName() {
public boolean allocateNewSafe() {
boolean dataAlloc = false;
try {
allocateOffsetBuffer(offsetAllocationSizeInBytes);
offsetBuffer = allocateOffsetBuffer(offsetAllocationSizeInBytes);
dataAlloc = vector.allocateNewSafe();
} catch (Exception e) {
e.printStackTrace();
Expand All @@ -97,12 +97,13 @@ public boolean allocateNewSafe() {
return dataAlloc;
}

protected void allocateOffsetBuffer(final long size) {
protected ArrowBuf allocateOffsetBuffer(final long size) {
final int curSize = (int) size;
offsetBuffer = allocator.buffer(curSize);
ArrowBuf offsetBuffer = allocator.buffer(curSize);
offsetBuffer.readerIndex(0);
offsetAllocationSizeInBytes = curSize;
offsetBuffer.setZero(0, offsetBuffer.capacity());
return offsetBuffer;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,26 @@ public List<ArrowBuf> getFieldBuffers() {
return result;
}

/**
* Export the buffers of the fields for C Data Interface. This method traverse the buffers and
* export buffer and buffer's memory address into a list of buffers and a pointer to the list of buffers.
*/
@Override
public void exportCDataBuffers(List<ArrowBuf> buffers, ArrowBuf buffersPtr, long nullValue) {
exportBuffer(validityBuffer, buffers, buffersPtr, nullValue, true);

if (offsetBuffer.capacity() == 0) {
// Empty offset buffer is allowed for historical reason.
// To export it through C Data interface, we need to allocate a buffer with one offset.
// We set `retain = false` to explicitly not increase the ref count for the exported buffer.
// The ref count of the newly created buffer (i.e., 1) already represents the usage
// at imported side.
exportBuffer(allocateOffsetBuffer(OFFSET_WIDTH), buffers, buffersPtr, nullValue, false);
} else {
exportBuffer(offsetBuffer, buffers, buffersPtr, nullValue, true);
}
}

/**
* Set the reader and writer indexes for the inner buffers.
*/
Expand Down Expand Up @@ -343,7 +363,7 @@ public boolean allocateNewSafe() {
/* allocate offset and data buffer */
boolean dataAlloc = false;
try {
allocateOffsetBuffer(offsetAllocationSizeInBytes);
offsetBuffer = allocateOffsetBuffer(offsetAllocationSizeInBytes);
dataAlloc = vector.allocateNewSafe();
} catch (Exception e) {
e.printStackTrace();
Expand Down Expand Up @@ -371,11 +391,12 @@ private void allocateValidityBuffer(final long size) {
validityBuffer.setZero(0, validityBuffer.capacity());
}

protected void allocateOffsetBuffer(final long size) {
offsetBuffer = allocator.buffer(size);
protected ArrowBuf allocateOffsetBuffer(final long size) {
ArrowBuf offsetBuffer = allocator.buffer(size);
offsetBuffer.readerIndex(0);
offsetAllocationSizeInBytes = size;
offsetBuffer.setZero(0, offsetBuffer.capacity());
return offsetBuffer;
}

/**
Expand Down Expand Up @@ -656,7 +677,7 @@ public void splitAndTransfer(int startIndex, int length) {
final long startPoint = offsetBuffer.getLong((long) startIndex * OFFSET_WIDTH);
final long sliceLength = offsetBuffer.getLong((long) (startIndex + length) * OFFSET_WIDTH) - startPoint;
to.clear();
to.allocateOffsetBuffer((length + 1) * OFFSET_WIDTH);
to.offsetBuffer = to.allocateOffsetBuffer((length + 1) * OFFSET_WIDTH);
/* splitAndTransfer offset buffer */
for (int i = 0; i < length + 1; i++) {
final long relativeOffset = offsetBuffer.getLong((long) (startIndex + i) * OFFSET_WIDTH) - startPoint;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,26 @@ public List<ArrowBuf> getFieldBuffers() {
return result;
}

/**
* Export the buffers of the fields for C Data Interface. This method traverse the buffers and
* export buffer and buffer's memory address into a list of buffers and a pointer to the list of buffers.
*/
@Override
public void exportCDataBuffers(List<ArrowBuf> buffers, ArrowBuf buffersPtr, long nullValue) {
exportBuffer(validityBuffer, buffers, buffersPtr, nullValue, true);

if (offsetBuffer.capacity() == 0) {
// Empty offset buffer is allowed for historical reason.
// To export it through C Data interface, we need to allocate a buffer with one offset.
// We set `retain = false` to explicitly not increase the ref count for the exported buffer.
// The ref count of the newly created buffer (i.e., 1) already represents the usage
// at imported side.
exportBuffer(allocateOffsetBuffer(OFFSET_WIDTH), buffers, buffersPtr, nullValue, false);
} else {
exportBuffer(offsetBuffer, buffers, buffersPtr, nullValue, true);
}
}

/**
* Set the reader and writer indexes for the inner buffers.
*/
Expand Down Expand Up @@ -535,7 +555,7 @@ public void splitAndTransfer(int startIndex, int length) {
final int startPoint = offsetBuffer.getInt(startIndex * OFFSET_WIDTH);
final int sliceLength = offsetBuffer.getInt((startIndex + length) * OFFSET_WIDTH) - startPoint;
to.clear();
to.allocateOffsetBuffer((length + 1) * OFFSET_WIDTH);
to.offsetBuffer = to.allocateOffsetBuffer((length + 1) * OFFSET_WIDTH);
/* splitAndTransfer offset buffer */
for (int i = 0; i < length + 1; i++) {
final int relativeOffset = offsetBuffer.getInt((startIndex + i) * OFFSET_WIDTH) - startPoint;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public void splitAndTransfer(int startIndex, int length) {
final int startPoint = offsetBuffer.getInt(startIndex * OFFSET_WIDTH);
final int sliceLength = offsetBuffer.getInt((startIndex + length) * OFFSET_WIDTH) - startPoint;
to.clear();
to.allocateOffsetBuffer((length + 1) * OFFSET_WIDTH);
to.offsetBuffer = to.allocateOffsetBuffer((length + 1) * OFFSET_WIDTH);
/* splitAndTransfer offset buffer */
for (int i = 0; i < length + 1; i++) {
final int relativeOffset = offsetBuffer.getInt((startIndex + i) * OFFSET_WIDTH) - startPoint;
Expand Down
Loading