Skip to content

Commit

Permalink
handle any expression in ballista
Browse files Browse the repository at this point in the history
  • Loading branch information
ovr committed May 16, 2022
1 parent 9caaf3b commit d5dfa12
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 3 deletions.
8 changes: 8 additions & 0 deletions ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,8 @@ message PhysicalExprNode {
PhysicalWindowExprNode window_expr = 15;

PhysicalScalarUdfNode scalar_udf = 16;

PhysicalAnyExprNode any_expr = 17;
}
}

Expand Down Expand Up @@ -374,6 +376,12 @@ message PhysicalBinaryExprNode {
string op = 3;
}

message PhysicalAnyExprNode {
PhysicalExprNode l = 1;
PhysicalExprNode r = 2;
string op = 3;
}

message PhysicalSortExprNode {
PhysicalExprNode expr = 1;
bool asc = 2;
Expand Down
10 changes: 8 additions & 2 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ use datafusion::logical_expr::window_function::WindowFunction;

use datafusion::physical_plan::{
expressions::{
BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr,
Literal, NegativeExpr, NotExpr, TryCastExpr, DEFAULT_DATAFUSION_CAST_OPTIONS,
AnyExpr, BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr,
IsNullExpr, Literal, NegativeExpr, NotExpr, TryCastExpr,
DEFAULT_DATAFUSION_CAST_OPTIONS,
},
functions::{self, ScalarFunctionExpr},
Partitioning,
Expand Down Expand Up @@ -78,6 +79,11 @@ pub(crate) fn parse_physical_expr(
from_proto_binary_op(&binary_expr.op)?,
parse_required_physical_box_expr(&binary_expr.r, registry, "right")?,
)),
ExprType::AnyExpr(binary_expr) => Arc::new(AnyExpr::new(
parse_required_physical_box_expr(&binary_expr.l, registry, "left")?,
from_proto_binary_op(&binary_expr.op)?,
parse_required_physical_box_expr(&binary_expr.r, registry, "right")?,
)),
ExprType::AggregateExpr(_) => {
return Err(BallistaError::General(
"Cannot convert aggregate expr node to physical expression".to_owned(),
Expand Down
16 changes: 15 additions & 1 deletion ballista/rust/core/src/serde/physical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ use datafusion::physical_plan::file_format::FileScanConfig;

use datafusion::physical_plan::expressions::{Count, Literal};

use datafusion::physical_plan::expressions::{Avg, BinaryExpr, Column, Max, Min, Sum};
use datafusion::physical_plan::expressions::{
AnyExpr, Avg, BinaryExpr, Column, Max, Min, Sum,
};
use datafusion::physical_plan::{AggregateExpr, PhysicalExpr};

use crate::serde::{protobuf, BallistaError};
Expand Down Expand Up @@ -185,6 +187,18 @@ impl TryFrom<Arc<dyn PhysicalExpr>> for protobuf::PhysicalExprNode {
binary_expr,
)),
})
} else if let Some(expr) = expr.downcast_ref::<AnyExpr>() {
let binary_expr = Box::new(protobuf::PhysicalAnyExprNode {
l: Some(Box::new(expr.left().to_owned().try_into()?)),
r: Some(Box::new(expr.right().to_owned().try_into()?)),
op: format!("{:?}", expr.op()),
});

Ok(protobuf::PhysicalExprNode {
expr_type: Some(protobuf::physical_expr_node::ExprType::AnyExpr(
binary_expr,
)),
})
} else if let Some(expr) = expr.downcast_ref::<CaseExpr>() {
Ok(protobuf::PhysicalExprNode {
expr_type: Some(
Expand Down
15 changes: 15 additions & 0 deletions datafusion/physical-expr/src/expressions/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,21 @@ impl AnyExpr {
}
}

/// Get the left side of the binary expression
pub fn left(&self) -> &Arc<dyn PhysicalExpr> {
&self.value
}

/// Get the right side of the binary expression
pub fn right(&self) -> &Arc<dyn PhysicalExpr> {
&self.list
}

/// Get the operator for this binary expression
pub fn op(&self) -> &Operator {
&self.op
}

/// Compare for specific utf8 types
fn compare_bool(
&self,
Expand Down

0 comments on commit d5dfa12

Please sign in to comment.