Closed
4 changes: 3 additions & 1 deletion arrow/src/array/array.rs
@@ -629,7 +629,9 @@ pub unsafe fn make_array_from_raw(
schema: *const ffi::FFI_ArrowSchema,
) -> Result<ArrayRef> {
let array = ffi::ArrowArray::try_from_raw(array, schema)?;
-let data = ArrayData::try_from(array)?;
+let data = ArrayData::try_from(&array)?;
+// Avoid dropping the `Box` pointers and trigger the `release` mechanism.
+let _ = ffi::ArrowArray::into_raw(array);
Contributor

Why do we need to change ArrayData::try_from() to take a reference? This line seems very tricky.

Also, since this changes the ArrayData::try_from() API, all other callers may need to change their code and add this tricky line too.

Member Author

@viirya Mar 13, 2022

Since ArrayData::try_from() takes ffi::ArrowArray by value, it drops the ArrowArray when it returns, which triggers release for the structs (as they are just Box pointers now). Users cannot prevent that from happening. For example, without this change, our internal use case got a SIGSEGV.

So I changed it to take a borrowed reference to avoid dropping/releasing there.
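The difference between the two signatures can be sketched with a mock type. This is only an illustration under stated assumptions: MockArrowArray, len_by_value, len_by_ref and move_vs_borrow are hypothetical names, and the Drop impl stands in for the release callback of the real FFI structs.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical stand-in for `ffi::ArrowArray`; dropping it "releases" the data.
struct MockArrowArray {
    len: usize,
    released: Rc<Cell<bool>>,
}

impl Drop for MockArrowArray {
    fn drop(&mut self) {
        // Plays the role of the `release` callback.
        self.released.set(true);
    }
}

/// Takes ownership: the array is dropped (released) when this function returns.
fn len_by_value(array: MockArrowArray) -> usize {
    array.len
}

/// Borrows: the caller keeps ownership and decides when the release happens.
fn len_by_ref(array: &MockArrowArray) -> usize {
    array.len
}

/// Returns (released after the by-value call, released after the by-ref call).
fn move_vs_borrow() -> (bool, bool) {
    let flag_a = Rc::new(Cell::new(false));
    let a = MockArrowArray { len: 3, released: Rc::clone(&flag_a) };
    len_by_value(a);
    let after_move = flag_a.get();

    let flag_b = Rc::new(Cell::new(false));
    let b = MockArrowArray { len: 3, released: Rc::clone(&flag_b) };
    len_by_ref(&b);
    let after_borrow = flag_b.get();

    (after_move, after_borrow)
}

fn main() {
    let (after_move, after_borrow) = move_vs_borrow();
    println!("released after move: {after_move}, after borrow: {after_borrow}");
}
```

With the by-value signature the callee decides when the release happens; with the borrowed signature the caller does.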

Contributor

Hi @viirya, I understand the issue here.

I'm not sure whether #1441 could be the solution. Or we could even add an ArrowArray::try_from_box() to do something similar.

Member Author


I don't think so.

Arc::from(Box::from_raw(array as *mut FFI_ArrowArray)) is Arc<Box<FFI_ArrowArray>>. Then the raw pointer is *Box<FFI_ArrowArray>, but you treat it as *FFI_ArrowArray.

Member Author

Oh, I was wrong. I wasn't aware that Arc has a from(v: Box<T>) API.
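For reference, a minimal sketch of the conversion mentioned here: the standard library implements From<Box<T>> for Arc<T>, so the result is Arc<T> rather than Arc<Box<T>>, and the value is moved into a new reference-counted allocation. box_to_arc is a hypothetical helper name used only for illustration.

```rust
use std::sync::Arc;

/// Converts a boxed value into an `Arc`.
/// The return type is `Arc<u64>`, not `Arc<Box<u64>>`.
fn box_to_arc(boxed: Box<u64>) -> Arc<u64> {
    // Moves the value into a new reference-counted allocation.
    Arc::from(boxed)
}

fn main() {
    let shared = box_to_arc(Box::new(42));
    // A clone shares the same allocation as the original `Arc`.
    let other = Arc::clone(&shared);
    assert!(Arc::ptr_eq(&shared, &other));
    println!("value = {}", *shared);
}
```

Because the value moves to a new allocation, the box's original allocation is handed back to Rust's allocator, which matters when that memory was allocated by a foreign producer.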

Member Author

Previously you didn't need it. Since the Arc was kept in the Buffer created for the Array data, you could rely on deallocation of the Buffer to call release on the ffi structs.

But Box cannot give us that benefit, so memory management becomes more explicit and relies on users. We need to keep these structs alive, so that release is not called, until we no longer need the Array data (Buffer).

The code is in ArrowArrayRef.to_data, which creates an ArrayData from an ArrowArray(Ref). You can follow buffers -> create_buffer -> Buffer::from_unowned.
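The previous Arc-based lifetime management described above can be sketched as follows. This is a simplified model, not the crate's code: MockFfiArray, MockBuffer and buffer_keeps_owner_alive are hypothetical names, and the Drop impl stands in for the release callback.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for `FFI_ArrowArray`; `Drop` plays the role of `release`.
struct MockFfiArray {
    released: Arc<AtomicBool>,
}

impl Drop for MockFfiArray {
    fn drop(&mut self) {
        self.released.store(true, Ordering::SeqCst);
    }
}

/// Hypothetical stand-in for a `Buffer` that keeps its foreign owner alive,
/// in the way `Deallocation::Foreign(Arc<FFI_ArrowArray>)` did before this change.
struct MockBuffer {
    _owner: Arc<MockFfiArray>,
}

/// Returns (released while one buffer is still alive, released after the last buffer is dropped).
fn buffer_keeps_owner_alive() -> (bool, bool) {
    let released = Arc::new(AtomicBool::new(false));
    let owner = Arc::new(MockFfiArray { released: Arc::clone(&released) });

    let b1 = MockBuffer { _owner: Arc::clone(&owner) };
    let b2 = MockBuffer { _owner: Arc::clone(&owner) };

    // Dropping the importing handle and one buffer does not release anything yet.
    drop(owner);
    drop(b1);
    let while_buffer_alive = released.load(Ordering::SeqCst);

    // The last reference goes away with the last buffer, and release runs.
    drop(b2);
    let after_last_buffer = released.load(Ordering::SeqCst);

    (while_buffer_alive, after_last_buffer)
}

fn main() {
    let (during, after) = buffer_keeps_owner_alive();
    println!("released while a buffer is alive: {during}, after the last buffer: {after}");
}
```

With Box there is no shared count, so nothing ties the release to the lifetime of the buffers; the caller has to keep the structs alive explicitly.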

Member

I see, thanks for the explanation.

I'm wondering if we can still use Arc but drop the "envelope memory" allocated for the struct holding the actual pointers, for the input array and schema. For example:

    pub unsafe fn try_from_raw(
        array: *const FFI_ArrowArray,
        schema: *const FFI_ArrowSchema,
    ) -> Result<Self> {
        if array.is_null() || schema.is_null() {
            return Err(ArrowError::MemoryError(
                "At least one of the pointers passed to `try_from_raw` is null"
                    .to_string(),
            ));
        };

        let array_mut = array as *mut FFI_ArrowArray;
        let schema_mut = schema as *mut FFI_ArrowSchema;

        let array_data = std::ptr::replace(array_mut, FFI_ArrowArray::empty());
        let schema_data = std::ptr::replace(schema_mut, FFI_ArrowSchema::empty());

        std::ptr::drop_in_place(array_mut);
        std::ptr::drop_in_place(schema_mut);

        Ok(Self {
            array: Arc::new(array_data),
            schema: Arc::new(schema_data),
        })
    }

Member Author

It looks close to the previous Arc::from suggestion, as it also copies bytes from the source structs. But that one caused a SIGSEGV. I will test this.

Member Author

I've tried it. It seems okay. I think Arc::from did not work previously because it calls the allocator to deallocate the original memory allocation. As that memory is allocated by Java in our case, we cannot let Rust deallocate it.

std::ptr::drop_in_place seems to only trigger dropping. As we have replaced the source structs with empty ones, it won't trigger release. I think this is close to #1436, which clears the release field of the source structs after cloning them. Here we in fact still clone, but only internally, without exposing clone.

Looks good to me. Thanks @sunchao.

cc @alamb @wangfenjin WDYT? Do you agree with this approach?
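The take-and-empty idea discussed here can be sketched with a mock struct. This is an illustration under assumptions, not the code from #1449: MockFfiArray, take_from_raw and take_and_empty are hypothetical names, and the release field is modelled as an optional counter rather than a C function pointer.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for `FFI_ArrowArray`; an emptied struct has no `release`.
struct MockFfiArray {
    length: i64,
    release: Option<Arc<AtomicUsize>>, // counter bumped when "release" runs
}

impl MockFfiArray {
    fn empty() -> Self {
        Self { length: 0, release: None }
    }
}

impl Drop for MockFfiArray {
    fn drop(&mut self) {
        // Only a struct that still owns its data calls release.
        if let Some(counter) = self.release.take() {
            counter.fetch_add(1, Ordering::SeqCst);
        }
    }
}

/// Moves the struct out of `src` and leaves an empty one behind.
///
/// # Safety
/// `src` must be non-null, aligned, and point to a valid `MockFfiArray`.
unsafe fn take_from_raw(src: *mut MockFfiArray) -> Arc<MockFfiArray> {
    let taken = unsafe { std::ptr::replace(src, MockFfiArray::empty()) };
    Arc::new(taken)
}

/// Returns (releases after dropping the emptied source, imported length,
/// releases after dropping the imported copy).
fn take_and_empty() -> (usize, i64, usize) {
    let releases = Arc::new(AtomicUsize::new(0));
    // Simulates a struct whose memory belongs to a foreign producer.
    let mut foreign = MockFfiArray { length: 5, release: Some(Arc::clone(&releases)) };

    let imported = unsafe { take_from_raw(&mut foreign) };

    // Dropping the emptied source does not trigger release.
    drop(foreign);
    let after_source_drop = releases.load(Ordering::SeqCst);

    let length = imported.length;
    // Release runs exactly once, when the imported copy is dropped.
    drop(imported);
    let after_import_drop = releases.load(Ordering::SeqCst);

    (after_source_drop, length, after_import_drop)
}

fn main() {
    println!("{:?}", take_and_empty());
}
```

In this model the source struct is only read and overwritten in place, so Rust never frees the foreign allocation that holds it.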

Member Author


The suggested approach is at #1449.

Ok(make_array(data))
}
// Helper function for printing potentially long arrays.
6 changes: 3 additions & 3 deletions arrow/src/array/ffi.rs
@@ -27,10 +27,10 @@ use crate::{

use super::ArrayData;

-impl TryFrom<ffi::ArrowArray> for ArrayData {
+impl TryFrom<&ffi::ArrowArray> for ArrayData {
type Error = ArrowError;

-fn try_from(value: ffi::ArrowArray) -> Result<Self> {
+fn try_from(value: &ffi::ArrowArray) -> Result<Self> {
value.to_data()
}
}
@@ -69,7 +69,7 @@ mod tests {
// simulate an external consumer by being the consumer
let d1 = unsafe { ArrowArray::try_from_raw(array, schema) }?;

-let result = &ArrayData::try_from(d1)?;
+let result = &ArrayData::try_from(&d1)?;

assert_eq!(result, expected);
Ok(())
10 changes: 2 additions & 8 deletions arrow/src/buffer/immutable.rs
@@ -25,7 +25,6 @@ use crate::util::bit_chunk_iterator::{BitChunks, UnalignedBitChunk};
use crate::{
bytes::{Bytes, Deallocation},
datatypes::ArrowNativeType,
-ffi,
};

use super::ops::bitwise_unary_op_helper;
@@ -86,18 +85,13 @@ impl Buffer {
///
/// * `ptr` - Pointer to raw parts
/// * `len` - Length of raw parts in **bytes**
-/// * `data` - An [ffi::FFI_ArrowArray] with the data
///
/// # Safety
///
/// This function is unsafe as there is no guarantee that the given pointer is valid for `len`
/// bytes and that the foreign deallocator frees the region.
-pub unsafe fn from_unowned(
-ptr: NonNull<u8>,
-len: usize,
-data: Arc<ffi::FFI_ArrowArray>,
-) -> Self {
-Buffer::build_with_arguments(ptr, len, Deallocation::Foreign(data))
+pub unsafe fn from_unowned(ptr: NonNull<u8>, len: usize) -> Self {
+Buffer::build_with_arguments(ptr, len, Deallocation::Foreign)
}

/// Auxiliary method to create a new Buffer
11 changes: 5 additions & 6 deletions arrow/src/bytes.rs
@@ -21,17 +21,16 @@

use core::slice;
use std::ptr::NonNull;
-use std::sync::Arc;
use std::{fmt::Debug, fmt::Formatter};

-use crate::{alloc, ffi};
+use crate::alloc;

/// Mode of deallocating memory regions
pub enum Deallocation {
/// Native deallocation, using Rust deallocator with Arrow-specific memory alignment
Native(usize),
/// Foreign interface, via a callback
-Foreign(Arc<ffi::FFI_ArrowArray>),
+Foreign,
}

impl Debug for Deallocation {
@@ -40,7 +39,7 @@ impl Debug for Deallocation {
Deallocation::Native(capacity) => {
write!(f, "Deallocation::Native {{ capacity: {} }}", capacity)
}
-Deallocation::Foreign(_) => {
+Deallocation::Foreign => {
write!(f, "Deallocation::Foreign {{ capacity: unknown }}")
}
}
@@ -116,7 +115,7 @@ impl Bytes {
Deallocation::Native(capacity) => capacity,
// we cannot determine this in general,
// and thus we state that this is externally-owned memory
-Deallocation::Foreign(_) => 0,
+Deallocation::Foreign => 0,
}
}
}
@@ -129,7 +128,7 @@ impl Drop for Bytes {
unsafe { alloc::free_aligned::<u8>(self.ptr, *capacity) };
}
// foreign interface knows how to deallocate itself.
-Deallocation::Foreign(_) => (),
+Deallocation::Foreign => (),
}
}
}
88 changes: 42 additions & 46 deletions arrow/src/ffi.rs
@@ -107,7 +107,7 @@ bitflags! {
/// See <https://arrow.apache.org/docs/format/CDataInterface.html#structure-definitions>
/// This was created by bindgen
#[repr(C)]
-#[derive(Debug, Clone)]
+#[derive(Debug)]
pub struct FFI_ArrowSchema {
format: *const c_char,
name: *const c_char,
@@ -336,7 +336,7 @@ fn bit_width(data_type: &DataType, i: usize) -> Result<usize> {
/// See <https://arrow.apache.org/docs/format/CDataInterface.html#structure-definitions>
/// This was created by bindgen
#[repr(C)]
-#[derive(Debug, Clone)]
+#[derive(Debug)]
pub struct FFI_ArrowArray {
pub(crate) length: i64,
pub(crate) null_count: i64,
@@ -496,7 +496,6 @@ impl FFI_ArrowArray {
/// # Safety
/// This function assumes that `ceil(self.length * bits, 8)` is the size of the buffer
unsafe fn create_buffer(
-owner: Arc<FFI_ArrowArray>,
array: &FFI_ArrowArray,
index: usize,
len: usize,
@@ -509,15 +508,15 @@ unsafe fn create_buffer(
assert!(index < array.n_buffers as usize);
let ptr = *buffers.add(index);

-NonNull::new(ptr as *mut u8).map(|ptr| Buffer::from_unowned(ptr, len, owner))
+NonNull::new(ptr as *mut u8).map(|ptr| Buffer::from_unowned(ptr, len))
}

-fn create_child(
-owner: Arc<FFI_ArrowArray>,
-array: &FFI_ArrowArray,
-schema: &FFI_ArrowSchema,
+fn create_child<'a>(
+owner: &'a FFI_ArrowArray,
+array: &'a FFI_ArrowArray,
+schema: &'a FFI_ArrowSchema,
index: usize,
-) -> ArrowArrayChild<'static> {
+) -> ArrowArrayChild<'a> {
assert!(index < array.n_children as usize);
assert!(!array.children.is_null());
assert!(!array.children.is_null());
@@ -579,13 +578,12 @@ pub trait ArrowArrayRef {

let len = self.buffer_len(index)?;

-unsafe { create_buffer(self.owner().clone(), self.array(), index, len) }
-.ok_or_else(|| {
-ArrowError::CDataInterface(format!(
-"The external buffer at position {} is null.",
-index - 1
-))
-})
+unsafe { create_buffer(self.array(), index, len) }.ok_or_else(|| {
+ArrowError::CDataInterface(format!(
+"The external buffer at position {} is null.",
+index - 1
+))
+})
})
.collect()
}
@@ -656,14 +654,14 @@ pub trait ArrowArrayRef {
// similar to `self.buffer_len(0)`, but without `Result`.
let buffer_len = bit_util::ceil(self.array().length as usize, 8);

-unsafe { create_buffer(self.owner().clone(), self.array(), 0, buffer_len) }
+unsafe { create_buffer(self.array(), 0, buffer_len) }
}

fn child(&self, index: usize) -> ArrowArrayChild {
-create_child(self.owner().clone(), self.array(), self.schema(), index)
+create_child(&self.owner(), self.array(), self.schema(), index)
}

-fn owner(&self) -> &Arc<FFI_ArrowArray>;
+fn owner(&self) -> Arc<&FFI_ArrowArray>;
fn array(&self) -> &FFI_ArrowArray;
fn schema(&self) -> &FFI_ArrowSchema;
fn data_type(&self) -> Result<DataType>;
@@ -675,7 +673,7 @@ pub trait ArrowArrayRef {
Some(ArrowArrayChild::from_raw(
&*self.array().dictionary,
&*self.schema().dictionary,
-self.owner().clone(),
+&self.owner(),
))
} else {
None
@@ -706,15 +704,15 @@ pub trait ArrowArrayRef {
/// Furthermore, this struct assumes that the incoming data agrees with the C data interface.
#[derive(Debug)]
pub struct ArrowArray {
-array: Arc<FFI_ArrowArray>,
-schema: Arc<FFI_ArrowSchema>,
+array: Box<FFI_ArrowArray>,
+schema: Box<FFI_ArrowSchema>,
}

#[derive(Debug)]
pub struct ArrowArrayChild<'a> {
array: &'a FFI_ArrowArray,
schema: &'a FFI_ArrowSchema,
-owner: Arc<FFI_ArrowArray>,
+owner: &'a FFI_ArrowArray,
}

impl ArrowArrayRef for ArrowArray {
Expand All @@ -731,8 +729,8 @@ impl ArrowArrayRef for ArrowArray {
self.schema.as_ref()
}

fn owner(&self) -> &Arc<FFI_ArrowArray> {
&self.array
fn owner(&self) -> Arc<&FFI_ArrowArray> {
Arc::new(self.array.as_ref())
}
}

@@ -750,8 +748,8 @@ impl<'a> ArrowArrayRef for ArrowArrayChild<'a> {
self.schema
}

-fn owner(&self) -> &Arc<FFI_ArrowArray> {
-&self.owner
+fn owner(&self) -> Arc<&FFI_ArrowArray> {
+Arc::new(self.owner)
}
}

@@ -761,8 +759,8 @@ impl ArrowArray {
/// See safety of [ArrowArray]
#[allow(clippy::too_many_arguments)]
pub unsafe fn try_new(data: ArrayData) -> Result<Self> {
-let array = Arc::new(FFI_ArrowArray::new(&data));
-let schema = Arc::new(FFI_ArrowSchema::try_from(data.data_type())?);
+let array = Box::new(FFI_ArrowArray::new(&data));
+let schema = Box::new(FFI_ArrowSchema::try_from(data.data_type())?);
Ok(ArrowArray { array, schema })
}

@@ -781,34 +779,32 @@ impl ArrowArray {
.to_string(),
));
};
-let ffi_array = (*array).clone();
-let ffi_schema = (*schema).clone();
Ok(Self {
-array: Arc::new(ffi_array),
-schema: Arc::new(ffi_schema),
+array: Box::from_raw(array as *mut FFI_ArrowArray),
+schema: Box::from_raw(schema as *mut FFI_ArrowSchema),
})
}

/// creates a new empty [ArrowArray]. Used to import from the C Data Interface.
/// # Safety
/// See safety of [ArrowArray]
pub unsafe fn empty() -> Self {
-let schema = Arc::new(FFI_ArrowSchema::empty());
-let array = Arc::new(FFI_ArrowArray::empty());
+let schema = Box::new(FFI_ArrowSchema::empty());
+let array = Box::new(FFI_ArrowArray::empty());
ArrowArray { array, schema }
}

/// exports [ArrowArray] to the C Data Interface
pub fn into_raw(this: ArrowArray) -> (*const FFI_ArrowArray, *const FFI_ArrowSchema) {
-(Arc::into_raw(this.array), Arc::into_raw(this.schema))
+(Box::into_raw(this.array), Box::into_raw(this.schema))
}
}

impl<'a> ArrowArrayChild<'a> {
fn from_raw(
array: &'a FFI_ArrowArray,
schema: &'a FFI_ArrowSchema,
-owner: Arc<FFI_ArrowArray>,
+owner: &'a FFI_ArrowArray,
) -> Self {
Self {
array,
@@ -840,7 +836,7 @@ mod tests {
let array = ArrowArray::try_from(array.data().clone())?;

// (simulate consumer) import it
-let data = ArrayData::try_from(array)?;
+let data = ArrayData::try_from(&array)?;
let array = make_array(data);

// perform some operation
@@ -867,7 +863,7 @@ mod tests {
let array = ArrowArray::try_from(original_array.data().clone())?;

// (simulate consumer) import it
-let data = ArrayData::try_from(array)?;
+let data = ArrayData::try_from(&array)?;
let array = make_array(data);

// perform some operation
@@ -890,7 +886,7 @@ mod tests {
let array = ArrowArray::try_from(array.data().clone())?;

// (simulate consumer) import it
-let data = ArrayData::try_from(array)?;
+let data = ArrayData::try_from(&array)?;
let array = make_array(data);

// perform some operation
@@ -962,7 +958,7 @@ mod tests {
let array = ArrowArray::try_from(array.data().clone())?;

// (simulate consumer) import it
-let data = ArrayData::try_from(array)?;
+let data = ArrayData::try_from(&array)?;
let array = make_array(data);

// downcast
@@ -1002,7 +998,7 @@ mod tests {
let array = ArrowArray::try_from(array.data().clone())?;

// (simulate consumer) import it
-let data = ArrayData::try_from(array)?;
+let data = ArrayData::try_from(&array)?;
let array = make_array(data);

// perform some operation
@@ -1047,7 +1043,7 @@ mod tests {
let array = ArrowArray::try_from(array.data().clone())?;

// (simulate consumer) import it
-let data = ArrayData::try_from(array)?;
+let data = ArrayData::try_from(&array)?;
let array = make_array(data);

// perform some operation
@@ -1073,7 +1069,7 @@ mod tests {
let array = ArrowArray::try_from(array.data().clone())?;

// (simulate consumer) import it
-let data = ArrayData::try_from(array)?;
+let data = ArrayData::try_from(&array)?;
let array = make_array(data);

// perform some operation
@@ -1109,7 +1105,7 @@ mod tests {
let array = ArrowArray::try_from(array.data().clone())?;

// (simulate consumer) import it
-let data = ArrayData::try_from(array)?;
+let data = ArrayData::try_from(&array)?;
let array = make_array(data);

// perform some operation
@@ -1146,7 +1142,7 @@ mod tests {
let array = ArrowArray::try_from(dict_array.data().clone())?;

// (simulate consumer) import it
-let data = ArrayData::try_from(array)?;
+let data = ArrayData::try_from(&array)?;
let array = make_array(data);

// perform some operation
2 changes: 1 addition & 1 deletion arrow/src/pyarrow.rs
@@ -126,7 +126,7 @@ impl PyArrowConvert for ArrayData {

let ffi_array =
unsafe { ffi::ArrowArray::try_from_raw(array_pointer, schema_pointer)? };
-let data = ArrayData::try_from(ffi_array)?;
+let data = ArrayData::try_from(&ffi_array)?;

Ok(data)
}