Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ arrow-string = { version = "52.0.0", default-features = false }
async-trait = "0.1.73"
bigdecimal = "=0.4.1"
bytes = "1.4"
chrono = { version = "0.4.34", default-features = false }
chrono = { version = ">=0.4.34,<0.4.40", default-features = false }
ctor = "0.2.0"
dashmap = "5.5.0"
datafusion = { path = "datafusion/core", version = "39.0.0", default-features = false }
Expand Down
189 changes: 96 additions & 93 deletions datafusion/functions-array/src/set_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@

//! [`ScalarUDFImpl`] definitions for array_union, array_intersect and array_distinct functions.

use crate::make_array::{empty_array_type, make_array_inner};
use crate::utils::make_scalar_function;
use arrow::array::{new_empty_array, Array, ArrayRef, GenericListArray, OffsetSizeTrait};
use arrow::array::{Array, ArrayRef, GenericListArray, OffsetSizeTrait};
use arrow::buffer::OffsetBuffer;
use arrow::compute;
use arrow::datatypes::{DataType, Field, FieldRef};
use arrow::row::{RowConverter, SortField};
use arrow_array::{new_null_array, LargeListArray, ListArray};
use arrow_schema::DataType::{FixedSizeList, LargeList, List, Null};
use datafusion_common::cast::{as_large_list_array, as_list_array};
use datafusion_common::{exec_err, internal_err, Result};
use datafusion_common::{exec_err, internal_err, plan_err, Result};
use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
use itertools::Itertools;
use std::any::Any;
Expand Down Expand Up @@ -89,7 +89,8 @@ impl ScalarUDFImpl for ArrayUnion {

fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
match (&arg_types[0], &arg_types[1]) {
(&Null, dt) => Ok(dt.clone()),
(Null, Null) => Ok(DataType::new_list(Null, true)),
(Null, dt) => Ok(dt.clone()),
(dt, Null) => Ok(dt.clone()),
(dt, _) => Ok(dt.clone()),
}
Expand Down Expand Up @@ -134,9 +135,10 @@ impl ScalarUDFImpl for ArrayIntersect {

fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
match (arg_types[0].clone(), arg_types[1].clone()) {
(Null, Null) | (Null, _) => Ok(Null),
(_, Null) => Ok(empty_array_type()),
(dt, _) => Ok(dt),
(Null, Null) => Ok(DataType::new_list(Null, true)),
(Null, dt) => Ok(dt.clone()),
(dt, Null) => Ok(dt.clone()),
(dt, _) => Ok(dt.clone()),
}
}

Expand Down Expand Up @@ -179,19 +181,13 @@ impl ScalarUDFImpl for ArrayDistinct {

fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
match &arg_types[0] {
List(field) | FixedSizeList(field, _) => Ok(List(Arc::new(Field::new(
"item",
field.data_type().clone(),
true,
)))),
LargeList(field) => Ok(LargeList(Arc::new(Field::new(
"item",
field.data_type().clone(),
true,
)))),
_ => exec_err!(
"Not reachable, data_type should be List, LargeList or FixedSizeList"
),
List(field) | FixedSizeList(field, _) => {
Ok(DataType::new_list(field.data_type().clone(), true))
}
LargeList(field) => {
Ok(DataType::new_large_list(field.data_type().clone(), true))
}
arg_type => plan_err!("{} does not support type {arg_type}", self.name()),
}
}

Expand All @@ -211,22 +207,18 @@ fn array_distinct_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
return exec_err!("array_distinct needs one argument");
}

// handle null
if args[0].data_type() == &Null {
return Ok(args[0].clone());
}

// handle for list & largelist
match args[0].data_type() {
let array = &args[0];
match array.data_type() {
Null => Ok(Arc::clone(array)),
List(field) => {
let array = as_list_array(&args[0])?;
let array = as_list_array(array)?;
general_array_distinct(array, field)
}
LargeList(field) => {
let array = as_large_list_array(&args[0])?;
let array = as_large_list_array(array)?;
general_array_distinct(array, field)
}
array_type => exec_err!("array_distinct does not support type '{array_type:?}'"),
arg_type => exec_err!("array_distinct does not support type '{arg_type:?}'"),
}
}

