Skip to content

'Rename array() function to make_array(), extend array[] #3122

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions datafusion/core/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ pub use datafusion_common::{
Column, DFField, DFSchema, DFSchemaRef, ExprSchema, ToDFSchema,
};
pub use datafusion_expr::{
abs, acos, and, approx_distinct, approx_percentile_cont, array, ascii, asin, atan,
atan2, avg, bit_length, btrim, call_fn, case, cast, ceil, character_length, chr,
coalesce, col, combine_filters, concat, concat_expr, concat_ws, concat_ws_expr, cos,
count, count_distinct, create_udaf, create_udf, date_part, date_trunc, digest,
exists, exp, expr_rewriter,
abs, acos, and, approx_distinct, approx_percentile_cont, ascii, asin, atan, atan2,
avg, bit_length, btrim, call_fn, case, cast, ceil, character_length, chr, coalesce,
col, combine_filters, concat, concat_expr, concat_ws, concat_ws_expr, cos, count,
count_distinct, create_udaf, create_udf, date_part, date_trunc, digest, exists, exp,
expr_rewriter,
expr_rewriter::{
normalize_col, normalize_col_with_schemas, normalize_cols, replace_col,
rewrite_sort_cols_by_aggs, unnormalize_col, unnormalize_cols, ExprRewritable,
Expand All @@ -50,11 +50,11 @@ pub use datafusion_expr::{
StringifiedPlan, Subquery, TableScan, ToStringifiedPlan, Union,
UserDefinedLogicalNode, Values,
},
lower, lpad, ltrim, max, md5, min, not_exists, not_in_subquery, now, now_expr,
nullif, octet_length, or, power, random, regexp_match, regexp_replace, repeat,
replace, reverse, right, round, rpad, rtrim, scalar_subquery, sha224, sha256, sha384,
sha512, signum, sin, split_part, sqrt, starts_with, strpos, substr, sum, tan, to_hex,
to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate, trim,
trunc, unalias, upper, when, Expr, ExprSchemable, Literal, Operator,
lower, lpad, ltrim, make_array, max, md5, min, not_exists, not_in_subquery, now,
now_expr, nullif, octet_length, or, power, random, regexp_match, regexp_replace,
repeat, replace, reverse, right, round, rpad, rtrim, scalar_subquery, sha224, sha256,
sha384, sha512, signum, sin, split_part, sqrt, starts_with, strpos, substr, sum, tan,
to_hex, to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate,
trim, trunc, unalias, upper, when, Expr, ExprSchemable, Literal, Operator,
};
pub use datafusion_optimizer::expr_simplifier::{ExprSimplifiable, SimplifyInfo};
8 changes: 4 additions & 4 deletions datafusion/core/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ pub use crate::execution::options::{
AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions,
};
pub use crate::logical_plan::{
approx_percentile_cont, array, ascii, avg, bit_length, btrim, cast, character_length,
chr, coalesce, col, concat, concat_ws, count, create_udf, date_part, date_trunc,
digest, exists, from_unixtime, in_list, in_subquery, initcap, left, length, lit,
lower, lpad, ltrim, max, md5, min, not_exists, not_in_subquery, now, octet_length,
approx_percentile_cont, ascii, avg, bit_length, btrim, cast, character_length, chr,
coalesce, col, concat, concat_ws, count, create_udf, date_part, date_trunc, digest,
exists, from_unixtime, in_list, in_subquery, initcap, left, length, lit, lower, lpad,
ltrim, make_array, max, md5, min, not_exists, not_in_subquery, now, octet_length,
random, regexp_match, regexp_replace, repeat, replace, reverse, right, rpad, rtrim,
scalar_subquery, sha224, sha256, sha384, sha512, split_part, starts_with, strpos,
substr, sum, to_hex, translate, trim, upper, Column, Expr, JoinType, Partitioning,
Expand Down
113 changes: 90 additions & 23 deletions datafusion/core/tests/sql/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ async fn query_concat() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn query_array() -> Result<()> {
// Return a session context with table "test" registered with 2 columns
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes in this file illustrate the change in behavior that this PR makes

fn array_context() -> SessionContext {
let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Utf8, false),
Field::new("c2", DataType::Int32, true),
Expand All @@ -124,43 +124,110 @@ async fn query_array() -> Result<()> {
Arc::new(StringArray::from_slice(&["", "a", "aa", "aaa"])),
Arc::new(Int32Array::from(vec![Some(0), Some(1), None, Some(3)])),
],
)?;
)
.unwrap();

let table = MemTable::try_new(schema, vec![vec![data]])?;
let table = MemTable::try_new(schema, vec![vec![data]]).unwrap();

let ctx = SessionContext::new();
ctx.register_table("test", Arc::new(table))?;
let sql = "SELECT array(c1, cast(c2 as varchar)) FROM test";
ctx.register_table("test", Arc::new(table)).unwrap();
ctx
}

