Skip to content

Commit 2955a21

Browse files
authored
fix: zero Arrow Array offset before sending across FFI (#2052)
1 parent 8f8353b commit 2955a21

File tree

1 file changed

+43
-25
lines changed

1 file changed

+43
-25
lines changed

native/core/src/execution/jni_api.rs

Lines changed: 43 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,28 +18,6 @@
1818
//! Define JNI APIs which can be called from Java/Scala.
1919
2020
use super::{serde, utils::SparkArrowConvert};
21-
use arrow::array::RecordBatch;
22-
use arrow::datatypes::DataType as ArrowDataType;
23-
use datafusion::execution::memory_pool::MemoryPool;
24-
use datafusion::{
25-
execution::{disk_manager::DiskManagerBuilder, runtime_env::RuntimeEnv},
26-
physical_plan::{display::DisplayableExecutionPlan, SendableRecordBatchStream},
27-
prelude::{SessionConfig, SessionContext},
28-
};
29-
use futures::poll;
30-
use jni::{
31-
errors::Result as JNIResult,
32-
objects::{
33-
JByteArray, JClass, JIntArray, JLongArray, JObject, JObjectArray, JPrimitiveArray, JString,
34-
ReleaseMode,
35-
},
36-
sys::{jbyteArray, jint, jlong, jlongArray},
37-
JNIEnv,
38-
};
39-
use std::path::PathBuf;
40-
use std::time::{Duration, Instant};
41-
use std::{sync::Arc, task::Poll};
42-
4321
use crate::{
4422
errors::{try_unwrap_or_throw, CometError, CometResult},
4523
execution::{
@@ -48,19 +26,41 @@ use crate::{
4826
},
4927
jvm_bridge::{jni_new_global_ref, JVMClasses},
5028
};
29+
use arrow::array::{Array, RecordBatch, UInt32Array};
30+
use arrow::compute::{take, TakeOptions};
31+
use arrow::datatypes::DataType as ArrowDataType;
5132
use datafusion::common::ScalarValue;
5233
use datafusion::execution::disk_manager::DiskManagerMode;
34+
use datafusion::execution::memory_pool::MemoryPool;
5335
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
5436
use datafusion::logical_expr::ScalarUDF;
37+
use datafusion::{
38+
execution::{disk_manager::DiskManagerBuilder, runtime_env::RuntimeEnv},
39+
physical_plan::{display::DisplayableExecutionPlan, SendableRecordBatchStream},
40+
prelude::{SessionConfig, SessionContext},
41+
};
5542
use datafusion_comet_proto::spark_operator::Operator;
5643
use datafusion_spark::function::math::expm1::SparkExpm1;
44+
use futures::poll;
5745
use futures::stream::StreamExt;
5846
use jni::objects::JByteBuffer;
5947
use jni::sys::JNI_FALSE;
48+
use jni::{
49+
errors::Result as JNIResult,
50+
objects::{
51+
JByteArray, JClass, JIntArray, JLongArray, JObject, JObjectArray, JPrimitiveArray, JString,
52+
ReleaseMode,
53+
},
54+
sys::{jbyteArray, jint, jlong, jlongArray},
55+
JNIEnv,
56+
};
6057
use jni::{
6158
objects::GlobalRef,
6259
sys::{jboolean, jdouble, jintArray, jobjectArray, jstring},
6360
};
61+
use std::path::PathBuf;
62+
use std::time::{Duration, Instant};
63+
use std::{sync::Arc, task::Poll};
6464
use tokio::runtime::Runtime;
6565

6666
use crate::execution::memory_pools::{
@@ -341,10 +341,28 @@ fn prepare_output(
341341
let mut i = 0;
342342
while i < results.len() {
343343
let array_ref = results.get(i).ok_or(CometError::IndexOutOfBounds(i))?;
344-
array_ref
345-
.to_data()
346-
.move_to_spark(array_addrs[i], schema_addrs[i])?;
347344

345+
if array_ref.offset() != 0 {
346+
// https://github.com/apache/datafusion-comet/issues/2051
347+
// Bug with non-zero offset FFI, so take to a new array which will have an offset of 0.
348+
// We expect this to be a cold code path, hence the check_bounds: true and assert_eq.
349+
let indices = UInt32Array::from((0..num_rows as u32).collect::<Vec<u32>>());
350+
let new_array = take(
351+
array_ref,
352+
&indices,
353+
Some(TakeOptions { check_bounds: true }),
354+
)?;
355+
356+
assert_eq!(new_array.offset(), 0);
357+
358+
new_array
359+
.to_data()
360+
.move_to_spark(array_addrs[i], schema_addrs[i])?;
361+
} else {
362+
array_ref
363+
.to_data()
364+
.move_to_spark(array_addrs[i], schema_addrs[i])?;
365+
}
348366
i += 1;
349367
}
350368
}

0 commit comments

Comments
 (0)