Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion-testing
Submodule datafusion-testing updated 85 files
+3 −3 data/sqlite/random/expr/slt_good_0.slt
+10 −10 data/sqlite/random/expr/slt_good_10.slt
+5 −3 data/sqlite/random/expr/slt_good_100.slt
+3 −3 data/sqlite/random/expr/slt_good_101.slt
+12 −12 data/sqlite/random/expr/slt_good_102.slt
+7 −7 data/sqlite/random/expr/slt_good_104.slt
+9 −9 data/sqlite/random/expr/slt_good_105.slt
+8 −6 data/sqlite/random/expr/slt_good_106.slt
+3 −3 data/sqlite/random/expr/slt_good_107.slt
+6 −6 data/sqlite/random/expr/slt_good_108.slt
+3 −3 data/sqlite/random/expr/slt_good_11.slt
+6 −6 data/sqlite/random/expr/slt_good_110.slt
+6 −6 data/sqlite/random/expr/slt_good_111.slt
+7 −7 data/sqlite/random/expr/slt_good_112.slt
+6 −6 data/sqlite/random/expr/slt_good_113.slt
+3 −3 data/sqlite/random/expr/slt_good_116.slt
+3 −3 data/sqlite/random/expr/slt_good_118.slt
+4 −4 data/sqlite/random/expr/slt_good_119.slt
+9 −7 data/sqlite/random/expr/slt_good_12.slt
+14 −12 data/sqlite/random/expr/slt_good_13.slt
+6 −6 data/sqlite/random/expr/slt_good_14.slt
+6 −6 data/sqlite/random/expr/slt_good_16.slt
+3 −3 data/sqlite/random/expr/slt_good_17.slt
+3 −3 data/sqlite/random/expr/slt_good_2.slt
+6 −6 data/sqlite/random/expr/slt_good_22.slt
+10 −6 data/sqlite/random/expr/slt_good_23.slt
+3 −3 data/sqlite/random/expr/slt_good_24.slt
+12 −13 data/sqlite/random/expr/slt_good_25.slt
+6 −6 data/sqlite/random/expr/slt_good_26.slt
+6 −6 data/sqlite/random/expr/slt_good_27.slt
+3 −3 data/sqlite/random/expr/slt_good_28.slt
+14 −12 data/sqlite/random/expr/slt_good_29.slt
+3 −3 data/sqlite/random/expr/slt_good_3.slt
+3 −3 data/sqlite/random/expr/slt_good_30.slt
+9 −10 data/sqlite/random/expr/slt_good_31.slt
+9 −9 data/sqlite/random/expr/slt_good_33.slt
+3 −3 data/sqlite/random/expr/slt_good_34.slt
+3 −3 data/sqlite/random/expr/slt_good_36.slt
+3 −3 data/sqlite/random/expr/slt_good_38.slt
+3 −3 data/sqlite/random/expr/slt_good_39.slt
+3 −3 data/sqlite/random/expr/slt_good_4.slt
+3 −3 data/sqlite/random/expr/slt_good_41.slt
+6 −6 data/sqlite/random/expr/slt_good_44.slt
+3 −3 data/sqlite/random/expr/slt_good_45.slt
+3 −3 data/sqlite/random/expr/slt_good_47.slt
+3 −3 data/sqlite/random/expr/slt_good_49.slt
+3 −3 data/sqlite/random/expr/slt_good_5.slt
+3 −3 data/sqlite/random/expr/slt_good_50.slt
+3 −3 data/sqlite/random/expr/slt_good_55.slt
+9 −9 data/sqlite/random/expr/slt_good_56.slt
+5 −3 data/sqlite/random/expr/slt_good_57.slt
+4 −4 data/sqlite/random/expr/slt_good_58.slt
+9 −9 data/sqlite/random/expr/slt_good_6.slt
+3 −3 data/sqlite/random/expr/slt_good_60.slt
+3 −3 data/sqlite/random/expr/slt_good_61.slt
+3 −3 data/sqlite/random/expr/slt_good_62.slt
+6 −6 data/sqlite/random/expr/slt_good_63.slt
+3 −3 data/sqlite/random/expr/slt_good_64.slt
+3 −3 data/sqlite/random/expr/slt_good_66.slt
+14 −12 data/sqlite/random/expr/slt_good_67.slt
+5 −3 data/sqlite/random/expr/slt_good_68.slt
+3 −3 data/sqlite/random/expr/slt_good_69.slt
+3 −3 data/sqlite/random/expr/slt_good_7.slt
+5 −3 data/sqlite/random/expr/slt_good_72.slt
+3 −3 data/sqlite/random/expr/slt_good_75.slt
+5 −3 data/sqlite/random/expr/slt_good_77.slt
+3 −3 data/sqlite/random/expr/slt_good_78.slt
+3 −3 data/sqlite/random/expr/slt_good_79.slt
+3 −3 data/sqlite/random/expr/slt_good_8.slt
+3 −3 data/sqlite/random/expr/slt_good_80.slt
+10 −10 data/sqlite/random/expr/slt_good_81.slt
+10 −10 data/sqlite/random/expr/slt_good_82.slt
+6 −6 data/sqlite/random/expr/slt_good_83.slt
+11 −12 data/sqlite/random/expr/slt_good_84.slt
+7 −7 data/sqlite/random/expr/slt_good_85.slt
+3 −3 data/sqlite/random/expr/slt_good_88.slt
+3 −3 data/sqlite/random/expr/slt_good_89.slt
+6 −9 data/sqlite/random/expr/slt_good_92.slt
+8 −6 data/sqlite/random/expr/slt_good_93.slt
+3 −3 data/sqlite/random/expr/slt_good_94.slt
+3 −3 data/sqlite/random/expr/slt_good_96.slt
+6 −6 data/sqlite/random/expr/slt_good_97.slt
+2 −2 data/sqlite/random/groupby/slt_good_12.slt
+4 −4 data/sqlite/random/groupby/slt_good_4.slt
+5 −5 data/sqlite/random/groupby/slt_good_7.slt
3 changes: 0 additions & 3 deletions datafusion/core/benches/sql_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,9 +477,6 @@ fn criterion_benchmark(c: &mut Criterion) {
};

