Skip to content

Commit 3dfce7d

Browse files
authored
bug: Fix NULL handling in array_slice, introduce NullHandling enum to Signature (#14289)
* bug: Fix NULL handling in array_slice

  This commit fixes the array_slice function so that if any argument is NULL, the result is NULL. Previously, array_slice would return an internal error if any of the arguments were NULL. The new behavior matches DuckDB's behavior for array_slice.

  Fixes #10548
* Fix optimizer error
* Fix clippy
* PoC for strict functions
* Generate all null values
* Add NullHandling enum
* Update array_slice signature
* Switch to trait method
* fmt
* Fix typo
* Handle batch inputs
* Fix comment
* Update array_pop methods null handling
1 parent 76da8e1 commit 3dfce7d

File tree

7 files changed: 172 additions (+172) and 72 deletions (-72).

datafusion/expr-common/src/signature.rs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
//! and return types of functions in DataFusion.
2020
2121
use std::fmt::Display;
22+
use std::num::NonZeroUsize;
2223

2324
use crate::type_coercion::aggregates::NUMERICS;
2425
use arrow::datatypes::{DataType, IntervalUnit, TimeUnit};
@@ -236,9 +237,9 @@ pub enum ArrayFunctionSignature {
236237
/// The first argument should be non-list or list, and the second argument should be List/LargeList.
237238
/// The first argument's list dimension should be one dimension less than the second argument's list dimension.
238239
ElementAndArray,
239-
/// Specialized Signature for Array functions of the form (List/LargeList, Index)
240-
/// The first argument should be List/LargeList/FixedSizedList, and the second argument should be Int64.
241-
ArrayAndIndex,
240+
/// Specialized Signature for Array functions of the form (List/LargeList, Index+)
241+
/// The first argument should be List/LargeList/FixedSizedList, and the next n arguments should be Int64.
242+
ArrayAndIndexes(NonZeroUsize),
242243
/// Specialized Signature for Array functions of the form (List/LargeList, Element, Optional Index)
243244
ArrayAndElementAndOptionalIndex,
244245
/// Specialized Signature for ArrayEmpty and similar functions
@@ -265,8 +266,12 @@ impl Display for ArrayFunctionSignature {
265266
ArrayFunctionSignature::ElementAndArray => {
266267
write!(f, "element, array")
267268
}
268-
ArrayFunctionSignature::ArrayAndIndex => {
269-
write!(f, "array, index")
269+
ArrayFunctionSignature::ArrayAndIndexes(count) => {
270+
write!(f, "array")?;
271+
for _ in 0..count.get() {
272+
write!(f, ", index")?;
273+
}
274+
Ok(())
270275
}
271276
ArrayFunctionSignature::Array => {
272277
write!(f, "array")
@@ -600,9 +605,13 @@ impl Signature {
600605
}
601606
/// Specialized Signature for ArrayElement and similar functions
602607
pub fn array_and_index(volatility: Volatility) -> Self {
608+
Self::array_and_indexes(volatility, NonZeroUsize::new(1).expect("1 is non-zero"))
609+
}
610+
/// Specialized Signature for ArraySlice and similar functions
611+
pub fn array_and_indexes(volatility: Volatility, count: NonZeroUsize) -> Self {
603612
Signature {
604613
type_signature: TypeSignature::ArraySignature(
605-
ArrayFunctionSignature::ArrayAndIndex,
614+
ArrayFunctionSignature::ArrayAndIndexes(count),
606615
),
607616
volatility,
608617
}

datafusion/expr/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,8 @@ pub use udaf::{
9595
SetMonotonicity, StatisticsArgs,
9696
};
9797
pub use udf::{
98-
scalar_doc_sections, ReturnInfo, ReturnTypeArgs, ScalarFunctionArgs, ScalarUDF,
99-
ScalarUDFImpl,
98+
scalar_doc_sections, NullHandling, ReturnInfo, ReturnTypeArgs, ScalarFunctionArgs,
99+
ScalarUDF, ScalarUDFImpl,
100100
};
101101
pub use udwf::{window_doc_sections, ReversedUDWF, WindowUDF, WindowUDFImpl};
102102
pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits};

datafusion/expr/src/type_coercion/functions.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -671,13 +671,20 @@ fn get_valid_types(
671671
ArrayFunctionSignature::ElementAndArray => {
672672
array_append_or_prepend_valid_types(current_types, false)?
673673
}
674-
ArrayFunctionSignature::ArrayAndIndex => {
675-
if current_types.len() != 2 {
674+
ArrayFunctionSignature::ArrayAndIndexes(count) => {
675+
if current_types.len() != count.get() + 1 {
676676
return Ok(vec![vec![]]);
677677
}
678678
array(&current_types[0]).map_or_else(
679679
|| vec![vec![]],
680-
|array_type| vec![vec![array_type, DataType::Int64]],
680+
|array_type| {
681+
let mut inner = Vec::with_capacity(count.get() + 1);
682+
inner.push(array_type);
683+
for _ in 0..count.get() {
684+
inner.push(DataType::Int64);
685+
}
686+
vec![inner]
687+
},
681688
)
682689
}
683690
ArrayFunctionSignature::ArrayAndElementAndOptionalIndex => {

datafusion/expr/src/udf.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,11 @@ impl ScalarUDF {
200200
self.inner.return_type_from_args(args)
201201
}
202202

203+
/// Returns the behavior that this function has when any of the inputs are Null.
204+
pub fn null_handling(&self) -> NullHandling {
205+
self.inner.null_handling()
206+
}
207+
203208
/// Do the function rewrite
204209
///
205210
/// See [`ScalarUDFImpl::simplify`] for more details.
@@ -417,6 +422,15 @@ impl ReturnInfo {
417422
}
418423
}
419424

425+
/// A function's behavior when the input is Null.
426+
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Hash)]
427+
pub enum NullHandling {
428+
/// Null inputs are passed into the function implementation.
429+
PassThrough,
430+
/// Any Null input causes the function to return Null.
431+
Propagate,
432+
}
433+
420434
/// Trait for implementing user defined scalar functions.
421435
///
422436
/// This trait exposes the full API for implementing user defined functions and
@@ -589,6 +603,11 @@ pub trait ScalarUDFImpl: Debug + Send + Sync {
589603
true
590604
}
591605

606+
/// Returns the behavior that this function has when any of the inputs are Null.
607+
fn null_handling(&self) -> NullHandling {
608+
NullHandling::PassThrough
609+
}
610+
592611
/// Invoke the function on `args`, returning the appropriate result
593612
///
594613
/// Note: This method is deprecated and will be removed in future releases.

datafusion/functions-nested/src/extract.rs

Lines changed: 54 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use arrow::array::MutableArrayData;
2727
use arrow::array::OffsetSizeTrait;
2828
use arrow::buffer::OffsetBuffer;
2929
use arrow::datatypes::DataType;
30+
use arrow_buffer::NullBufferBuilder;
3031
use arrow_schema::DataType::{FixedSizeList, LargeList, List};
3132
use arrow_schema::Field;
3233
use datafusion_common::cast::as_int64_array;
@@ -35,12 +36,13 @@ use datafusion_common::cast::as_list_array;
3536
use datafusion_common::{
3637
exec_err, internal_datafusion_err, plan_err, DataFusionError, Result,
3738
};
38-
use datafusion_expr::Expr;
39+
use datafusion_expr::{ArrayFunctionSignature, Expr, TypeSignature};
3940
use datafusion_expr::{
40-
ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
41+
ColumnarValue, Documentation, NullHandling, ScalarUDFImpl, Signature, Volatility,
4142
};
4243
use datafusion_macros::user_doc;
4344
use std::any::Any;
45+
use std::num::NonZeroUsize;
4446
use std::sync::Arc;
4547

4648
use crate::utils::make_scalar_function;
@@ -330,7 +332,26 @@ pub(super) struct ArraySlice {
330332
impl ArraySlice {
331333
pub fn new() -> Self {
332334
Self {
333-
signature: Signature::variadic_any(Volatility::Immutable),
335+
signature: Signature::one_of(
336+
vec![
337+
TypeSignature::ArraySignature(
338+
ArrayFunctionSignature::ArrayAndIndexes(
339+
NonZeroUsize::new(1).expect("1 is non-zero"),
340+
),
341+
),
342+
TypeSignature::ArraySignature(
343+
ArrayFunctionSignature::ArrayAndIndexes(
344+
NonZeroUsize::new(2).expect("2 is non-zero"),
345+
),
346+
),
347+
TypeSignature::ArraySignature(
348+
ArrayFunctionSignature::ArrayAndIndexes(
349+
NonZeroUsize::new(3).expect("3 is non-zero"),
350+
),
351+
),
352+
],
353+
Volatility::Immutable,
354+
),
334355
aliases: vec![String::from("list_slice")],
335356
}
336357
}
@@ -374,6 +395,10 @@ impl ScalarUDFImpl for ArraySlice {
374395
Ok(arg_types[0].clone())
375396
}
376397

398+
fn null_handling(&self) -> NullHandling {
399+
NullHandling::Propagate
400+
}
401+
377402
fn invoke_batch(
378403
&self,
379404
args: &[ColumnarValue],
@@ -430,8 +455,6 @@ fn array_slice_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
430455
}
431456
LargeList(_) => {
432457
let array = as_large_list_array(&args[0])?;
433-
let from_array = as_int64_array(&args[1])?;
434-
let to_array = as_int64_array(&args[2])?;
435458
general_array_slice::<i64>(array, from_array, to_array, stride)
436459
}
437460
_ => exec_err!("array_slice does not support type: {:?}", array_data_type),
@@ -451,9 +474,8 @@ where
451474
let original_data = values.to_data();
452475
let capacity = Capacities::Array(original_data.len());
453476

454-
// use_nulls: false, we don't need nulls but empty array for array_slice, so we don't need explicit nulls but adjust offset to indicate nulls.
455477
let mut mutable =
456-
MutableArrayData::with_capacities(vec![&original_data], false, capacity);
478+
MutableArrayData::with_capacities(vec![&original_data], true, capacity);
457479

458480
// We have the slice syntax compatible with DuckDB v0.8.1.
459481
// The rule `adjusted_from_index` and `adjusted_to_index` follows the rule of array_slice in duckdb.
@@ -516,30 +538,33 @@ where
516538
}
517539

518540
let mut offsets = vec![O::usize_as(0)];
541+
let mut null_builder = NullBufferBuilder::new(array.len());
519542

520543
for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
521544
let start = offset_window[0];
522545
let end = offset_window[1];
523546
let len = end - start;
524547

525-
// len 0 indicate array is null, return empty array in this row.
548+
// If any input is null, return null.
549+
if array.is_null(row_index)
550+
|| from_array.is_null(row_index)
551+
|| to_array.is_null(row_index)
552+
{
553+
mutable.extend_nulls(1);
554+
offsets.push(offsets[row_index] + O::usize_as(1));
555+
null_builder.append_null();
556+
continue;
557+
}
558+
null_builder.append_non_null();
559+
560+
// Empty arrays always return an empty array.
526561
if len == O::usize_as(0) {
527562
offsets.push(offsets[row_index]);
528563
continue;
529564
}
530565

531-
// If index is null, we consider it as the minimum / maximum index of the array.
532-
let from_index = if from_array.is_null(row_index) {
533-
Some(O::usize_as(0))
534-
} else {
535-
adjusted_from_index::<O>(from_array.value(row_index), len)?
536-
};
537-
538-
let to_index = if to_array.is_null(row_index) {
539-
Some(len - O::usize_as(1))
540-
} else {
541-
adjusted_to_index::<O>(to_array.value(row_index), len)?
542-
};
566+
let from_index = adjusted_from_index::<O>(from_array.value(row_index), len)?;
567+
let to_index = adjusted_to_index::<O>(to_array.value(row_index), len)?;
543568

544569
if let (Some(from), Some(to)) = (from_index, to_index) {
545570
let stride = stride.map(|s| s.value(row_index));
@@ -613,7 +638,7 @@ where
613638
Arc::new(Field::new_list_field(array.value_type(), true)),
614639
OffsetBuffer::<O>::new(offsets.into()),
615640
arrow_array::make_array(data),
616-
None,
641+
null_builder.finish(),
617642
)?))
618643
}
619644

@@ -665,6 +690,10 @@ impl ScalarUDFImpl for ArrayPopFront {
665690
Ok(arg_types[0].clone())
666691
}
667692

693+
fn null_handling(&self) -> NullHandling {
694+
NullHandling::Propagate
695+
}
696+
668697
fn invoke_batch(
669698
&self,
670699
args: &[ColumnarValue],
@@ -765,6 +794,10 @@ impl ScalarUDFImpl for ArrayPopBack {
765794
Ok(arg_types[0].clone())
766795
}
767796

797+
fn null_handling(&self) -> NullHandling {
798+
NullHandling::Propagate
799+
}
800+
768801
fn invoke_batch(
769802
&self,
770803
args: &[ColumnarValue],

datafusion/physical-expr/src/scalar_function.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ use datafusion_expr::interval_arithmetic::Interval;
4545
use datafusion_expr::sort_properties::ExprProperties;
4646
use datafusion_expr::type_coercion::functions::data_types_with_scalar_udf;
4747
use datafusion_expr::{
48-
expr_vec_fmt, ColumnarValue, Expr, ReturnTypeArgs, ScalarFunctionArgs, ScalarUDF,
48+
expr_vec_fmt, ColumnarValue, Expr, NullHandling, ReturnTypeArgs, ScalarFunctionArgs,
49+
ScalarUDF,
4950
};
5051

5152
/// Physical expression of a scalar function
@@ -186,6 +187,15 @@ impl PhysicalExpr for ScalarFunctionExpr {
186187
.map(|e| e.evaluate(batch))
187188
.collect::<Result<Vec<_>>>()?;
188189

190+
if self.fun.null_handling() == NullHandling::Propagate
191+
&& args.iter().any(
192+
|arg| matches!(arg, ColumnarValue::Scalar(scalar) if scalar.is_null()),
193+
)
194+
{
195+
let null_value = ScalarValue::try_from(&self.return_type)?;
196+
return Ok(ColumnarValue::Scalar(null_value));
197+
}
198+
189199
let input_empty = args.is_empty();
190200
let input_all_scalar = args
191201
.iter()

0 commit comments

Comments
 (0)