Expand All @@ -251,80 +243,69 @@ fn generic_set_lists<OffsetSize: OffsetSizeTrait>(
field: Arc<Field>,
set_op: SetOp,
) -> Result<ArrayRef> {
if matches!(l.value_type(), Null) {
if l.is_empty() || l.value_type().is_null() {
let field = Arc::new(Field::new("item", r.value_type(), true));
return general_array_distinct::<OffsetSize>(r, &field);
} else if matches!(r.value_type(), Null) {
} else if r.is_empty() || r.value_type().is_null() {
let field = Arc::new(Field::new("item", l.value_type(), true));
return general_array_distinct::<OffsetSize>(l, &field);
}

// Handle empty array at rhs case
// array_union(arr, []) -> arr;
// array_intersect(arr, []) -> [];
if r.value_length(0).is_zero() {
if set_op == SetOp::Union {
return Ok(Arc::new(l.clone()) as ArrayRef);
} else {
return Ok(Arc::new(r.clone()) as ArrayRef);
}
}

if l.value_type() != r.value_type() {
return internal_err!("{set_op:?} is not implemented for '{l:?}' and '{r:?}'");
return internal_err!(
"{set_op} is not implemented for {} and {}",
l.data_type(),
r.data_type()
);
}

let dt = l.value_type();

let mut offsets = vec![OffsetSize::usize_as(0)];
let mut new_arrays = vec![];

let converter = RowConverter::new(vec![SortField::new(dt)])?;
let converter = RowConverter::new(vec![SortField::new(l.value_type())])?;
for (first_arr, second_arr) in l.iter().zip(r.iter()) {
if let (Some(first_arr), Some(second_arr)) = (first_arr, second_arr) {
let l_values = converter.convert_columns(&[first_arr])?;
let r_values = converter.convert_columns(&[second_arr])?;

let l_iter = l_values.iter().sorted().dedup();
let values_set: HashSet<_> = l_iter.clone().collect();
let mut rows = if set_op == SetOp::Union {
l_iter.collect::<Vec<_>>()
} else {
vec![]
};
for r_val in r_values.iter().sorted().dedup() {
match set_op {
SetOp::Union => {
if !values_set.contains(&r_val) {
rows.push(r_val);
}
}
SetOp::Intersect => {
if values_set.contains(&r_val) {
rows.push(r_val);
}
}
}
}
let l_values = if let Some(first_arr) = first_arr {
converter.convert_columns(&[first_arr])?
} else {
converter.convert_columns(&[])?
};

let r_values = if let Some(second_arr) = second_arr {
converter.convert_columns(&[second_arr])?
} else {
converter.convert_columns(&[])?
};

let l_iter = l_values.iter().sorted().dedup();
let values_set: HashSet<_> = l_iter.clone().collect();
let mut rows = if set_op == SetOp::Union {
l_iter.collect()
} else {
vec![]
};

let last_offset = match offsets.last().copied() {
Some(offset) => offset,
None => return internal_err!("offsets should not be empty"),
};
offsets.push(last_offset + OffsetSize::usize_as(rows.len()));
let arrays = converter.convert_rows(rows)?;
let array = match arrays.first() {
Some(array) => array.clone(),
None => {
return internal_err!("{set_op}: failed to get array from rows");
}
};
new_arrays.push(array);
for r_val in r_values.iter().sorted().dedup() {
match set_op {
SetOp::Union if !values_set.contains(&r_val) => rows.push(r_val),
SetOp::Intersect if values_set.contains(&r_val) => rows.push(r_val),
_ => (),
}
}

let last_offset = match offsets.last() {
Some(offset) => *offset,
None => return internal_err!("offsets should not be empty"),
};

offsets.push(last_offset + OffsetSize::usize_as(rows.len()));
let arrays = converter.convert_rows(rows)?;
new_arrays.push(match arrays.first() {
Some(array) => Arc::clone(array),
None => return internal_err!("{set_op}: failed to get array from rows"),
});
}

let offsets = OffsetBuffer::new(offsets.into());
let new_arrays_ref = new_arrays.iter().map(|v| v.as_ref()).collect::<Vec<_>>();
let new_arrays_ref: Vec<_> = new_arrays.iter().map(|v| v.as_ref()).collect();
let values = compute::concat(&new_arrays_ref)?;
let arr = GenericListArray::<OffsetSize>::try_new(field, offsets, values, None)?;
Ok(Arc::new(arr))
Expand All @@ -335,38 +316,60 @@ fn general_set_op(
array2: &ArrayRef,
set_op: SetOp,
) -> Result<ArrayRef> {
fn empty_array(data_type: &DataType, len: usize, large: bool) -> Result<ArrayRef> {
let field = Arc::new(Field::new_list_field(data_type.clone(), true));
let values = new_null_array(data_type, len);
if large {
Ok(Arc::new(LargeListArray::try_new(
field,
OffsetBuffer::new_zeroed(len),
values,
None,
)?))
} else {
Ok(Arc::new(ListArray::try_new(
field,
OffsetBuffer::new_zeroed(len),
values,
None,
)?))
}
}

match (array1.data_type(), array2.data_type()) {
(Null, Null) => Ok(Arc::new(ListArray::new_null(
Arc::new(Field::new_list_field(Null, true)),
array1.len(),
))),
(Null, List(field)) => {
if set_op == SetOp::Intersect {
return Ok(new_empty_array(&Null));
return empty_array(field.data_type(), array1.len(), false);
}
let array = as_list_array(&array2)?;
general_array_distinct::<i32>(array, field)
}

(List(field), Null) => {
if set_op == SetOp::Intersect {
return make_array_inner(&[]);
return empty_array(field.data_type(), array1.len(), false);
}
let array = as_list_array(&array1)?;
general_array_distinct::<i32>(array, field)
}
(Null, LargeList(field)) => {
if set_op == SetOp::Intersect {
return Ok(new_empty_array(&Null));
return empty_array(field.data_type(), array1.len(), true);
}
let array = as_large_list_array(&array2)?;
general_array_distinct::<i64>(array, field)
}
(LargeList(field), Null) => {
if set_op == SetOp::Intersect {
return make_array_inner(&[]);
return empty_array(field.data_type(), array1.len(), true);
}
let array = as_large_list_array(&array1)?;
general_array_distinct::<i64>(array, field)
}
(Null, Null) => Ok(new_empty_array(&Null)),

(List(field), List(_)) => {
let array1 = as_list_array(&array1)?;
let array2 = as_list_array(&array2)?;
Expand Down
21 changes: 10 additions & 11 deletions datafusion/sqllogictest/test_files/array.slt
Original file line number Diff line number Diff line change
Expand Up @@ -3820,21 +3820,24 @@ select array_union(arrow_cast([1, 2, 3, 4], 'LargeList(Int64)'), arrow_cast([5,
statement ok
CREATE TABLE arrays_with_repeating_elements_for_union
AS VALUES
([1], [2]),
([0, 1, 1], []),
([1, 1], [2]),
([2, 3], [3]),
([3], [3, 4])
;

query ?
select array_union(column1, column2) from arrays_with_repeating_elements_for_union;
----
[0, 1]
[1, 2]
[2, 3]
[3, 4]

query ?
select array_union(arrow_cast(column1, 'LargeList(Int64)'), arrow_cast(column2, 'LargeList(Int64)')) from arrays_with_repeating_elements_for_union;
----
[0, 1]
[1, 2]
[2, 3]
[3, 4]
Expand All @@ -3854,15 +3857,11 @@ select array_union(arrow_cast([], 'LargeList(Int64)'), arrow_cast([], 'LargeList
[]

# array_union scalar function #7
query ?
query error DataFusion error: Internal error: array_union is not implemented for
select array_union([[null]], []);
----
[[]]

query ?
query error DataFusion error: Internal error: array_union is not implemented for
select array_union(arrow_cast([[null]], 'LargeList(List(Int64))'), arrow_cast([], 'LargeList(Int64)'));
----
[[]]

# array_union scalar function #8
query ?
Expand Down Expand Up @@ -5530,12 +5529,12 @@ select array_intersect(arrow_cast([1, 1, 2, 2, 3, 3], 'LargeList(Int64)'), null)
query ?
select array_intersect(null, [1, 1, 2, 2, 3, 3]);
----
NULL
[]

query ?
select array_intersect(null, arrow_cast([1, 1, 2, 2, 3, 3], 'LargeList(Int64)'));
----
NULL
[]

query ?
select array_intersect([], null);
Expand All @@ -5560,12 +5559,12 @@ select array_intersect(arrow_cast([], 'LargeList(Int64)'), null);
query ?
select array_intersect(null, []);
----
NULL
[]

query ?
select array_intersect(null, arrow_cast([], 'LargeList(Int64)'));
----
NULL
[]

query ?
select array_intersect(null, null);
Expand Down
20 changes: 0 additions & 20 deletions rust-toolchain.toml

This file was deleted.

Loading