#[tokio::test]
async fn query_array() {
let ctx = array_context();
let sql = "SELECT array[c1, cast(c2 as varchar)] FROM test";
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prior to this PR, this query would error (only constants were supported in array[] syntax. cc @ovr

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+--------------------------------------+",
"| array(test.c1,CAST(test.c2 AS Utf8)) |",
"+--------------------------------------+",
"| [, 0] |",
"| [a, 1] |",
"| [aa, ] |",
"| [aaa, 3] |",
"+--------------------------------------+",
"+----------+",
"| array |",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using array for the column name is consistent with postgres

"+----------+",
"| [, 0] |",
"| [a, 1] |",
"| [aa, ] |",
"| [aaa, 3] |",
"+----------+",
];
assert_batches_eq!(expected, &actual);
}

#[tokio::test]
async fn query_make_array() {
let ctx = array_context();
let sql = "SELECT make_array(c1, cast(c2 as varchar)) FROM test";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+------------------------------------------+",
"| makearray(test.c1,CAST(test.c2 AS Utf8)) |",
"+------------------------------------------+",
"| [, 0] |",
"| [a, 1] |",
"| [aa, ] |",
"| [aaa, 3] |",
"+------------------------------------------+",
];
assert_batches_eq!(expected, &actual);
Ok(())
}

#[tokio::test]
async fn query_array_scalar() -> Result<()> {
async fn query_array_scalar() {
let ctx = SessionContext::new();

let sql = "SELECT array(1, 2, 3);";
let sql = "SELECT array[1, 2, 3];";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+-----------------------------------+",
"| array(Int64(1),Int64(2),Int64(3)) |",
"+-----------------------------------+",
"| [1, 2, 3] |",
"+-----------------------------------+",
"+-----------+",
"| array |",
"+-----------+",
"| [1, 2, 3] |",
"+-----------+",
];
assert_batches_eq!(expected, &actual);

// alternate syntax format
let sql = "SELECT [1, 2, 3];";
let actual = execute_to_batches(&ctx, sql).await;
assert_batches_eq!(expected, &actual);
}

#[tokio::test]
async fn query_array_scalar_bad_types() {
let ctx = SessionContext::new();

// no common type to coerce to, should error
let err = plan_and_collect(&ctx, "SELECT [1, true, null]")
.await
.unwrap_err();
assert_eq!(err.to_string(), "Error during planning: Coercion from [Int64, Boolean, Null] to the signature VariadicEqual failed.",);
}

#[tokio::test]
async fn query_array_scalar_coerce() {
let ctx = SessionContext::new();

// The planner should be able to coerce this to all integers
// https://github.com/apache/arrow-datafusion/issues/3170
let err = plan_and_collect(&ctx, "SELECT [1, 2, '3']")
.await
.unwrap_err();
assert_eq!(err.to_string(), "Error during planning: Coercion from [Int64, Int64, Utf8] to the signature VariadicEqual failed.",);
}

#[tokio::test]
async fn query_make_array_scalar() {
let ctx = SessionContext::new();

let sql = "SELECT make_array(1, 2, 3);";
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the equivalent of SELECT array(1,2, 3) on master

let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+---------------------------------------+",
"| makearray(Int64(1),Int64(2),Int64(3)) |",
"+---------------------------------------+",
"| [1, 2, 3] |",
"+---------------------------------------+",
];
assert_batches_eq!(expected, &actual);
Ok(())
}