let raw_tpcds_sql_queries = (1..100)
// skip query 75 until it is fixed
// https://github.com/apache/datafusion/issues/17801
.filter(|q| *q != 75)
.map(|q| std::fs::read_to_string(format!("{tests_path}tpc-ds/{q}.sql")).unwrap())
.collect::<Vec<_>>();

Expand Down
27 changes: 0 additions & 27 deletions datafusion/core/tests/dataframe/dataframe_functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,33 +274,6 @@ async fn test_nvl2() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn test_nvl2_short_circuit() -> Result<()> {
// Build nvl2(a, b, c), which selects `b` when `a` is non-null and `c`
// otherwise (standard NVL2 semantics — the snapshot below is consistent
// with that reading).
// Presumably column "a" is non-null in every row of the fixture batch,
// since all four output rows are expected to be the casted constant "1"
// (the second argument) — TODO confirm against get_batches' fixture.
let expr = nvl2(
col("a"),
arrow_cast(lit("1"), lit("Int32")),
arrow_cast(col("a"), lit("Int32")),
);

// Evaluate the expression over the shared test batch.
let batches = get_batches(expr).await?;

// Pin both the rendered column header (the expression's display name)
// and the per-row values via an inline insta snapshot.
assert_snapshot!(
batches_to_string(&batches),
@r#"
+-----------------------------------------------------------------------------------+
| nvl2(test.a,arrow_cast(Utf8("1"),Utf8("Int32")),arrow_cast(test.a,Utf8("Int32"))) |
+-----------------------------------------------------------------------------------+
| 1 |
| 1 |
| 1 |
| 1 |
+-----------------------------------------------------------------------------------+
"#
);

Ok(())
}
#[tokio::test]
async fn test_fn_arrow_typeof() -> Result<()> {
let expr = arrow_typeof(col("l"));
Expand Down
20 changes: 0 additions & 20 deletions datafusion/core/tests/expr_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,26 +320,6 @@ async fn test_create_physical_expr() {
create_simplified_expr_test(lit(1i32) + lit(2i32), "3");
}

#[test]
fn test_create_physical_expr_nvl2() {
let batch = &TEST_BATCH;
let df_schema = DFSchema::try_from(batch.schema()).unwrap();
let ctx = SessionContext::new();

// `nvl2` is expected to be rewritten into a CASE expression during
// simplification, so evaluating the raw (unsimplified) physical
// expression must fail with the dedicated error message.
let assert_requires_simplification = |expr| {
let physical = ctx.create_physical_expr(expr, &df_schema).unwrap();
let err = physical.evaluate(batch).unwrap_err();
assert!(
err.to_string()
.contains("nvl2 should have been simplified to case"),
"unexpected error: {err:?}"
);
};

// Exercise both argument orders: column-first and literal-first.
assert_requires_simplification(nvl2(col("i"), lit(1i64), lit(0i64)));
assert_requires_simplification(nvl2(lit(1i64), col("i"), lit(0i64)));
}

#[tokio::test]
async fn test_create_physical_expr_coercion() {
// create_physical_expr does apply type coercion and unwrapping in cast
Expand Down
11 changes: 7 additions & 4 deletions datafusion/core/tests/tpcds_planning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1051,10 +1051,13 @@ async fn regression_test(query_no: u8, create_physical: bool) -> Result<()> {

for sql in &sql {
let df = ctx.sql(sql).await?;
let (state, plan) = df.into_parts();
let plan = state.optimize(&plan)?;
if create_physical {
let _ = state.create_physical_plan(&plan).await?;
// attempt to mimic planning steps
if !create_physical {
let (state, plan) = df.into_parts();
let _ = state.optimize(&plan)?;
} else {
// this is what df.execute() does internally
let _ = df.create_physical_plan().await?;
}
}

Expand Down
55 changes: 1 addition & 54 deletions datafusion/expr/src/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,21 +252,7 @@ impl ScalarUDF {
Ok(result)
}

/// Determines which of the arguments passed to this function are evaluated eagerly
/// and which may be evaluated lazily.
///
/// Returns `None` when all arguments are eagerly evaluated; otherwise
/// returns the `(eager, lazy)` partition of `args`. This is a thin
/// delegation to the wrapped [`ScalarUDFImpl`].
///
/// See [ScalarUDFImpl::conditional_arguments] for more information.
pub fn conditional_arguments<'a>(
&self,
args: &'a [Expr],
) -> Option<(Vec<&'a Expr>, Vec<&'a Expr>)> {
self.inner.conditional_arguments(args)
}

/// Returns true if some of this function's subexpressions may not be
/// evaluated, and thus any side effects (like divide by zero) may not be
/// encountered.
///
/// Delegates to the inner implementation; see
/// [ScalarUDFImpl::short_circuits] for more information.
pub fn short_circuits(&self) -> bool {
self.inner.short_circuits()
}
Expand Down Expand Up @@ -696,42 +682,10 @@ pub trait ScalarUDFImpl: Debug + DynEq + DynHash + Send + Sync {
///
/// Setting this to true prevents certain optimizations such as common
/// subexpression elimination.
///
/// When overriding this function to return `true`,
/// [ScalarUDFImpl::conditional_arguments] can also be overridden to report
/// more accurately which arguments are evaluated eagerly and which lazily.
///
/// Defaults to `false`: all arguments are assumed to be eagerly evaluated.
fn short_circuits(&self) -> bool {
false
}

/// Determines which of the arguments passed to this function are evaluated eagerly
/// and which may be evaluated lazily.
///
/// Returning `None` means every argument is eagerly evaluated; it is also a
/// micro optimization that spares a needless `Vec` allocation.
///
/// A `Some((eager, lazy))` result partitions the arguments: `eager` holds
/// those that are always evaluated, while `lazy` holds those that may be
/// evaluated lazily (i.e. possibly not at all in some cases).
///
/// Implementations must keep the two returned `Vec`s disjoint and account
/// for every element of `args` in exactly one of them.
///
/// When overriding this function, [ScalarUDFImpl::short_circuits] must
/// be overridden to return `true`.
fn conditional_arguments<'a>(
&self,
args: &'a [Expr],
) -> Option<(Vec<&'a Expr>, Vec<&'a Expr>)> {
// Default behavior: a short-circuiting function treats every argument
// as potentially lazy; a non-short-circuiting one signals "all eager"
// with `None`, avoiding the allocation entirely.
self.short_circuits()
.then(|| (Vec::new(), args.iter().collect()))
}

/// Computes the output [`Interval`] for a [`ScalarUDFImpl`], given the input
/// intervals.
///
Expand Down Expand Up @@ -921,13 +875,6 @@ impl ScalarUDFImpl for AliasedScalarUDFImpl {
self.inner.simplify(args, info)
}

/// Forwards to the aliased (inner) implementation so the alias reports
/// the same eager/lazy argument partition as the UDF it wraps.
fn conditional_arguments<'a>(
&self,
args: &'a [Expr],
) -> Option<(Vec<&'a Expr>, Vec<&'a Expr>)> {
self.inner.conditional_arguments(args)
}

/// Forwards to the aliased (inner) implementation's short-circuit flag,
/// keeping the alias behaviorally identical to the wrapped UDF.
fn short_circuits(&self) -> bool {
self.inner.short_circuits()
}
Expand Down
99 changes: 58 additions & 41 deletions datafusion/functions/src/core/coalesce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
// specific language governing permissions and limitations
// under the License.

use arrow::array::{new_null_array, BooleanArray};
use arrow::compute::kernels::zip::zip;
use arrow::compute::{and, is_not_null, is_null};
use arrow::datatypes::{DataType, Field, FieldRef};
use datafusion_common::{exec_err, internal_err, plan_err, Result};
use datafusion_common::{exec_err, internal_err, Result};
use datafusion_expr::binary::try_type_union_resolution;
use datafusion_expr::conditional_expressions::CaseBuilder;
use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
use datafusion_expr::{
ColumnarValue, Documentation, Expr, ReturnFieldArgs, ScalarFunctionArgs,
ColumnarValue, Documentation, ReturnFieldArgs, ScalarFunctionArgs,
};
use datafusion_expr::{ScalarUDFImpl, Signature, Volatility};
use datafusion_macros::user_doc;
Expand All @@ -47,7 +48,7 @@ use std::any::Any;
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct CoalesceFunc {
pub(super) signature: Signature,
signature: Signature,
}

impl Default for CoalesceFunc {
Expand Down Expand Up @@ -94,45 +95,61 @@ impl ScalarUDFImpl for CoalesceFunc {
Ok(Field::new(self.name(), return_type, nullable).into())
}

fn simplify(
&self,
args: Vec<Expr>,
_info: &dyn SimplifyInfo,
) -> Result<ExprSimplifyResult> {
/// coalesce evaluates to the first value which is not NULL
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
let args = args.args;
// do not accept 0 arguments.
if args.is_empty() {
return plan_err!("coalesce must have at least one argument");
}
if args.len() == 1 {
return Ok(ExprSimplifyResult::Simplified(
args.into_iter().next().unwrap(),
));
return exec_err!(
"coalesce was called with {} arguments. It requires at least 1.",
args.len()
);
}

let n = args.len();
let (init, last_elem) = args.split_at(n - 1);
let whens = init
.iter()
.map(|x| x.clone().is_not_null())
.collect::<Vec<_>>();
let cases = init.to_vec();
Ok(ExprSimplifyResult::Simplified(
CaseBuilder::new(None, whens, cases, Some(Box::new(last_elem[0].clone())))
.end()?,
))
}

/// coalesce evaluates to the first value which is not NULL
fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
internal_err!("coalesce should have been simplified to case")
}

fn conditional_arguments<'a>(
&self,
args: &'a [Expr],
) -> Option<(Vec<&'a Expr>, Vec<&'a Expr>)> {
let eager = vec![&args[0]];
let lazy = args[1..].iter().collect();
Some((eager, lazy))
let return_type = args[0].data_type();
let mut return_array = args.iter().filter_map(|x| match x {
ColumnarValue::Array(array) => Some(array.len()),
_ => None,
});

if let Some(size) = return_array.next() {
// start with nulls as default output
let mut current_value = new_null_array(&return_type, size);
let mut remainder = BooleanArray::from(vec![true; size]);

for arg in args {
match arg {
ColumnarValue::Array(ref array) => {
let to_apply = and(&remainder, &is_not_null(array.as_ref())?)?;
current_value = zip(&to_apply, array, &current_value)?;
remainder = and(&remainder, &is_null(array)?)?;
}
ColumnarValue::Scalar(value) => {
if value.is_null() {
continue;
} else {
let last_value = value.to_scalar()?;
current_value = zip(&remainder, &last_value, &current_value)?;
break;
}
}
}
if remainder.iter().all(|x| x == Some(false)) {
break;
}
}
Ok(ColumnarValue::Array(current_value))
} else {
let result = args
.iter()
.filter_map(|x| match x {
ColumnarValue::Scalar(s) if !s.is_null() => Some(x.clone()),
_ => None,
})
.next()
.unwrap_or_else(|| args[0].clone());
Ok(result)
}
}

fn short_circuits(&self) -> bool {
Expand Down
Loading
Loading