Skip to content
85 changes: 76 additions & 9 deletions datafusion/common/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub mod proxy;
pub mod string_utils;

use crate::error::{_internal_datafusion_err, _internal_err};
use crate::{DataFusionError, Result, ScalarValue};
use crate::{DataFusionError, Result, ScalarValue, _exec_datafusion_err};
use arrow::array::ArrayRef;
use arrow::buffer::OffsetBuffer;
use arrow::compute::{partition, SortColumn, SortOptions};
Expand Down Expand Up @@ -441,6 +441,13 @@ pub fn base_type(data_type: &DataType) -> DataType {
}
}

/// Information about how to coerce lists.
///
/// A value of this type is passed (as an `Option`) to
/// `coerced_type_with_base_type_only` to select additional list coercions
/// on top of the base-type coercion. `None` means no additional list coercion.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum ListCoercion {
/// [`DataType::FixedSizeList`] should be coerced to [`DataType::List`].
FixedSizedListToList,
}

/// A helper function to coerce base type in List.
///
/// Example
Expand All @@ -451,26 +458,47 @@ pub fn base_type(data_type: &DataType) -> DataType {
///
/// let data_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
/// let base_type = DataType::Float64;
/// let coerced_type = coerced_type_with_base_type_only(&data_type, &base_type);
/// let coerced_type = coerced_type_with_base_type_only(&data_type, &base_type, None);
/// assert_eq!(coerced_type, DataType::List(Arc::new(Field::new_list_field(DataType::Float64, true))));
pub fn coerced_type_with_base_type_only(
data_type: &DataType,
base_type: &DataType,
array_coercion: Option<&ListCoercion>,
) -> DataType {
match data_type {
DataType::List(field) | DataType::FixedSizeList(field, _) => {
let field_type =
coerced_type_with_base_type_only(field.data_type(), base_type);
match (data_type, array_coercion) {
(DataType::List(field), _)
| (DataType::FixedSizeList(field, _), Some(ListCoercion::FixedSizedListToList)) =>
{
let field_type = coerced_type_with_base_type_only(
field.data_type(),
base_type,
array_coercion,
);

DataType::List(Arc::new(Field::new(
field.name(),
field_type,
field.is_nullable(),
)))
}
DataType::LargeList(field) => {
let field_type =
coerced_type_with_base_type_only(field.data_type(), base_type);
(DataType::FixedSizeList(field, len), _) => {
let field_type = coerced_type_with_base_type_only(
field.data_type(),
base_type,
array_coercion,
);

DataType::FixedSizeList(
Arc::new(Field::new(field.name(), field_type, field.is_nullable())),
*len,
)
}
(DataType::LargeList(field), _) => {
let field_type = coerced_type_with_base_type_only(
field.data_type(),
base_type,
array_coercion,
);

DataType::LargeList(Arc::new(Field::new(
field.name(),
Expand Down Expand Up @@ -724,6 +752,45 @@ pub fn combine_limit(
(combined_skip, combined_fetch)
}

/// Collects function arguments into a fixed-size array of exactly `N` elements.
///
/// When the number of supplied arguments differs from `N`, an execution error
/// naming the function, the expected count and the actual count is returned.
///
/// # Example
/// ```
/// # use datafusion_common::Result;
/// # use datafusion_common::utils::take_function_args;
/// # use datafusion_common::ScalarValue;
/// fn my_function(args: &[ScalarValue]) -> Result<()> {
///   // function expects 2 args, so create a 2-element array
///   let [arg1, arg2] = take_function_args("my_function", args)?;
///   // ... do stuff..
///   Ok(())
/// }
///
/// // Calling the function with 1 argument produces an error:
/// let args = vec![ScalarValue::Int32(Some(10))];
/// let err = my_function(&args).unwrap_err();
/// assert_eq!(err.to_string(), "Execution error: my_function function requires 2 arguments, got 1");
/// // Calling the function with 2 arguments works great
/// let args = vec![ScalarValue::Int32(Some(10)), ScalarValue::Int32(Some(20))];
/// my_function(&args).unwrap();
/// ```
pub fn take_function_args<const N: usize, T>(
    function_name: &str,
    args: impl IntoIterator<Item = T>,
) -> Result<[T; N]> {
    let collected: Vec<T> = args.into_iter().collect();

    // `Vec<T> -> [T; N]` succeeds only when the length is exactly `N`; on
    // failure the untouched vector is handed back so its length can be reported.
    match <[T; N]>::try_from(collected) {
        Ok(array) => Ok(array),
        Err(rejected) => {
            let noun = if N == 1 { "argument" } else { "arguments" };
            Err(_exec_datafusion_err!(
                "{} function requires {} {}, got {}",
                function_name,
                N,
                noun,
                rejected.len()
            ))
        }
    }
}

#[cfg(test)]
mod tests {
use crate::ScalarValue::Null;
Expand Down
170 changes: 123 additions & 47 deletions datafusion/expr-common/src/signature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
//! and return types of functions in DataFusion.

use arrow::datatypes::DataType;
use std::fmt::Display;

use datafusion_common::utils::ListCoercion;

/// Constant that is used as a placeholder for any valid timezone.
/// This is used where a function can accept a timestamp type with any
Expand Down Expand Up @@ -134,50 +137,71 @@ pub enum TypeSignature {

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum ArrayFunctionSignature {
/// Specialized Signature for ArrayAppend and similar functions
/// The first argument should be List/LargeList/FixedSizedList, and the second argument should be non-list or list.
/// The second argument's list dimension should be one dimension less than the first argument's list dimension.
/// List dimension of the List/LargeList is equivalent to the number of List.
/// List dimension of the non-list is 0.
ArrayAndElement,
/// Specialized Signature for ArrayPrepend and similar functions
/// The first argument should be non-list or list, and the second argument should be List/LargeList.
/// The first argument's list dimension should be one dimension less than the second argument's list dimension.
ElementAndArray,
/// Specialized Signature for Array functions of the form (List/LargeList, Index)
/// The first argument should be List/LargeList/FixedSizedList, and the second argument should be Int64.
ArrayAndIndex,
/// Specialized Signature for Array functions of the form (List/LargeList, Element, Optional Index)
ArrayAndElementAndOptionalIndex,
/// Specialized Signature for ArrayEmpty and similar functions
/// The function takes a single argument that must be a List/LargeList/FixedSizeList
/// or something that can be coerced to one of those types.
Array,
/// A function takes at least one List/LargeList/FixedSizeList argument.
Array {
/// A full list of the arguments accepted by this function.
arguments: Vec<ArrayFunctionArgument>,
/// Additional information about how array arguments should be coerced.
array_coercion: Option<ListCoercion>,
},
/// A function takes a single argument that must be a List/LargeList/FixedSizeList
/// which gets coerced to List, with element type recursively coerced to List too if it is list-like.
RecursiveArray,
/// Specialized Signature for MapArray
/// The function takes a single argument that must be a MapArray
MapArray,
}

impl std::fmt::Display for ArrayFunctionSignature {
impl Display for ArrayFunctionSignature {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ArrayFunctionSignature::ArrayAndElement => {
write!(f, "array, element")
ArrayFunctionSignature::Array { arguments, .. } => {
for (idx, argument) in arguments.iter().enumerate() {
write!(f, "{argument}")?;
if idx != arguments.len() - 1 {
write!(f, ", ")?;
}
}
Ok(())
}
ArrayFunctionSignature::ArrayAndElementAndOptionalIndex => {
write!(f, "array, element, [index]")
ArrayFunctionSignature::RecursiveArray => {
write!(f, "recursive_array")
}
ArrayFunctionSignature::ElementAndArray => {
write!(f, "element, array")
ArrayFunctionSignature::MapArray => {
write!(f, "map_array")
}
ArrayFunctionSignature::ArrayAndIndex => {
write!(f, "array, index")
}
}
}

/// The kind of a single argument accepted by an array function; a list of these
/// makes up the `arguments` of an `ArrayFunctionSignature::Array` signature.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum ArrayFunctionArgument {
/// A non-list or list argument. The list dimensions should be one less than the Array's list
/// dimensions.
Element,
/// An Int64 index argument.
Index,
/// An argument of type List/LargeList/FixedSizeList. All Array arguments must be coercible
/// to the same type.
Array,
/// A Utf8 argument.
String,
}

impl Display for ArrayFunctionArgument {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ArrayFunctionArgument::Element => {
write!(f, "element")
}
ArrayFunctionArgument::Index => {
write!(f, "index")
}
ArrayFunctionSignature::Array => {
ArrayFunctionArgument::Array => {
write!(f, "array")
}
ArrayFunctionSignature::MapArray => {
write!(f, "map_array")
ArrayFunctionArgument::String => {
write!(f, "string")
}
}
}
Expand Down Expand Up @@ -224,7 +248,7 @@ impl TypeSignature {
}

/// Helper function to join types with specified delimiter.
pub fn join_types<T: std::fmt::Display>(types: &[T], delimiter: &str) -> String {
pub fn join_types<T: Display>(types: &[T], delimiter: &str) -> String {
types
.iter()
.map(|t| t.to_string())
Expand Down Expand Up @@ -336,55 +360,107 @@ impl Signature {
volatility,
}
}

/// Builds a [Signature] that accepts any one of the given [TypeSignature]s.
pub fn one_of(type_signatures: Vec<TypeSignature>, volatility: Volatility) -> Self {
    let type_signature = TypeSignature::OneOf(type_signatures);
    Signature {
        type_signature,
        volatility,
    }
}
/// Specialized Signature for ArrayAppend and similar functions

/// Specialized [Signature] for ArrayAppend and similar functions.
pub fn array_and_element(volatility: Volatility) -> Self {
Signature {
type_signature: TypeSignature::ArraySignature(
ArrayFunctionSignature::ArrayAndElement,
ArrayFunctionSignature::Array {
arguments: vec![
ArrayFunctionArgument::Array,
ArrayFunctionArgument::Element,
],
array_coercion: Some(ListCoercion::FixedSizedListToList),
},
),
volatility,
}
}
/// Specialized Signature for Array functions with an optional index
pub fn array_and_element_and_optional_index(volatility: Volatility) -> Self {

/// Specialized [Signature] for ArrayPrepend and similar functions.
pub fn element_and_array(volatility: Volatility) -> Self {
Signature {
type_signature: TypeSignature::ArraySignature(
ArrayFunctionSignature::ArrayAndElementAndOptionalIndex,
ArrayFunctionSignature::Array {
arguments: vec![
ArrayFunctionArgument::Element,
ArrayFunctionArgument::Array,
],
array_coercion: Some(ListCoercion::FixedSizedListToList),
},
),
volatility,
}
}
/// Specialized Signature for ArrayPrepend and similar functions
pub fn element_and_array(volatility: Volatility) -> Self {

/// Specialized [Signature] for functions that take a fixed number of arrays.
pub fn arrays(
n: usize,
coercion: Option<ListCoercion>,
volatility: Volatility,
) -> Self {
Signature {
type_signature: TypeSignature::ArraySignature(
ArrayFunctionSignature::ElementAndArray,
ArrayFunctionSignature::Array {
arguments: vec![ArrayFunctionArgument::Array; n],
array_coercion: coercion,
},
),
volatility,
}
}
/// Specialized Signature for ArrayElement and similar functions

/// Specialized [Signature] for Array functions with an optional index.
pub fn array_and_element_and_optional_index(volatility: Volatility) -> Self {
Signature {
type_signature: TypeSignature::OneOf(vec![
TypeSignature::ArraySignature(ArrayFunctionSignature::Array {
arguments: vec![
ArrayFunctionArgument::Array,
ArrayFunctionArgument::Element,
],
array_coercion: None,
}),
TypeSignature::ArraySignature(ArrayFunctionSignature::Array {
arguments: vec![
ArrayFunctionArgument::Array,
ArrayFunctionArgument::Element,
ArrayFunctionArgument::Index,
],
array_coercion: None,
}),
]),
volatility,
}
}

/// Specialized [Signature] for ArrayElement and similar functions.
pub fn array_and_index(volatility: Volatility) -> Self {
Signature {
type_signature: TypeSignature::ArraySignature(
ArrayFunctionSignature::ArrayAndIndex,
ArrayFunctionSignature::Array {
arguments: vec![
ArrayFunctionArgument::Array,
ArrayFunctionArgument::Index,
],
array_coercion: Some(ListCoercion::FixedSizedListToList),
},
),
volatility,
}
}
/// Specialized Signature for ArrayEmpty and similar functions

/// Specialized [Signature] for ArrayEmpty and similar functions.
pub fn array(volatility: Volatility) -> Self {
Signature {
type_signature: TypeSignature::ArraySignature(ArrayFunctionSignature::Array),
volatility,
}
Signature::arrays(1, Some(ListCoercion::FixedSizedListToList), volatility)
}
}

Expand Down
Loading
Loading