Skip to content

Implement array_union #7897

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Nov 13, 2023
6 changes: 6 additions & 0 deletions datafusion/expr/src/built_in_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ pub enum BuiltinScalarFunction {
ArrayToString,
/// array_intersect
ArrayIntersect,
/// array_union
ArrayUnion,
/// cardinality
Cardinality,
/// construct an array from columns
Expand Down Expand Up @@ -401,6 +403,7 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ArraySlice => Volatility::Immutable,
BuiltinScalarFunction::ArrayToString => Volatility::Immutable,
BuiltinScalarFunction::ArrayIntersect => Volatility::Immutable,
BuiltinScalarFunction::ArrayUnion => Volatility::Immutable,
BuiltinScalarFunction::Cardinality => Volatility::Immutable,
BuiltinScalarFunction::MakeArray => Volatility::Immutable,
BuiltinScalarFunction::Ascii => Volatility::Immutable,
Expand Down Expand Up @@ -581,6 +584,7 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ArraySlice => Ok(input_expr_types[0].clone()),
BuiltinScalarFunction::ArrayToString => Ok(Utf8),
BuiltinScalarFunction::ArrayIntersect => Ok(input_expr_types[0].clone()),
BuiltinScalarFunction::ArrayUnion => Ok(input_expr_types[0].clone()),
BuiltinScalarFunction::Cardinality => Ok(UInt64),
BuiltinScalarFunction::MakeArray => match input_expr_types.len() {
0 => Ok(List(Arc::new(Field::new("item", Null, true)))),
Expand Down Expand Up @@ -885,6 +889,7 @@ impl BuiltinScalarFunction {
Signature::variadic_any(self.volatility())
}
BuiltinScalarFunction::ArrayIntersect => Signature::any(2, self.volatility()),
BuiltinScalarFunction::ArrayUnion => Signature::any(2, self.volatility()),
BuiltinScalarFunction::Cardinality => Signature::any(1, self.volatility()),
BuiltinScalarFunction::MakeArray => {
// 0 or more arguments of arbitrary type
Expand Down Expand Up @@ -1508,6 +1513,7 @@ fn aliases(func: &BuiltinScalarFunction) -> &'static [&'static str] {
"array_join",
"list_join",
],
BuiltinScalarFunction::ArrayUnion => &["array_union", "list_union"],
BuiltinScalarFunction::Cardinality => &["cardinality"],
BuiltinScalarFunction::MakeArray => &["make_array", "make_list"],
BuiltinScalarFunction::ArrayIntersect => &["array_intersect", "list_intersect"],
Expand Down
2 changes: 2 additions & 0 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,8 @@ scalar_expr!(
array delimiter,
"converts each element to its text representation."
);
scalar_expr!(ArrayUnion, array_union, array1 array2, "returns an array of the elements in the union of array1 and array2 without duplicates.");

scalar_expr!(
Cardinality,
cardinality,
Expand Down
83 changes: 82 additions & 1 deletion datafusion/physical-expr/src/array_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use arrow::datatypes::{DataType, Field, UInt64Type};
use arrow::row::{RowConverter, SortField};
use arrow_buffer::NullBuffer;

use arrow_schema::FieldRef;
use datafusion_common::cast::{
as_generic_string_array, as_int64_array, as_list_array, as_string_array,
};
Expand All @@ -36,8 +37,8 @@ use datafusion_common::{
DataFusionError, Result,
};

use hashbrown::HashSet;
use itertools::Itertools;
use std::collections::HashSet;

macro_rules! downcast_arg {
($ARG:expr, $ARRAY_TYPE:ident) => {{
Expand Down Expand Up @@ -1340,6 +1341,86 @@ macro_rules! to_string {
}};
}

fn union_generic_lists<OffsetSize: OffsetSizeTrait>(
l: &GenericListArray<OffsetSize>,
r: &GenericListArray<OffsetSize>,
field: &FieldRef,
) -> Result<GenericListArray<OffsetSize>> {
let converter = RowConverter::new(vec![SortField::new(l.value_type().clone())])?;

let nulls = NullBuffer::union(l.nulls(), r.nulls());
let l_values = l.values().clone();
let r_values = r.values().clone();
let l_values = converter.convert_columns(&[l_values])?;
let r_values = converter.convert_columns(&[r_values])?;

// Might be worth adding an upstream OffsetBufferBuilder
let mut offsets = Vec::<OffsetSize>::with_capacity(l.len() + 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a really neat implementation @edmondop.

offsets.push(OffsetSize::usize_as(0));
let mut rows = Vec::with_capacity(l_values.num_rows() + r_values.num_rows());
let mut dedup = HashSet::new();
for (l_w, r_w) in l.offsets().windows(2).zip(r.offsets().windows(2)) {
let l_slice = l_w[0].as_usize()..l_w[1].as_usize();
let r_slice = r_w[0].as_usize()..r_w[1].as_usize();
for i in l_slice {
let left_row = l_values.row(i);
if dedup.insert(left_row) {
rows.push(left_row);
}
}
for i in r_slice {
let right_row = r_values.row(i);
if dedup.insert(right_row) {
rows.push(right_row);
}
}
offsets.push(OffsetSize::usize_as(rows.len()));
dedup.clear();
}

let values = converter.convert_rows(rows)?;
let offsets = OffsetBuffer::new(offsets.into());
let result = values[0].clone();
Ok(GenericListArray::<OffsetSize>::new(
field.clone(),
offsets,
result,
nulls,
))
}

/// Array_union SQL function
pub fn array_union(args: &[ArrayRef]) -> Result<ArrayRef> {
if args.len() != 2 {
return exec_err!("array_union needs two arguments");
}
let array1 = &args[0];
let array2 = &args[1];
match (array1.data_type(), array2.data_type()) {
(DataType::Null, _) => Ok(array2.clone()),
(_, DataType::Null) => Ok(array1.clone()),
(DataType::List(field_ref), DataType::List(_)) => {
check_datatypes("array_union", &[&array1, &array2])?;
let list1 = array1.as_list::<i32>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer datafusion::common::cast as_list_array and as_large_list_array

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

datafusion::common::cast and arrow::array::cast are doing the similar thing. The difference is that common::cast return datafusion error, while arrow::array::cast return Arror error. To me, return datafusion error in datafusion project make much more senses for me. Also, mixing these two casting is quite messy, we can have arrow casting inside common::cast and call common::cast for other crate.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just found that arrow::array::cast does not return Result<>

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

never mind, I don't have strong opinion on which to use yet.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we directly use as_list_array here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the offset i32 branch you mean @Weijun-H ? I like the fact that the code has the same structure for LargeList and List

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but I think as_list_array can avoid panic because it returns the Return type.

let list2 = array2.as_list::<i32>();
let result = union_generic_lists::<i32>(list1, list2, field_ref)?;
Ok(Arc::new(result))
}
(DataType::LargeList(field_ref), DataType::LargeList(_)) => {
check_datatypes("array_union", &[&array1, &array2])?;
let list1 = array1.as_list::<i64>();
let list2 = array2.as_list::<i64>();
let result = union_generic_lists::<i64>(list1, list2, field_ref)?;
Ok(Arc::new(result))
}
_ => {
internal_err!(
"array_union only support list with offsets of type int32 and int64"
)
}
}
}

/// Array_to_string SQL function
pub fn array_to_string(args: &[ArrayRef]) -> Result<ArrayRef> {
let arr = &args[0];
Expand Down
4 changes: 3 additions & 1 deletion datafusion/physical-expr/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,9 @@ pub fn create_physical_fun(
BuiltinScalarFunction::MakeArray => {
Arc::new(|args| make_scalar_function(array_expressions::make_array)(args))
}

BuiltinScalarFunction::ArrayUnion => {
Arc::new(|args| make_scalar_function(array_expressions::array_union)(args))
}
// struct functions
BuiltinScalarFunction::Struct => Arc::new(struct_expressions::struct_expr),

Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,7 @@ enum ScalarFunction {
StringToArray = 117;
ToTimestampNanos = 118;
ArrayIntersect = 119;
ArrayUnion = 120;
}

message ScalarFunctionNode {
Expand Down
3 changes: 3 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions datafusion/proto/src/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction {
ScalarFunction::ArraySlice => Self::ArraySlice,
ScalarFunction::ArrayToString => Self::ArrayToString,
ScalarFunction::ArrayIntersect => Self::ArrayIntersect,
ScalarFunction::ArrayUnion => Self::ArrayUnion,
ScalarFunction::Cardinality => Self::Cardinality,
ScalarFunction::Array => Self::MakeArray,
ScalarFunction::NullIf => Self::NullIf,
Expand Down Expand Up @@ -1424,6 +1425,12 @@ pub fn parse_expr(
ScalarFunction::ArrayNdims => {
Ok(array_ndims(parse_expr(&args[0], registry)?))
}
ScalarFunction::ArrayUnion => Ok(array(
args.to_owned()
.iter()
.map(|expr| parse_expr(expr, registry))
.collect::<Result<Vec<_>, _>>()?,
)),
ScalarFunction::Sqrt => Ok(sqrt(parse_expr(&args[0], registry)?)),
ScalarFunction::Cbrt => Ok(cbrt(parse_expr(&args[0], registry)?)),
ScalarFunction::Sin => Ok(sin(parse_expr(&args[0], registry)?)),
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/src/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1487,6 +1487,7 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction {
BuiltinScalarFunction::ArraySlice => Self::ArraySlice,
BuiltinScalarFunction::ArrayToString => Self::ArrayToString,
BuiltinScalarFunction::ArrayIntersect => Self::ArrayIntersect,
BuiltinScalarFunction::ArrayUnion => Self::ArrayUnion,
BuiltinScalarFunction::Cardinality => Self::Cardinality,
BuiltinScalarFunction::MakeArray => Self::Array,
BuiltinScalarFunction::NullIf => Self::NullIf,
Expand Down
95 changes: 95 additions & 0 deletions datafusion/sqllogictest/test_files/array.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1919,6 +1919,101 @@ select array_to_string(make_array(), ',')
----
(empty)


## array_union (aliases: `list_union`)

# array_union scalar function #1
query ?
select array_union([1, 2, 3, 4], [5, 6, 3, 4]);
----
[1, 2, 3, 4, 5, 6]

# array_union scalar function #2
query ?
select array_union([1, 2, 3, 4], [5, 6, 7, 8]);
----
[1, 2, 3, 4, 5, 6, 7, 8]

# array_union scalar function #3
query ?
select array_union([1,2,3], []);
----
[1, 2, 3]

# array_union scalar function #4
query ?
select array_union([1, 2, 3, 4], [5, 4]);
----
[1, 2, 3, 4, 5]

# array_union scalar function #5
statement ok
CREATE TABLE arrays_with_repeating_elements_for_union
AS VALUES
([1], [2]),
([2, 3], [3]),
([3], [3, 4])
;

query ?
select array_union(column1, column2) from arrays_with_repeating_elements_for_union;
----
[1, 2]
[2, 3]
[3, 4]

statement ok
drop table arrays_with_repeating_elements_for_union;

# array_union scalar function #6
query ?
select array_union([], []);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checked in spark

scala> spark.sql("select array_union(array(), array())").show(false)
+-----------------------------+                                                 
|array_union(array(), array())|
+-----------------------------+
|[]                           |
+-----------------------------+

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am even surprised my code work here tbh, do I need to add an additional branch to pattern matching where both array have null data type and return an empty array?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am even surprised my code work here tbh, do I need to add an additional branch to pattern matching where both array have null data type and return an empty array?

I prefer return empty array in this case

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am even surprised my code work here tbh, do I need to add an additional branch to pattern matching where both array have null data type and return an empty array?

I don't think you should check the input types (they should still be lists, they'll just have an offset buffer of [0, 1]

Maybe you just need to handle the case specially in array_union

----
NULL

# array_union scalar function #7
query ?
select array_union([[null]], []);
----
[[]]

# array_union scalar function #8
query ?
select array_union([null], [null]);
----
[]

# array_union scalar function #9
query ?
select array_union(null, []);
----
NULL

# array_union scalar function #10
query ?
select array_union(null, null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scala> spark.sql("select array_union(array(null), array(null))").show(false)
+-------------------------------------+
|array_union(array(NULL), array(NULL))|
+-------------------------------------+
|[NULL]                               |
+-------------------------------------+

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is more interesting, because I am not sure null has offsets in the ListArray, I suppose my code just get into the union function, but it exits immediately with empty results, because the offsets are empty...How would you go about it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scala> spark.sql("select array_union(array(null), array(null))").show(false)
+-------------------------------------+
|array_union(array(NULL), array(NULL))|
+-------------------------------------+
|[NULL]                               |
+-------------------------------------+

array(null) is not the same as null.

array(null) is like array_union scalar function #8

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to add an extra branch in my match case like so:

  (DataType::Null, DataType::Null) => {
            let data = ArrayData::new_empty(&DataType::Null);
            Ok(Arc::new(FixedSizeListArray::from(data)))
        } 

but that also produce a NULL response rather than an empty arary, do you have any guidance @comphead ? It's unclear to me how to get an [] and also a [NULL]

----
NULL

# array_union scalar function #11
query ?
select array_union([1.2, 3.0], [1.2, 3.0, 5.7]);
----
[1.2, 3.0, 5.7]

# array_union scalar function #12
query ?
select array_union(['hello'], ['hello','datafusion']);
----
[hello, datafusion]



Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets remove empty lines






# list_to_string scalar function #4 (function alias `array_to_string`)
query TTT
select list_to_string(['h', 'e', 'l', 'l', 'o'], ','), list_to_string([1, 2, 3, 4, 5], '-'), list_to_string([1.0, 2.0, 3.0], '|');
Expand Down
1 change: 1 addition & 0 deletions docs/source/user-guide/expressions.md
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ Unlike to some databases the math functions in Datafusion works the same way as
| array_slice(array, index) | Returns a slice of the array. `array_slice([1, 2, 3, 4, 5, 6, 7, 8], 3, 6) -> [3, 4, 5, 6]` |
| array_to_string(array, delimiter) | Converts each element to its text representation. `array_to_string([1, 2, 3, 4], ',') -> 1,2,3,4` |
| array_intersect(array1, array2) | Returns an array of the elements in the intersection of array1 and array2. `array_intersect([1, 2, 3, 4], [5, 6, 3, 4]) -> [3, 4]` |
| array_union(array1, array2) | Returns an array of the elements in the union of array1 and array2 without duplicates. `array_union([1, 2, 3, 4], [5, 6, 3, 4]) -> [1, 2, 3, 4, 5, 6]` |
| cardinality(array) | Returns the total number of elements in the array. `cardinality([[1, 2, 3], [4, 5, 6]]) -> 6` |
| make_array(value1, [value2 [, ...]]) | Returns an Arrow array using the specified input expressions. `make_array(1, 2, 3) -> [1, 2, 3]` |
| trim_array(array, n) | Deprecated |
Expand Down
Loading