Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions datafusion/optimizer/src/decorrelate_predicate_subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ use crate::{OptimizerConfig, OptimizerRule};

use datafusion_common::alias::AliasGenerator;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{internal_err, plan_err, Column, Result};
use datafusion_common::{
assert_or_internal_err, plan_err, Column, DataFusionError, Result,
};
use datafusion_expr::expr::{Exists, InSubquery};
use datafusion_expr::expr_rewriter::create_col_from_scalar_expr;
use datafusion_expr::logical_plan::{JoinType, Subquery};
Expand Down Expand Up @@ -79,11 +81,10 @@ impl OptimizerRule for DecorrelatePredicateSubquery {
.into_iter()
.partition(has_subquery);

if with_subqueries.is_empty() {
return internal_err!(
"can not find expected subqueries in DecorrelatePredicateSubquery"
);
}
assert_or_internal_err!(
!with_subqueries.is_empty(),
"can not find expected subqueries in DecorrelatePredicateSubquery"
);

// iterate through all exists clauses in predicate, turning each into a join
let mut cur_input = Arc::unwrap_or_clone(filter.input);
Expand Down
15 changes: 7 additions & 8 deletions datafusion/optimizer/src/extract_equijoin_predicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::tree_node::Transformed;
use datafusion_common::{internal_err, DFSchema};
use datafusion_common::{assert_or_internal_err, DFSchema, DataFusionError};
use datafusion_common::{NullEquality, Result};
use datafusion_expr::utils::split_conjunction_owned;
use datafusion_expr::utils::{can_hash, find_valid_equijoin_key_pair};
Expand Down Expand Up @@ -223,13 +223,12 @@ fn split_op_and_other_join_predicates(
right_schema: &DFSchema,
operator: Operator,
) -> Result<(Vec<EquijoinPredicate>, Option<Expr>)> {
if !matches!(operator, Operator::Eq | Operator::IsNotDistinctFrom) {
return internal_err!(
"split_op_and_other_join_predicates only supports 'Eq' or 'IsNotDistinctFrom' operators, \
but received: {:?}",
operator
);
}
assert_or_internal_err!(
matches!(operator, Operator::Eq | Operator::IsNotDistinctFrom),
"split_op_and_other_join_predicates only supports 'Eq' or 'IsNotDistinctFrom' operators, \
but received: {:?}",
operator
);

let exprs = split_conjunction_owned(filter);

Expand Down
28 changes: 16 additions & 12 deletions datafusion/optimizer/src/optimize_projections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ use std::collections::HashSet;
use std::sync::Arc;

use datafusion_common::{
get_required_group_by_exprs_indices, internal_datafusion_err, internal_err, Column,
DFSchema, HashMap, JoinType, Result,
assert_eq_or_internal_err, get_required_group_by_exprs_indices,
internal_datafusion_err, internal_err, Column, DFSchema, DataFusionError, HashMap,
JoinType, Result,
};
use datafusion_expr::expr::Alias;
use datafusion_expr::{
Expand Down Expand Up @@ -341,11 +342,14 @@ fn optimize_projections(
return Ok(Transformed::no(plan));
};
let children = extension.node.inputs();
if children.len() != necessary_children_indices.len() {
return internal_err!("Inconsistent length between children and necessary children indices. \
Make sure `.necessary_children_exprs` implementation of the `UserDefinedLogicalNode` is \
consistent with actual children length for the node.");
}
assert_eq_or_internal_err!(
children.len(),
necessary_children_indices.len(),
"Inconsistent length between children and necessary children indices. \
Make sure `.necessary_children_exprs` implementation of the \
`UserDefinedLogicalNode` is consistent with actual children length \
for the node."
);
children
.into_iter()
.zip(necessary_children_indices)
Expand Down Expand Up @@ -432,11 +436,11 @@ fn optimize_projections(
// Required indices are currently ordered (child0, child1, ...)
// but the loop pops off the last element, so we need to reverse the order
child_required_indices.reverse();
if child_required_indices.len() != plan.inputs().len() {
return internal_err!(
"OptimizeProjection: child_required_indices length mismatch with plan inputs"
);
}
assert_eq_or_internal_err!(
child_required_indices.len(),
plan.inputs().len(),
"OptimizeProjection: child_required_indices length mismatch with plan inputs"
);

// Rewrite children of the plan
let transformed_plan = plan.map_children(|child| {
Expand Down
16 changes: 9 additions & 7 deletions datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
};
use datafusion_common::{
internal_err, plan_err, qualified_name, Column, DFSchema, Result,
assert_eq_or_internal_err, internal_err, plan_err, qualified_name, Column, DFSchema,
DataFusionError, Result,
};
use datafusion_expr::expr::WindowFunction;
use datafusion_expr::expr_rewriter::replace_col;
Expand Down Expand Up @@ -1135,12 +1136,13 @@ impl OptimizerRule for PushDownFilter {
let supported_filters = scan
.source
.supports_filters_pushdown(non_volatile_filters.as_slice())?;
if non_volatile_filters.len() != supported_filters.len() {
return internal_err!(
"Vec returned length: {} from supports_filters_pushdown is not the same size as the filters passed, which length is: {}",
supported_filters.len(),
non_volatile_filters.len());
}
assert_eq_or_internal_err!(
non_volatile_filters.len(),
supported_filters.len(),
"Vec returned length: {} from supports_filters_pushdown is not the same size as the filters passed, which length is: {}",
supported_filters.len(),
non_volatile_filters.len()
);

// Compose scan filters from non-volatile filters of `Exact` or `Inexact` pushdown type
let zip = non_volatile_filters.into_iter().zip(supported_filters);
Expand Down
18 changes: 11 additions & 7 deletions datafusion/optimizer/src/scalar_subquery_to_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ use datafusion_common::alias::AliasGenerator;
use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter,
};
use datafusion_common::{internal_err, plan_err, Column, Result, ScalarValue};
use datafusion_common::{
assert_or_internal_err, plan_err, Column, DataFusionError, Result, ScalarValue,
};
use datafusion_expr::expr_rewriter::create_col_from_scalar_expr;
use datafusion_expr::logical_plan::{JoinType, Subquery};
use datafusion_expr::utils::conjunction;
Expand Down Expand Up @@ -94,9 +96,10 @@ impl OptimizerRule for ScalarSubqueryToJoin {
config.alias_generator(),
)?;

if subqueries.is_empty() {
return internal_err!("Expected subqueries not found in filter");
}
assert_or_internal_err!(
!subqueries.is_empty(),
"Expected subqueries not found in filter"
);

// iterate through all subqueries in predicate, turning each into a left join
let mut cur_input = filter.input.as_ref().clone();
Expand Down Expand Up @@ -154,9 +157,10 @@ impl OptimizerRule for ScalarSubqueryToJoin {
all_subqueries.extend(subqueries);
expr_to_rewrite_expr_map.insert(expr, rewrite_exprs);
}
if all_subqueries.is_empty() {
return internal_err!("Expected subqueries not found in projection");
}
assert_or_internal_err!(
!all_subqueries.is_empty(),
"Expected subqueries not found in projection"
);
// iterate through all subqueries in predicate, turning each into a left join
let mut cur_input = projection.input.as_ref().clone();
for (subquery, alias) in all_subqueries {
Expand Down
18 changes: 12 additions & 6 deletions datafusion/optimizer/src/single_distinct_to_groupby.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};

use datafusion_common::{
internal_err, tree_node::Transformed, DataFusionError, HashSet, Result,
assert_eq_or_internal_err, tree_node::Transformed, DataFusionError, HashSet, Result,
};
use datafusion_expr::builder::project;
use datafusion_expr::expr::AggregateFunctionParams;
Expand Down Expand Up @@ -183,15 +183,21 @@ impl OptimizerRule for SingleDistinctToGroupBy {
.map(|aggr_expr| match aggr_expr {
Expr::AggregateFunction(AggregateFunction {
func,
params: AggregateFunctionParams { mut args, distinct, .. }
params:
AggregateFunctionParams {
mut args, distinct, ..
},
}) => {
if distinct {
if args.len() != 1 {
return internal_err!("DISTINCT aggregate should have exactly one argument");
}
assert_eq_or_internal_err!(
args.len(),
1,
"DISTINCT aggregate should have exactly one argument"
);
let arg = args.swap_remove(0);

if group_fields_set.insert(arg.schema_name().to_string()) {
if group_fields_set.insert(arg.schema_name().to_string())
{
inner_group_exprs
.push(arg.alias(SINGLE_DISTINCT_ALIAS));
}
Expand Down