#[tokio::test]
Expand Down
12 changes: 8 additions & 4 deletions datafusion/expr/src/built_in_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,11 @@ pub enum BuiltinScalarFunction {
/// trunc
Trunc,

// string functions
// array functions
/// construct an array from columns
Array,
MakeArray,

// string functions
/// ascii
Ascii,
/// bit_length
Expand Down Expand Up @@ -204,7 +206,7 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::Sqrt => Volatility::Immutable,
BuiltinScalarFunction::Tan => Volatility::Immutable,
BuiltinScalarFunction::Trunc => Volatility::Immutable,
BuiltinScalarFunction::Array => Volatility::Immutable,
BuiltinScalarFunction::MakeArray => Volatility::Immutable,
BuiltinScalarFunction::Ascii => Volatility::Immutable,
BuiltinScalarFunction::BitLength => Volatility::Immutable,
BuiltinScalarFunction::Btrim => Volatility::Immutable,
Expand Down Expand Up @@ -297,8 +299,10 @@ impl FromStr for BuiltinScalarFunction {
// conditional functions
"coalesce" => BuiltinScalarFunction::Coalesce,

// array functions
"make_array" => BuiltinScalarFunction::MakeArray,

// string functions
"array" => BuiltinScalarFunction::Array,
"ascii" => BuiltinScalarFunction::Ascii,
"bit_length" => BuiltinScalarFunction::BitLength,
"btrim" => BuiltinScalarFunction::Btrim,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,9 +382,9 @@ scalar_expr!(FromUnixtime, from_unixtime, unixtime);
unary_scalar_expr!(ArrowTypeof, arrow_typeof, "data type");

/// Returns an array of fixed size with each argument on it.
pub fn array(args: Vec<Expr>) -> Expr {
pub fn make_array(args: Vec<Expr>) -> Expr {
Expr::ScalarFunction {
fun: built_in_function::BuiltinScalarFunction::Array,
fun: built_in_function::BuiltinScalarFunction::MakeArray,
args,
}
}
Expand Down
12 changes: 4 additions & 8 deletions datafusion/expr/src/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use crate::nullif::SUPPORTED_NULLIF_TYPES;
use crate::type_coercion::data_types;
use crate::ColumnarValue;
use crate::{
array_expressions, conditional_expressions, struct_expressions, Accumulator,
BuiltinScalarFunction, Signature, TypeSignature,
conditional_expressions, struct_expressions, Accumulator, BuiltinScalarFunction,
Signature, TypeSignature,
};
use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
use datafusion_common::{DataFusionError, Result};
Expand Down Expand Up @@ -96,7 +96,7 @@ pub fn return_type(
// the return type of the built in function.
// Some built-in functions' return type depends on the incoming type.
match fun {
BuiltinScalarFunction::Array => Ok(DataType::FixedSizeList(
BuiltinScalarFunction::MakeArray => Ok(DataType::FixedSizeList(
Box::new(Field::new("item", input_expr_types[0].clone(), true)),
input_expr_types.len() as i32,
)),
Expand Down Expand Up @@ -267,12 +267,8 @@ pub fn return_type(
pub fn signature(fun: &BuiltinScalarFunction) -> Signature {
// note: the physical expression must accept the type returned by this function or the execution panics.

// for now, the list is small, as we do not have many built-in functions.
match fun {
BuiltinScalarFunction::Array => Signature::variadic(
array_expressions::SUPPORTED_ARRAY_TYPES.to_vec(),
fun.volatility(),
),
BuiltinScalarFunction::MakeArray => Signature::variadic_equal(fun.volatility()),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This requires all arguments to make_array be the same type

BuiltinScalarFunction::Struct => Signature::variadic(
struct_expressions::SUPPORTED_STRUCT_TYPES.to_vec(),
fun.volatility(),
Expand Down
41 changes: 24 additions & 17 deletions datafusion/physical-expr/src/array_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ macro_rules! downcast_vec {
}};
}

macro_rules! array {
/// Create an array of FixedSizeList from a set of individual Arrays
/// where each element in the output FixedSizeList is the result of
/// concatenating the corresponding values in the input Arrays
macro_rules! make_fixed_size_list {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed this to be more descriptive of what it does (there are too may arrays in DataFusion already ;)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I simply renamed this macro to something that shows more of what it is doing

($ARGS:expr, $ARRAY_TYPE:ident, $BUILDER_TYPE:ident) => {{
// downcast all arguments to their common format
let args =
Expand All @@ -59,7 +62,7 @@ macro_rules! array {
}};
}

fn array_array(args: &[ArrayRef]) -> Result<ArrayRef> {
fn arrays_to_fixed_size_list_array(args: &[ArrayRef]) -> Result<ArrayRef> {
// do not accept 0 arguments.
if args.is_empty() {
return Err(DataFusionError::Internal(
Expand All @@ -68,19 +71,21 @@ fn array_array(args: &[ArrayRef]) -> Result<ArrayRef> {
}

let res = match args[0].data_type() {
DataType::Utf8 => array!(args, StringArray, StringBuilder),
DataType::LargeUtf8 => array!(args, LargeStringArray, LargeStringBuilder),
DataType::Boolean => array!(args, BooleanArray, BooleanBuilder),
DataType::Float32 => array!(args, Float32Array, Float32Builder),
DataType::Float64 => array!(args, Float64Array, Float64Builder),
DataType::Int8 => array!(args, Int8Array, Int8Builder),
DataType::Int16 => array!(args, Int16Array, Int16Builder),
DataType::Int32 => array!(args, Int32Array, Int32Builder),
DataType::Int64 => array!(args, Int64Array, Int64Builder),
DataType::UInt8 => array!(args, UInt8Array, UInt8Builder),
DataType::UInt16 => array!(args, UInt16Array, UInt16Builder),
DataType::UInt32 => array!(args, UInt32Array, UInt32Builder),
DataType::UInt64 => array!(args, UInt64Array, UInt64Builder),
DataType::Utf8 => make_fixed_size_list!(args, StringArray, StringBuilder),
DataType::LargeUtf8 => {
make_fixed_size_list!(args, LargeStringArray, LargeStringBuilder)
}
DataType::Boolean => make_fixed_size_list!(args, BooleanArray, BooleanBuilder),
DataType::Float32 => make_fixed_size_list!(args, Float32Array, Float32Builder),
DataType::Float64 => make_fixed_size_list!(args, Float64Array, Float64Builder),
DataType::Int8 => make_fixed_size_list!(args, Int8Array, Int8Builder),
DataType::Int16 => make_fixed_size_list!(args, Int16Array, Int16Builder),
DataType::Int32 => make_fixed_size_list!(args, Int32Array, Int32Builder),
DataType::Int64 => make_fixed_size_list!(args, Int64Array, Int64Builder),
DataType::UInt8 => make_fixed_size_list!(args, UInt8Array, UInt8Builder),
DataType::UInt16 => make_fixed_size_list!(args, UInt16Array, UInt16Builder),
DataType::UInt32 => make_fixed_size_list!(args, UInt32Array, UInt32Builder),
DataType::UInt64 => make_fixed_size_list!(args, UInt64Array, UInt64Builder),
data_type => {
return Err(DataFusionError::NotImplemented(format!(
"Array is not implemented for type '{:?}'.",
Expand All @@ -92,13 +97,15 @@ fn array_array(args: &[ArrayRef]) -> Result<ArrayRef> {
}

/// put values in an array.
pub fn array(values: &[ColumnarValue]) -> Result<ColumnarValue> {
pub fn make_array(values: &[ColumnarValue]) -> Result<ColumnarValue> {
let arrays: Vec<ArrayRef> = values
.iter()
.map(|x| match x {
ColumnarValue::Array(array) => array.clone(),
ColumnarValue::Scalar(scalar) => scalar.to_array().clone(),
})
.collect();
Ok(ColumnarValue::Array(array_array(arrays.as_slice())?))
Ok(ColumnarValue::Array(arrays_to_fixed_size_list_array(
arrays.as_slice(),
)?))
}
Loading