Skip to content

Commit

Permalink
feat: Initial support for AnyExpression
Browse files Browse the repository at this point in the history
  • Loading branch information
ovr committed May 16, 2022
1 parent 716e0bc commit fca30ce
Show file tree
Hide file tree
Showing 18 changed files with 715 additions and 2 deletions.
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ impl ExpressionVisitor for ApplicabilityVisitor<'_> {
| Expr::Cast { .. }
| Expr::TryCast { .. }
| Expr::BinaryExpr { .. }
| Expr::AnyExpr { .. }
| Expr::Between { .. }
| Expr::InList { .. }
| Expr::Exists { .. }
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/logical_plan/expr_rewriter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ impl ExprRewritable for Expr {
op,
right: rewrite_boxed(right, rewriter)?,
},
Expr::AnyExpr { left, op, right } => Expr::AnyExpr {
left: rewrite_boxed(left, rewriter)?,
op,
right: rewrite_boxed(right, rewriter)?,
},
Expr::Not(expr) => Expr::Not(rewrite_boxed(expr, rewriter)?),
Expr::IsNotNull(expr) => Expr::IsNotNull(rewrite_boxed(expr, rewriter)?),
Expr::IsNull(expr) => Expr::IsNull(rewrite_boxed(expr, rewriter)?),
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/optimizer/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,10 @@ impl ExprIdentifierVisitor<'_> {
desc.push_str("BinaryExpr-");
desc.push_str(&op.to_string());
}
Expr::AnyExpr { op, .. } => {
desc.push_str("AnyExpr-");
desc.push_str(&op.to_string());
}
Expr::Not(_) => {
desc.push_str("Not-");
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/optimizer/simplify_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ impl<'a> ConstEvaluator<'a> {
Expr::ScalarUDF { fun, .. } => Self::volatility_ok(fun.signature.volatility),
Expr::Literal(_)
| Expr::BinaryExpr { .. }
| Expr::AnyExpr { .. }
| Expr::Not(_)
| Expr::IsNotNull(_)
| Expr::IsNull(_)
Expand Down
8 changes: 8 additions & 0 deletions datafusion/core/src/optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,9 @@ pub fn expr_sub_expressions(expr: &Expr) -> Result<Vec<Expr>> {
Expr::BinaryExpr { left, right, .. } => {
Ok(vec![left.as_ref().to_owned(), right.as_ref().to_owned()])
}
Expr::AnyExpr { left, right, .. } => {
Ok(vec![left.as_ref().to_owned(), right.as_ref().to_owned()])
}
Expr::IsNull(expr)
| Expr::IsNotNull(expr)
| Expr::Cast { expr, .. }
Expand Down Expand Up @@ -347,6 +350,11 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result<Expr> {
op: *op,
right: Box::new(expressions[1].clone()),
}),
Expr::AnyExpr { op, .. } => Ok(Expr::AnyExpr {
left: Box::new(expressions[0].clone()),
op: *op,
right: Box::new(expressions[1].clone()),
}),
Expr::IsNull(_) => Ok(Expr::IsNull(Box::new(expressions[0].clone()))),
Expr::IsNotNull(_) => Ok(Expr::IsNotNull(Box::new(expressions[0].clone()))),
Expr::ScalarFunction { fun, .. } => Ok(Expr::ScalarFunction {
Expand Down
23 changes: 21 additions & 2 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

use super::analyze::AnalyzeExec;
use super::{
aggregates, empty::EmptyExec, expressions::binary, functions,
aggregates, empty::EmptyExec, expressions::{binary, any}, functions,
hash_join::PartitionMode, udaf, union::UnionExec, values::ValuesExec, windows,
};
use crate::execution::context::{ExecutionProps, SessionState};
Expand Down Expand Up @@ -107,6 +107,11 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
let right = create_physical_name(right, false)?;
Ok(format!("{} {:?} {}", left, op, right))
}
Expr::AnyExpr { left, op, right } => {
let left = create_physical_name(left, false)?;
let right = create_physical_name(right, false)?;
Ok(format!("{} {:?} ANY({})", left, op, right))
}
Expr::Case {
expr,
when_then_expr,
Expand Down Expand Up @@ -1113,7 +1118,6 @@ pub fn create_physical_expr(
create_physical_expr(expr, input_dfschema, input_schema, execution_props)?,
key.clone(),
))),

Expr::ScalarFunction { fun, args } => {
let physical_args = args
.iter()
Expand Down Expand Up @@ -1176,6 +1180,21 @@ pub fn create_physical_expr(
binary_expr
}
}
Expr::AnyExpr { left, op, right } => {
let lhs = create_physical_expr(
left,
input_dfschema,
input_schema,
execution_props,
)?;
let rhs = create_physical_expr(
right,
input_dfschema,
input_schema,
execution_props,
)?;
any(lhs, *op, rhs, input_schema)
}
Expr::InList {
expr,
list,
Expand Down
37 changes: 37 additions & 0 deletions datafusion/core/src/sql/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1467,6 +1467,30 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
}

fn parse_sql_binary_any(
&self,
left: SQLExpr,
op: BinaryOperator,
right: Box<SQLExpr>,
schema: &DFSchema,
ctes: &mut HashMap<String, LogicalPlan>,
) -> Result<Expr> {
let operator = match op {
BinaryOperator::Eq => Ok(Operator::Eq),
BinaryOperator::NotEq => Ok(Operator::NotEq),
_ => Err(DataFusionError::NotImplemented(format!(
"Unsupported SQL ANY operator {:?}",
op
))),
}?;

Ok(Expr::AnyExpr {
left: Box::new(self.sql_expr_to_logical_expr(left, schema, ctes)?),
op: operator,
right: Box::new(self.sql_expr_to_logical_expr(*right, schema, ctes)?),
})
}

fn parse_sql_binary_op(
&self,
left: SQLExpr,
Expand All @@ -1475,6 +1499,19 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
schema: &DFSchema,
ctes: &mut HashMap<String, LogicalPlan>,
) -> Result<Expr> {
match right {
SQLExpr::AnyOp(any_expr) => {
return self.parse_sql_binary_any(left, op, any_expr, schema, ctes);
}
SQLExpr::AllOp(_) => {
return Err(DataFusionError::NotImplemented(format!(
"Unsupported SQL ALL operator {:?}",
right
)));
}
_ => {}
};

let operator = match op {
BinaryOperator::Gt => Ok(Operator::Gt),
BinaryOperator::GtEq => Ok(Operator::GtEq),
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/sql/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,11 @@ where
op: *op,
right: Box::new(clone_with_replacement(&**right, replacement_fn)?),
}),
Expr::AnyExpr { left, right, op } => Ok(Expr::AnyExpr {
left: Box::new(clone_with_replacement(&**left, replacement_fn)?),
op: *op,
right: Box::new(clone_with_replacement(&**right, replacement_fn)?),
}),
Expr::Case {
expr: case_expr_opt,
when_then_expr,
Expand Down
24 changes: 24 additions & 0 deletions datafusion/core/tests/sql/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,30 @@ async fn test_extract_date_part() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn test_binary_any() -> Result<()> {
// =
test_expression!("1 = ANY([1, 2])", "true");
test_expression!("3 = ANY([1, 2])", "false");
test_expression!("NULL = ANY([1, 2])", "NULL");
// utf8
test_expression!("'a' = ANY(['a', 'b'])", "true");
test_expression!("'c' = ANY(['a', 'b'])", "false");
// bool
test_expression!("true = ANY([true, false])", "true");
test_expression!("false = ANY([true, false])", "true");
test_expression!("false = ANY([true, true])", "false");
// <>
test_expression!("3 <> ANY([1, 2])", "true");
test_expression!("1 <> ANY([1, 2])", "false");
test_expression!("2 <> ANY([1, 2])", "false");
test_expression!("NULL = ANY([1, 2])", "NULL");
test_expression!("'c' <> ANY(['a', 'b'])", "true");
test_expression!("'a' <> ANY(['a', 'b'])", "false");

Ok(())
}

#[tokio::test]
async fn test_in_list_scalar() -> Result<()> {
test_expression!("'a' IN ('a','b')", "true");
Expand Down
17 changes: 17 additions & 0 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,15 @@ pub enum Expr {
/// Right-hand side of the expression
right: Box<Expr>,
},
/// A binary expression such as "age > 21"
AnyExpr {
/// Left-hand side of the expression
left: Box<Expr>,
/// The comparison operator
op: Operator,
/// Right-hand side of the expression
right: Box<Expr>,
},
/// Negation of an expression. The expression's type must be a boolean to make sense.
Not(Box<Expr>),
/// Whether an expression is not Null. This expression is never null.
Expand Down Expand Up @@ -490,6 +499,9 @@ impl fmt::Debug for Expr {
Expr::BinaryExpr { left, op, right } => {
write!(f, "{:?} {} {:?}", left, op, right)
}
Expr::AnyExpr { left, op, right } => {
write!(f, "{:?} {} ANY({:?})", left, op, right)
}
Expr::Sort {
expr,
asc,
Expand Down Expand Up @@ -673,6 +685,11 @@ fn create_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
let right = create_name(right, input_schema)?;
Ok(format!("{} {} {}", left, op, right))
}
Expr::AnyExpr { left, op, right } => {
let left = create_name(left, input_schema)?;
let right = create_name(right, input_schema)?;
Ok(format!("{} {} ANY({})", left, op, right))
}
Expr::Case {
expr,
when_then_expr,
Expand Down
6 changes: 6 additions & 0 deletions datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ impl ExprSchemable for Expr {
| Expr::InSubquery { .. }
| Expr::Between { .. }
| Expr::InList { .. }
| Expr::AnyExpr { .. }
| Expr::IsNotNull(_) => Ok(DataType::Boolean),
Expr::ScalarSubquery(subquery) => {
Ok(subquery.subquery.schema().field(0).data_type().clone())
Expand Down Expand Up @@ -191,6 +192,11 @@ impl ExprSchemable for Expr {
ref right,
..
} => Ok(left.nullable(input_schema)? || right.nullable(input_schema)?),
Expr::AnyExpr {
ref left,
ref right,
..
} => Ok(left.nullable(input_schema)? || right.nullable(input_schema)?),
Expr::Wildcard => Err(DataFusionError::Internal(
"Wildcard expressions are not valid in a logical query plan".to_owned(),
)),
Expand Down
4 changes: 4 additions & 0 deletions datafusion/expr/src/expr_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ impl ExprVisitable for Expr {
let visitor = left.accept(visitor)?;
right.accept(visitor)
}
Expr::AnyExpr { left, right, .. } => {
let visitor = left.accept(visitor)?;
right.accept(visitor)
}
Expr::Between {
expr, low, high, ..
} => {
Expand Down
1 change: 1 addition & 0 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ impl ExpressionVisitor for ColumnNameVisitor<'_> {
Expr::Alias(_, _)
| Expr::Literal(_)
| Expr::BinaryExpr { .. }
| Expr::AnyExpr { .. }
| Expr::Not(_)
| Expr::IsNotNull(_)
| Expr::IsNull(_)
Expand Down
Loading

0 comments on commit fca30ce

Please sign in to comment.