Skip to content

Commit fe3b101

Browse files
authored
Allow type coersion of zero input arrays to nullary (#15487)
* ScalarUDF via FFI would break with nullary type inputs. We could not coerce empty vector of arguments to nullary due to check. This also exposes return_type_from_args instead of just return_type that was causing some UDFs to fail. Added unit tests and moved around FFI test modules a little. * Add license text * Correct error in documentation * Error message changed in test due to updated scalar coercion * Perform check of user defined types when looking for empty argument types * Updated error messages during unit test
1 parent 19a1e58 commit fe3b101

File tree

11 files changed

+458
-124
lines changed

11 files changed

+458
-124
lines changed

datafusion/expr/src/type_coercion/functions.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ pub fn data_types_with_scalar_udf(
4949
let signature = func.signature();
5050
let type_signature = &signature.type_signature;
5151

52-
if current_types.is_empty() {
52+
if current_types.is_empty() && type_signature != &TypeSignature::UserDefined {
5353
if type_signature.supports_zero_argument() {
5454
return Ok(vec![]);
5555
} else if type_signature.used_to_support_zero_arguments() {
@@ -87,7 +87,7 @@ pub fn data_types_with_aggregate_udf(
8787
let signature = func.signature();
8888
let type_signature = &signature.type_signature;
8989

90-
if current_types.is_empty() {
90+
if current_types.is_empty() && type_signature != &TypeSignature::UserDefined {
9191
if type_signature.supports_zero_argument() {
9292
return Ok(vec![]);
9393
} else if type_signature.used_to_support_zero_arguments() {
@@ -124,7 +124,7 @@ pub fn data_types_with_window_udf(
124124
let signature = func.signature();
125125
let type_signature = &signature.type_signature;
126126

127-
if current_types.is_empty() {
127+
if current_types.is_empty() && type_signature != &TypeSignature::UserDefined {
128128
if type_signature.supports_zero_argument() {
129129
return Ok(vec![]);
130130
} else if type_signature.used_to_support_zero_arguments() {
@@ -161,7 +161,7 @@ pub fn data_types(
161161
) -> Result<Vec<DataType>> {
162162
let type_signature = &signature.type_signature;
163163

164-
if current_types.is_empty() {
164+
if current_types.is_empty() && type_signature != &TypeSignature::UserDefined {
165165
if type_signature.supports_zero_argument() {
166166
return Ok(vec![]);
167167
} else if type_signature.used_to_support_zero_arguments() {

datafusion/ffi/src/tests/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,13 @@ use datafusion::{
3737
common::record_batch,
3838
};
3939
use sync_provider::create_sync_table_provider;
40-
use udf_udaf_udwf::create_ffi_abs_func;
40+
use udf_udaf_udwf::{create_ffi_abs_func, create_ffi_random_func};
4141

4242
mod async_provider;
4343
pub mod catalog;
4444
mod sync_provider;
4545
mod udf_udaf_udwf;
46+
pub mod utils;
4647

4748
#[repr(C)]
4849
#[derive(StableAbi)]
@@ -60,6 +61,8 @@ pub struct ForeignLibraryModule {
6061
/// Create a scalar UDF
6162
pub create_scalar_udf: extern "C" fn() -> FFI_ScalarUDF,
6263

64+
pub create_nullary_udf: extern "C" fn() -> FFI_ScalarUDF,
65+
6366
pub version: extern "C" fn() -> u64,
6467
}
6568

@@ -105,6 +108,7 @@ pub fn get_foreign_library_module() -> ForeignLibraryModuleRef {
105108
create_catalog: create_catalog_provider,
106109
create_table: construct_table_provider,
107110
create_scalar_udf: create_ffi_abs_func,
111+
create_nullary_udf: create_ffi_random_func,
108112
version: super::version,
109113
}
110114
.leak_into_prefix()

datafusion/ffi/src/tests/udf_udaf_udwf.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@
1616
// under the License.
1717

1818
use crate::udf::FFI_ScalarUDF;
19-
use datafusion::{functions::math::abs::AbsFunc, logical_expr::ScalarUDF};
19+
use datafusion::{
20+
functions::math::{abs::AbsFunc, random::RandomFunc},
21+
logical_expr::ScalarUDF,
22+
};
2023

2124
use std::sync::Arc;
2225

@@ -25,3 +28,9 @@ pub(crate) extern "C" fn create_ffi_abs_func() -> FFI_ScalarUDF {
2528

2629
udf.into()
2730
}
31+
32+
pub(crate) extern "C" fn create_ffi_random_func() -> FFI_ScalarUDF {
33+
let udf: Arc<ScalarUDF> = Arc::new(RandomFunc::new().into());
34+
35+
udf.into()
36+
}

datafusion/ffi/src/tests/utils.rs

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::tests::ForeignLibraryModuleRef;
19+
use abi_stable::library::RootModule;
20+
use datafusion::error::{DataFusionError, Result};
21+
use std::path::Path;
22+
23+
/// Compute the path to the library. It would be preferable to simply use
24+
/// abi_stable::library::development_utils::compute_library_path however
25+
/// our current CI pipeline has a `ci` profile that we need to use to
26+
/// find the library.
27+
pub fn compute_library_path<M: RootModule>(
28+
target_path: &Path,
29+
) -> std::io::Result<std::path::PathBuf> {
30+
let debug_dir = target_path.join("debug");
31+
let release_dir = target_path.join("release");
32+
let ci_dir = target_path.join("ci");
33+
34+
let debug_path = M::get_library_path(&debug_dir.join("deps"));
35+
let release_path = M::get_library_path(&release_dir.join("deps"));
36+
let ci_path = M::get_library_path(&ci_dir.join("deps"));
37+
38+
let all_paths = vec![
39+
(debug_dir.clone(), debug_path),
40+
(release_dir, release_path),
41+
(ci_dir, ci_path),
42+
];
43+
44+
let best_path = all_paths
45+
.into_iter()
46+
.filter(|(_, path)| path.exists())
47+
.filter_map(|(dir, path)| path.metadata().map(|m| (dir, m)).ok())
48+
.filter_map(|(dir, meta)| meta.modified().map(|m| (dir, m)).ok())
49+
.max_by_key(|(_, date)| *date)
50+
.map(|(dir, _)| dir)
51+
.unwrap_or(debug_dir);
52+
53+
Ok(best_path)
54+
}
55+
56+
pub fn get_module() -> Result<ForeignLibraryModuleRef> {
57+
let expected_version = crate::version();
58+
59+
let crate_root = Path::new(env!("CARGO_MANIFEST_DIR"));
60+
let target_dir = crate_root
61+
.parent()
62+
.expect("Failed to find crate parent")
63+
.parent()
64+
.expect("Failed to find workspace root")
65+
.join("target");
66+
67+
// Find the location of the library. This is specific to the build environment,
68+
// so you will need to change the approach here based on your use case.
69+
// let target: &std::path::Path = "../../../../target/".as_ref();
70+
let library_path =
71+
compute_library_path::<ForeignLibraryModuleRef>(target_dir.as_path())
72+
.map_err(|e| DataFusionError::External(Box::new(e)))?
73+
.join("deps");
74+
75+
// Load the module
76+
let module = ForeignLibraryModuleRef::load_from_directory(&library_path)
77+
.map_err(|e| DataFusionError::External(Box::new(e)))?;
78+
79+
assert_eq!(
80+
module
81+
.version()
82+
.expect("Unable to call version on FFI module")(),
83+
expected_version
84+
);
85+
86+
Ok(module)
87+
}

datafusion/ffi/src/udf.rs renamed to datafusion/ffi/src/udf/mod.rs

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,20 @@ use arrow::{
2929
};
3030
use datafusion::{
3131
error::DataFusionError,
32-
logical_expr::type_coercion::functions::data_types_with_scalar_udf,
32+
logical_expr::{
33+
type_coercion::functions::data_types_with_scalar_udf, ReturnInfo, ReturnTypeArgs,
34+
},
3335
};
3436
use datafusion::{
3537
error::Result,
3638
logical_expr::{
3739
ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
3840
},
3941
};
42+
use return_info::FFI_ReturnInfo;
43+
use return_type_args::{
44+
FFI_ReturnTypeArgs, ForeignReturnTypeArgs, ForeignReturnTypeArgsOwned,
45+
};
4046

4147
use crate::{
4248
arrow_wrappers::{WrappedArray, WrappedSchema},
@@ -45,6 +51,9 @@ use crate::{
4551
volatility::FFI_Volatility,
4652
};
4753

54+
pub mod return_info;
55+
pub mod return_type_args;
56+
4857
/// A stable struct for sharing a [`ScalarUDF`] across FFI boundaries.
4958
#[repr(C)]
5059
#[derive(Debug, StableAbi)]
@@ -66,6 +75,14 @@ pub struct FFI_ScalarUDF {
6675
arg_types: RVec<WrappedSchema>,
6776
) -> RResult<WrappedSchema, RString>,
6877

78+
/// Determines the return info of the underlying [`ScalarUDF`]. Either this
79+
/// or return_type may be implemented on a UDF.
80+
pub return_type_from_args: unsafe extern "C" fn(
81+
udf: &Self,
82+
args: FFI_ReturnTypeArgs,
83+
)
84+
-> RResult<FFI_ReturnInfo, RString>,
85+
6986
/// Execute the underlying [`ScalarUDF`] and return the result as a `FFI_ArrowArray`
7087
/// within an AbiStable wrapper.
7188
pub invoke_with_args: unsafe extern "C" fn(
@@ -123,6 +140,23 @@ unsafe extern "C" fn return_type_fn_wrapper(
123140
rresult!(return_type)
124141
}
125142

143+
unsafe extern "C" fn return_type_from_args_fn_wrapper(
144+
udf: &FFI_ScalarUDF,
145+
args: FFI_ReturnTypeArgs,
146+
) -> RResult<FFI_ReturnInfo, RString> {
147+
let private_data = udf.private_data as *const ScalarUDFPrivateData;
148+
let udf = &(*private_data).udf;
149+
150+
let args: ForeignReturnTypeArgsOwned = rresult_return!((&args).try_into());
151+
let args_ref: ForeignReturnTypeArgs = (&args).into();
152+
153+
let return_type = udf
154+
.return_type_from_args((&args_ref).into())
155+
.and_then(FFI_ReturnInfo::try_from);
156+
157+
rresult!(return_type)
158+
}
159+
126160
unsafe extern "C" fn coerce_types_fn_wrapper(
127161
udf: &FFI_ScalarUDF,
128162
arg_types: RVec<WrappedSchema>,
@@ -209,6 +243,7 @@ impl From<Arc<ScalarUDF>> for FFI_ScalarUDF {
209243
short_circuits,
210244
invoke_with_args: invoke_with_args_fn_wrapper,
211245
return_type: return_type_fn_wrapper,
246+
return_type_from_args: return_type_from_args_fn_wrapper,
212247
coerce_types: coerce_types_fn_wrapper,
213248
clone: clone_fn_wrapper,
214249
release: release_fn_wrapper,
@@ -281,6 +316,16 @@ impl ScalarUDFImpl for ForeignScalarUDF {
281316
result.and_then(|r| (&r.0).try_into().map_err(DataFusionError::from))
282317
}
283318

319+
fn return_type_from_args(&self, args: ReturnTypeArgs) -> Result<ReturnInfo> {
320+
let args: FFI_ReturnTypeArgs = args.try_into()?;
321+
322+
let result = unsafe { (self.udf.return_type_from_args)(&self.udf, args) };
323+
324+
let result = df_result!(result);
325+
326+
result.and_then(|r| r.try_into())
327+
}
328+
284329
fn invoke_with_args(&self, invoke_args: ScalarFunctionArgs) -> Result<ColumnarValue> {
285330
let ScalarFunctionArgs {
286331
args,
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use abi_stable::StableAbi;
19+
use arrow::{datatypes::DataType, ffi::FFI_ArrowSchema};
20+
use datafusion::{error::DataFusionError, logical_expr::ReturnInfo};
21+
22+
use crate::arrow_wrappers::WrappedSchema;
23+
24+
/// A stable struct for sharing a [`ReturnInfo`] across FFI boundaries.
25+
#[repr(C)]
26+
#[derive(Debug, StableAbi)]
27+
#[allow(non_camel_case_types)]
28+
pub struct FFI_ReturnInfo {
29+
return_type: WrappedSchema,
30+
nullable: bool,
31+
}
32+
33+
impl TryFrom<ReturnInfo> for FFI_ReturnInfo {
34+
type Error = DataFusionError;
35+
36+
fn try_from(value: ReturnInfo) -> Result<Self, Self::Error> {
37+
let return_type = WrappedSchema(FFI_ArrowSchema::try_from(value.return_type())?);
38+
Ok(Self {
39+
return_type,
40+
nullable: value.nullable(),
41+
})
42+
}
43+
}
44+
45+
impl TryFrom<FFI_ReturnInfo> for ReturnInfo {
46+
type Error = DataFusionError;
47+
48+
fn try_from(value: FFI_ReturnInfo) -> Result<Self, Self::Error> {
49+
let return_type = DataType::try_from(&value.return_type.0)?;
50+
51+
Ok(ReturnInfo::new(return_type, value.nullable))
52+
}
53+
}

0 commit comments

Comments
 (0)