Skip to content

Commit ec3d443

Browse files
implement distinct func
implement slt & proto fix null & empty list
1 parent 2071259 commit ec3d443

File tree

11 files changed

+177
-8
lines changed

11 files changed

+177
-8
lines changed

datafusion/expr/src/built_in_function.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,8 @@ pub enum BuiltinScalarFunction {
144144
ArrayPopBack,
145145
/// array_dims
146146
ArrayDims,
147+
/// array_distinct
148+
ArrayDistinct,
147149
/// array_element
148150
ArrayElement,
149151
/// array_empty
@@ -397,6 +399,7 @@ impl BuiltinScalarFunction {
397399
BuiltinScalarFunction::ArrayHasAny => Volatility::Immutable,
398400
BuiltinScalarFunction::ArrayHas => Volatility::Immutable,
399401
BuiltinScalarFunction::ArrayDims => Volatility::Immutable,
402+
BuiltinScalarFunction::ArrayDistinct => Volatility::Immutable,
400403
BuiltinScalarFunction::ArrayElement => Volatility::Immutable,
401404
BuiltinScalarFunction::ArrayExcept => Volatility::Immutable,
402405
BuiltinScalarFunction::ArrayLength => Volatility::Immutable,
@@ -574,6 +577,7 @@ impl BuiltinScalarFunction {
574577
BuiltinScalarFunction::ArrayDims => {
575578
Ok(List(Arc::new(Field::new("item", UInt64, true))))
576579
}
580+
BuiltinScalarFunction::ArrayDistinct => Ok(input_expr_types[0].clone()),
577581
BuiltinScalarFunction::ArrayElement => match &input_expr_types[0] {
578582
List(field) => Ok(field.data_type().clone()),
579583
_ => plan_err!(
@@ -915,6 +919,7 @@ impl BuiltinScalarFunction {
915919
Signature::variadic_any(self.volatility())
916920
}
917921
BuiltinScalarFunction::ArrayNdims => Signature::any(1, self.volatility()),
922+
BuiltinScalarFunction::ArrayDistinct => Signature::any(1, self.volatility()),
918923
BuiltinScalarFunction::ArrayPosition => {
919924
Signature::variadic_any(self.volatility())
920925
}
@@ -1543,6 +1548,7 @@ fn aliases(func: &BuiltinScalarFunction) -> &'static [&'static str] {
15431548
&["array_concat", "array_cat", "list_concat", "list_cat"]
15441549
}
15451550
BuiltinScalarFunction::ArrayDims => &["array_dims", "list_dims"],
1551+
BuiltinScalarFunction::ArrayDistinct => &["array_distinct", "list_distinct"],
15461552
BuiltinScalarFunction::ArrayEmpty => &["empty"],
15471553
BuiltinScalarFunction::ArrayElement => &[
15481554
"array_element",

datafusion/expr/src/expr_fn.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -658,6 +658,12 @@ scalar_expr!(
658658
array,
659659
"returns the number of dimensions of the array."
660660
);
661+
scalar_expr!(
662+
ArrayDistinct,
663+
array_distinct,
664+
array,
665+
"return distinct values from the array after removing duplicates."
666+
);
661667
scalar_expr!(
662668
ArrayPosition,
663669
array_position,

datafusion/physical-expr/src/array_expressions.rs

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1991,6 +1991,56 @@ pub fn array_intersect(args: &[ArrayRef]) -> Result<ArrayRef> {
19911991
}
19921992
}
19931993

1994+
/// array_distinct SQL function
1995+
/// example: from list [1, 3, 2, 3, 1, 2, 4] to [1, 2, 3, 4]
1996+
pub fn array_distinct(args: &[ArrayRef]) -> Result<ArrayRef> {
1997+
assert_eq!(args.len(), 1);
1998+
1999+
// handle null
2000+
if args[0].data_type() == &DataType::Null {
2001+
return Ok(args[0].clone());
2002+
}
2003+
2004+
let array = as_list_array(&args[0])?;
2005+
let dt = array.value_type();
2006+
2007+
let mut offsets = vec![0];
2008+
let mut new_arrays = vec![];
2009+
2010+
let converter = RowConverter::new(vec![SortField::new(dt.clone())])?;
2011+
// distinct for each list in ListArray
2012+
for arr in array.iter().flatten() {
2013+
let values = converter.convert_columns(&[arr])?;
2014+
2015+
let mut rows = Vec::with_capacity(values.num_rows());
2016+
// sort elements in list and remove duplicates
2017+
for val in values.iter().sorted().dedup() {
2018+
rows.push(val);
2019+
}
2020+
2021+
let last_offset: i32 = match offsets.last().copied() {
2022+
Some(offset) => offset,
2023+
None => return internal_err!("offsets should not be empty"),
2024+
};
2025+
offsets.push(last_offset + rows.len() as i32);
2026+
let arrays = converter.convert_rows(rows)?;
2027+
let array = match arrays.get(0) {
2028+
Some(array) => array.clone(),
2029+
None => {
2030+
return internal_err!("array_distinct: failed to get array from rows")
2031+
}
2032+
};
2033+
new_arrays.push(array);
2034+
}
2035+
2036+
let field = Arc::new(Field::new("item", dt, true));
2037+
let offsets = OffsetBuffer::new(offsets.into());
2038+
let new_arrays_ref = new_arrays.iter().map(|v| v.as_ref()).collect::<Vec<_>>();
2039+
let values = compute::concat(&new_arrays_ref)?;
2040+
let arr = Arc::new(ListArray::try_new(field, offsets, values, None)?);
2041+
Ok(arr)
2042+
}
2043+
19942044
#[cfg(test)]
19952045
mod tests {
19962046
use super::*;
@@ -3028,4 +3078,31 @@ mod tests {
30283078

30293079
make_array(&[arr1, arr2]).expect("failed to initialize function array")
30303080
}
3081+
3082+
#[test]
3083+
fn test_array_distinct() {
3084+
// test: from array [[1, 3, 2, 3, 1, 2, 4], [1, 1]] to [[1, 2, 3, 4], [1]]
3085+
let data = vec![
3086+
Some(vec![
3087+
Some(1),
3088+
Some(3),
3089+
Some(2),
3090+
Some(3),
3091+
Some(1),
3092+
Some(2),
3093+
Some(4),
3094+
]),
3095+
Some(vec![Some(1), Some(1)]),
3096+
];
3097+
let excepted = vec![
3098+
Some(vec![Some(1), Some(2), Some(3), Some(4)]),
3099+
Some(vec![Some(1)]),
3100+
];
3101+
let input =
3102+
Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(data)) as ArrayRef;
3103+
let excepted = ListArray::from_iter_primitive::<Int64Type, _, _>(excepted);
3104+
let result = array_distinct(&[input]).unwrap();
3105+
let result = as_list_array(&result).unwrap();
3106+
assert_eq!(result, &excepted);
3107+
}
30313108
}

datafusion/physical-expr/src/functions.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,9 @@ pub fn create_physical_fun(
347347
BuiltinScalarFunction::ArrayDims => {
348348
Arc::new(|args| make_scalar_function(array_expressions::array_dims)(args))
349349
}
350+
BuiltinScalarFunction::ArrayDistinct => {
351+
Arc::new(|args| make_scalar_function(array_expressions::array_distinct)(args))
352+
}
350353
BuiltinScalarFunction::ArrayElement => {
351354
Arc::new(|args| make_scalar_function(array_expressions::array_element)(args))
352355
}

datafusion/proto/proto/datafusion.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -642,6 +642,7 @@ enum ScalarFunction {
642642
ArrayPopFront = 124;
643643
Levenshtein = 125;
644644
SubstrIndex = 126;
645+
ArrayDistinct = 127;
645646
}
646647

647648
message ScalarFunctionNode {

datafusion/proto/src/generated/pbjson.rs

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/generated/prost.rs

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/logical_plan/from_proto.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,15 @@ use datafusion_common::{
4040
DFSchema, DFSchemaRef, DataFusionError, OwnedTableReference, Result, ScalarValue,
4141
};
4242
use datafusion_expr::{
43-
abs, acos, acosh, array, array_append, array_concat, array_dims, array_element,
44-
array_except, array_has, array_has_all, array_has_any, array_intersect, array_length,
45-
array_ndims, array_position, array_positions, array_prepend, array_remove,
46-
array_remove_all, array_remove_n, array_repeat, array_replace, array_replace_all,
47-
array_replace_n, array_slice, array_to_string, arrow_typeof, ascii, asin, asinh,
48-
atan, atan2, atanh, bit_length, btrim, cardinality, cbrt, ceil, character_length,
49-
chr, coalesce, concat_expr, concat_ws_expr, cos, cosh, cot, current_date,
50-
current_time, date_bin, date_part, date_trunc, decode, degrees, digest, encode, exp,
43+
abs, acos, acosh, array, array_append, array_concat, array_dims, array_distinct,
44+
array_element, array_except, array_has, array_has_all, array_has_any,
45+
array_intersect, array_length, array_ndims, array_position, array_positions,
46+
array_prepend, array_remove, array_remove_all, array_remove_n, array_repeat,
47+
array_replace, array_replace_all, array_replace_n, array_slice, array_to_string,
48+
arrow_typeof, ascii, asin, asinh, atan, atan2, atanh, bit_length, btrim, cardinality,
49+
cbrt, ceil, character_length, chr, coalesce, concat_expr, concat_ws_expr, cos, cosh,
50+
cot, current_date, current_time, date_bin, date_part, date_trunc, decode, degrees,
51+
digest, encode, exp,
5152
expr::{self, InList, Sort, WindowFunction},
5253
factorial, flatten, floor, from_unixtime, gcd, gen_range, isnan, iszero, lcm, left,
5354
levenshtein, ln, log, log10, log2,
@@ -470,6 +471,7 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction {
470471
ScalarFunction::ArrayHasAny => Self::ArrayHasAny,
471472
ScalarFunction::ArrayHas => Self::ArrayHas,
472473
ScalarFunction::ArrayDims => Self::ArrayDims,
474+
ScalarFunction::ArrayDistinct => Self::ArrayDistinct,
473475
ScalarFunction::ArrayElement => Self::ArrayElement,
474476
ScalarFunction::Flatten => Self::Flatten,
475477
ScalarFunction::ArrayLength => Self::ArrayLength,
@@ -1445,6 +1447,9 @@ pub fn parse_expr(
14451447
ScalarFunction::ArrayDims => {
14461448
Ok(array_dims(parse_expr(&args[0], registry)?))
14471449
}
1450+
ScalarFunction::ArrayDistinct => {
1451+
Ok(array_distinct(parse_expr(&args[0], registry)?))
1452+
}
14481453
ScalarFunction::ArrayElement => Ok(array_element(
14491454
parse_expr(&args[0], registry)?,
14501455
parse_expr(&args[1], registry)?,

datafusion/proto/src/logical_plan/to_proto.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1502,6 +1502,7 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction {
15021502
BuiltinScalarFunction::ArrayHasAny => Self::ArrayHasAny,
15031503
BuiltinScalarFunction::ArrayHas => Self::ArrayHas,
15041504
BuiltinScalarFunction::ArrayDims => Self::ArrayDims,
1505+
BuiltinScalarFunction::ArrayDistinct => Self::ArrayDistinct,
15051506
BuiltinScalarFunction::ArrayElement => Self::ArrayElement,
15061507
BuiltinScalarFunction::Flatten => Self::Flatten,
15071508
BuiltinScalarFunction::ArrayLength => Self::ArrayLength,

datafusion/sqllogictest/test_files/array.slt

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,30 @@ AS VALUES
182182
(make_array([[1], [2]], [[2], [3]]), make_array([1], [2]))
183183
;
184184

185+
statement ok
186+
CREATE TABLE array_distinct_table_1D
187+
AS VALUES
188+
(make_array(1, 1, 2, 2, 3)),
189+
(make_array(1, 2, 3, 4, 5)),
190+
(make_array(3, 5, 3, 3, 3))
191+
;
192+
193+
statement ok
194+
CREATE TABLE array_distinct_table_1D_UTF8
195+
AS VALUES
196+
(make_array('a', 'a', 'bc', 'bc', 'def')),
197+
(make_array('a', 'bc', 'def', 'defg', 'defg')),
198+
(make_array('defg', 'defg', 'defg', 'defg', 'defg'))
199+
;
200+
201+
statement ok
202+
CREATE TABLE array_distinct_table_2D
203+
AS VALUES
204+
(make_array([1,2], [1,2], [3,4], [3,4], [5,6])),
205+
(make_array([1,2], [3,4], [5,6], [7,8], [9,10])),
206+
(make_array([5,6], [5,6], NULL))
207+
;
208+
185209
statement ok
186210
CREATE TABLE array_intersect_table_1D
187211
AS VALUES
@@ -2626,6 +2650,45 @@ select array_has_all(make_array(1,2,3), make_array(1,3)),
26262650
----
26272651
true false true false false false true true false false true false true
26282652

2653+
query ?
2654+
select array_distinct(null);
2655+
----
2656+
NULL
2657+
2658+
query ?
2659+
select array_distinct([]);
2660+
----
2661+
[]
2662+
2663+
query ?
2664+
select array_distinct([[], []]);
2665+
----
2666+
[[]]
2667+
2668+
query ?
2669+
select array_distinct(column1)
2670+
from array_distinct_table_1D;
2671+
----
2672+
[1, 2, 3]
2673+
[1, 2, 3, 4, 5]
2674+
[3, 5]
2675+
2676+
query ?
2677+
select array_distinct(column1)
2678+
from array_distinct_table_1D_UTF8;
2679+
----
2680+
[a, bc, def]
2681+
[a, bc, def, defg]
2682+
[defg]
2683+
2684+
query ?
2685+
select array_distinct(column1)
2686+
from array_distinct_table_2D;
2687+
----
2688+
[[1, 2], [3, 4], [5, 6]]
2689+
[[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]]
2690+
[, [5, 6]]
2691+
26292692
query ???
26302693
select array_intersect(column1, column2),
26312694
array_intersect(column3, column4),

0 commit comments

Comments
 (0)