Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 33 additions & 35 deletions datafusion/sql/src/expr/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,44 +36,57 @@ use super::arrow_cast::ARROW_CAST_NAME;
impl<'a, S: ContextProvider> SqlToRel<'a, S> {
pub(super) fn sql_function_to_expr(
&self,
mut function: SQLFunction,
function: SQLFunction,
schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let name = if function.name.0.len() > 1 {
let SQLFunction {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also changed to use this explicit destructuring, so if other new fields are added the compiler will tell us in the future

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍Good idea

name,
args,
over,
distinct,
filter,
null_treatment,
special: _, // true if not called with trailing parens
order_by,
} = function;

if let Some(null_treatment) = null_treatment {
return not_impl_err!("Null treatment in aggregate functions is not supported: {null_treatment}");
}

let name = if name.0.len() > 1 {
// DF doesn't handle compound identifiers
// (e.g. "foo.bar") for function names yet
function.name.to_string()
name.to_string()
} else {
crate::utils::normalize_ident(function.name.0[0].clone())
crate::utils::normalize_ident(name.0[0].clone())
};

// user-defined function (UDF) should have precedence in case it has the same name as a scalar built-in function
if let Some(fm) = self.context_provider.get_function_meta(&name) {
let args =
self.function_args_to_expr(function.args, schema, planner_context)?;
let args = self.function_args_to_expr(args, schema, planner_context)?;
return Ok(Expr::ScalarUDF(ScalarUDF::new(fm, args)));
}

// next, scalar built-in
if let Ok(fun) = BuiltinScalarFunction::from_str(&name) {
let args =
self.function_args_to_expr(function.args, schema, planner_context)?;
let args = self.function_args_to_expr(args, schema, planner_context)?;
return Ok(Expr::ScalarFunction(ScalarFunction::new(fun, args)));
};

// If function is a window function (it has an OVER clause),
// it shouldn't have ordering requirement as function argument
// required ordering should be defined in OVER clause.
let is_function_window = function.over.is_some();
if !function.order_by.is_empty() && is_function_window {
let is_function_window = over.is_some();
if !order_by.is_empty() && is_function_window {
return plan_err!(
"Aggregate ORDER BY is not implemented for window functions"
);
}

// then, window function
if let Some(WindowType::WindowSpec(window)) = function.over.take() {
if let Some(WindowType::WindowSpec(window)) = over {
let partition_by = window
.partition_by
.into_iter()
Expand All @@ -97,11 +110,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
if let Ok(fun) = self.find_window_func(&name) {
let expr = match fun {
WindowFunction::AggregateFunction(aggregate_fun) => {
let args = self.function_args_to_expr(
function.args,
schema,
planner_context,
)?;
let args =
self.function_args_to_expr(args, schema, planner_context)?;

Expr::WindowFunction(expr::WindowFunction::new(
WindowFunction::AggregateFunction(aggregate_fun),
Expand All @@ -113,11 +123,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
_ => Expr::WindowFunction(expr::WindowFunction::new(
fun,
self.function_args_to_expr(
function.args,
schema,
planner_context,
)?,
self.function_args_to_expr(args, schema, planner_context)?,
partition_by,
order_by,
window_frame,
Expand All @@ -128,26 +134,19 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
} else {
// User defined aggregate functions (UDAF) have precedence in case it has the same name as a scalar built-in function
if let Some(fm) = self.context_provider.get_aggregate_meta(&name) {
let args =
self.function_args_to_expr(function.args, schema, planner_context)?;
let args = self.function_args_to_expr(args, schema, planner_context)?;
return Ok(Expr::AggregateUDF(expr::AggregateUDF::new(
fm, args, None, None,
)));
}

// next, aggregate built-ins
if let Ok(fun) = AggregateFunction::from_str(&name) {
let distinct = function.distinct;
let order_by = self.order_by_to_sort_expr(
&function.order_by,
schema,
planner_context,
)?;
let order_by =
self.order_by_to_sort_expr(&order_by, schema, planner_context)?;
let order_by = (!order_by.is_empty()).then_some(order_by);
let args =
self.function_args_to_expr(function.args, schema, planner_context)?;
let filter: Option<Box<Expr>> = function
.filter
let args = self.function_args_to_expr(args, schema, planner_context)?;
let filter: Option<Box<Expr>> = filter
.map(|e| self.sql_expr_to_logical_expr(*e, schema, planner_context))
.transpose()?
.map(Box::new);
Expand All @@ -159,8 +158,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {

// Special case arrow_cast (as its type is dependent on its argument value)
if name == ARROW_CAST_NAME {
let args =
self.function_args_to_expr(function.args, schema, planner_context)?;
let args = self.function_args_to_expr(args, schema, planner_context)?;
return super::arrow_cast::create_arrow_cast(args, schema);
}
}
Expand Down
10 changes: 10 additions & 0 deletions datafusion/sql/tests/sql_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1287,6 +1287,16 @@ fn select_simple_aggregate_repeated_aggregate_with_unique_aliases() {
);
}

#[test]
fn select_simple_aggregate_respect_nulls() {
let sql = "SELECT MIN(age) RESPECT NULLS FROM person";
let err = logical_plan(sql).expect_err("query should have failed");

assert_contains!(
err.strip_backtrace(),
"This feature is not implemented: Null treatment in aggregate functions is not supported: RESPECT NULLS"
);
}
#[test]
fn select_from_typed_string_values() {
quick_test(
Expand Down