Skip to content

Commit

Permalink
chore: Address reviews
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Sep 6, 2024
1 parent 6c2c182 commit 470e79f
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,6 @@ class NativeUtil {

(0 until numCols).foreach { index =>
val arrowSchema = ArrowSchema.allocateNew(allocator)

// Manually fill NULL to `release` slot of ArrowSchema because ArrowSchema doesn't provide
// `markReleased`.
// The total size of ArrowSchema is 72 bytes.
// The `release` slot is at offset 56 in the ArrowSchema struct.
val buffer =
MemoryUtil.directBuffer(arrowSchema.memoryAddress(), 72).order(ByteOrder.nativeOrder)
buffer.putLong(56, NULL);

val arrowArray = ArrowArray.allocateNew(allocator)
arrays(index) = arrowArray
schemas(index) = arrowSchema
Expand Down
12 changes: 7 additions & 5 deletions native/core/src/execution/jni_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,21 +259,23 @@ fn parse_bool(conf: &HashMap<String, String>, name: &str) -> CometResult<bool> {
}

/// Prepares arrow arrays for output.
unsafe fn prepare_output(
fn prepare_output(
env: &mut JNIEnv,
array_addrs: jlongArray,
schema_addrs: jlongArray,
output_batch: RecordBatch,
exec_context: &mut ExecutionContext,
) -> CometResult<jlong> {
let array_address_array = JLongArray::from_raw(array_addrs);
let array_address_array = unsafe { JLongArray::from_raw(array_addrs) };
let num_cols = env.get_array_length(&array_address_array)? as usize;

let array_addrs = env.get_array_elements(&array_address_array, ReleaseMode::NoCopyBack)?;
let array_addrs =
unsafe { env.get_array_elements(&array_address_array, ReleaseMode::NoCopyBack)? };
let array_addrs = &*array_addrs;

let schema_address_array = JLongArray::from_raw(schema_addrs);
let schema_addrs = env.get_array_elements(&schema_address_array, ReleaseMode::NoCopyBack)?;
let schema_address_array = unsafe { JLongArray::from_raw(schema_addrs) };
let schema_addrs =
unsafe { env.get_array_elements(&schema_address_array, ReleaseMode::NoCopyBack)? };
let schema_addrs = &*schema_addrs;

let results = output_batch.columns();
Expand Down
18 changes: 11 additions & 7 deletions native/core/src/execution/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,20 @@ impl SparkArrowConvert for ArrayData {
let array_align = std::mem::align_of::<FFI_ArrowArray>();
let schema_align = std::mem::align_of::<FFI_ArrowSchema>();

// Check if the pointer alignment is correct for `replace`.
// Check if the pointer alignment is correct.
if array_ptr.align_offset(array_align) != 0 || schema_ptr.align_offset(schema_align) != 0 {
return Err(ExecutionError::ArrowError(
"Pointer alignment is not correct".to_string(),
));
unsafe {
std::ptr::write_unaligned(array_ptr, FFI_ArrowArray::new(self));
std::ptr::write_unaligned(schema_ptr, FFI_ArrowSchema::try_from(self.data_type())?);
}
} else {
// SAFETY: `array_ptr` and `schema_ptr` are aligned correctly.
unsafe {
std::ptr::write(array_ptr, FFI_ArrowArray::new(self));
std::ptr::write(schema_ptr, FFI_ArrowSchema::try_from(self.data_type())?);
}
}

unsafe { std::ptr::replace(array_ptr, FFI_ArrowArray::new(self)) };
unsafe { std::ptr::replace(schema_ptr, FFI_ArrowSchema::try_from(self.data_type())?) };

Ok(())
}
}
Expand Down

0 comments on commit 470e79f

Please sign in to comment.