Skip to content

Commit

Permalink
MINOR: Optimizer example and docs, deprecate Expr::name (#3788)
Browse files Browse the repository at this point in the history
* Optimizer example and docs

* more docs

* clippy

* Add notes on expression naming

* debugging

* add display_name and deprecate name

* clippy

* Update datafusion/optimizer/README.md

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* Update datafusion/optimizer/README.md

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* Update datafusion/optimizer/README.md

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* Update datafusion/optimizer/README.md

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* move example

* move example

* toml fmt

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
  • Loading branch information
andygrove and alamb authored Oct 12, 2022
1 parent a226587 commit 4cb8ac0
Show file tree
Hide file tree
Showing 14 changed files with 508 additions and 24 deletions.
5 changes: 5 additions & 0 deletions datafusion-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,14 @@ path = "examples/avro_sql.rs"
required-features = ["datafusion/avro"]

[dev-dependencies]
arrow = "24.0.0"
arrow-flight = "24.0.0"
async-trait = "0.1.41"
datafusion = { path = "../datafusion/core" }
datafusion-common = { path = "../datafusion/common" }
datafusion-expr = { path = "../datafusion/expr" }
datafusion-optimizer = { path = "../datafusion/optimizer" }
datafusion-sql = { path = "../datafusion/sql" }
futures = "0.3"
num_cpus = "1.13.0"
object_store = { version = "0.5.0", features = ["aws"] }
Expand Down
163 changes: 163 additions & 0 deletions datafusion-examples/examples/rewrite_expr.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter};
use datafusion_expr::{AggregateUDF, Expr, Filter, LogicalPlan, ScalarUDF, TableSource};
use datafusion_optimizer::optimizer::Optimizer;
use datafusion_optimizer::{utils, OptimizerConfig, OptimizerRule};
use datafusion_sql::planner::{ContextProvider, SqlToRel};
use datafusion_sql::sqlparser::dialect::PostgreSqlDialect;
use datafusion_sql::sqlparser::parser::Parser;
use datafusion_sql::TableReference;
use std::any::Any;
use std::sync::Arc;

/// Example entry point: parses a SQL query, plans it, and optimizes the
/// resulting logical plan with an optimizer that contains only [`MyRule`].
///
/// The plan is printed before and after optimization; [`observe`] is handed
/// to the optimizer as its observer callback.
pub fn main() -> Result<()> {
    // parse the SQL text into statements using the PostgreSQL dialect
    let sql = "SELECT * FROM person WHERE age BETWEEN 21 AND 32";
    let ast = Parser::parse_sql(&PostgreSqlDialect {}, sql)?;

    // convert the first parsed statement into a logical plan using the
    // datafusion-sql crate; table lookups go through `MyContextProvider`
    let provider = MyContextProvider {};
    let planner = SqlToRel::new(&provider);
    let unoptimized = planner.sql_statement_to_plan(ast[0].clone())?;
    println!(
        "Unoptimized Logical Plan:\n\n{}\n",
        unoptimized.display_indent()
    );

    // now run the optimizer with our custom rule
    // NOTE(review): skip_failing_rules(false) presumably makes a failing rule
    // surface as an error instead of being skipped -- confirm against the
    // OptimizerConfig documentation.
    let mut config = OptimizerConfig::default().with_skip_failing_rules(false);
    let optimized = Optimizer::with_rules(vec![Arc::new(MyRule {})])
        .optimize(&unoptimized, &mut config, observe)?;
    println!(
        "Optimized Logical Plan:\n\n{}\n",
        optimized.display_indent()
    );

    Ok(())
}

/// Observer callback passed to the optimizer in `main`.
///
/// Prints the name of `rule` followed by an indented rendering of `plan`.
fn observe(plan: &LogicalPlan, rule: &dyn OptimizerRule) {
    let rule_name = rule.name();
    let rendered_plan = plan.display_indent();
    println!("After applying rule '{}':\n{}\n", rule_name, rendered_plan)
}

/// Example optimizer rule that rewrites the predicate of `Filter` plan nodes
/// using [`MyExprRewriter`].
struct MyRule {}

impl OptimizerRule for MyRule {
    /// Name under which this rule is reported.
    fn name(&self) -> &str {
        "my_rule"
    }

    /// Applies the rule to `plan` and returns the resulting plan.
    ///
    /// The inputs of `plan` are processed first via `utils::optimize_children`.
    /// If the resulting node is a `Filter`, its predicate is rewritten and a
    /// new `Filter` over the same input is built; any other node is returned
    /// as-is.
    ///
    /// # Errors
    ///
    /// Propagates errors from child optimization, from the expression rewrite,
    /// and from `Filter::try_new`.
    fn optimize(
        &self,
        plan: &LogicalPlan,
        config: &mut OptimizerConfig,
    ) -> Result<LogicalPlan> {
        // recurse down and optimize children first
        let plan = utils::optimize_children(self, plan, config)?;

        match plan {
            LogicalPlan::Filter(filter) => {
                let mut expr_rewriter = MyExprRewriter {};
                // the accessor only hands out a reference, so the predicate
                // has to be cloned before it can be consumed by `rewrite`
                let predicate = filter.predicate().clone();
                let predicate = predicate.rewrite(&mut expr_rewriter)?;
                Ok(LogicalPlan::Filter(Filter::try_new(
                    predicate,
                    filter.input().clone(),
                )?))
            }
            // `plan` is already an owned value here, so it can be returned
            // directly without cloning the whole plan tree
            other => Ok(other),
        }
    }
}

/// Expression rewriter that replaces `BETWEEN` expressions with equivalent
/// comparison expressions.
struct MyExprRewriter {}

impl ExprRewriter for MyExprRewriter {
    /// Rewrites a single expression.
    ///
    /// * `expr BETWEEN low AND high` becomes `expr >= low AND expr <= high`
    /// * `expr NOT BETWEEN low AND high` becomes `expr < low OR expr > high`
    ///
    /// Every other expression is returned unchanged.
    fn mutate(&mut self, expr: Expr) -> Result<Expr> {
        match expr {
            Expr::Between {
                negated,
                expr,
                low,
                high,
            } => {
                // the boxed operands are owned by this arm, so move them out
                // of their boxes instead of deep-cloning each one
                let expr: Expr = *expr;
                let low: Expr = *low;
                let high: Expr = *high;
                // `expr` appears on both sides of the rewritten predicate,
                // so exactly one clone of it is still required
                if negated {
                    Ok(expr.clone().lt(low).or(expr.gt(high)))
                } else {
                    Ok(expr.clone().gt_eq(low).and(expr.lt_eq(high)))
                }
            }
            // the expression is owned; hand it back without cloning
            other => Ok(other),
        }
    }
}

/// Minimal `ContextProvider` for the example: it knows a single table named
/// `person` and provides no functions, aggregates, or variables.
struct MyContextProvider {}

impl ContextProvider for MyContextProvider {
    /// Returns the table source for `person` (columns `name: Utf8` and
    /// `age: UInt8`, both declared non-nullable); any other table name yields
    /// a planning error.
    fn get_table_provider(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
        if name.table() != "person" {
            return Err(DataFusionError::Plan("table not found".to_string()));
        }

        let person_schema = Arc::new(Schema::new(vec![
            Field::new("name", DataType::Utf8, false),
            Field::new("age", DataType::UInt8, false),
        ]));
        Ok(Arc::new(MyTableSource {
            schema: person_schema,
        }))
    }

    /// No scalar UDFs are registered in this example.
    fn get_function_meta(&self, _name: &str) -> Option<Arc<ScalarUDF>> {
        None
    }

    /// No aggregate UDFs are registered in this example.
    fn get_aggregate_meta(&self, _name: &str) -> Option<Arc<AggregateUDF>> {
        None
    }

    /// No variables are defined in this example.
    fn get_variable_type(&self, _variable_names: &[String]) -> Option<DataType> {
        None
    }
}

struct MyTableSource {
schema: SchemaRef,
}

impl TableSource for MyTableSource {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}
13 changes: 10 additions & 3 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,10 +367,17 @@ impl PartialOrd for Expr {
impl Expr {
/// Returns the name of this expression as it should appear in a schema. This name
/// will not include any CAST expressions.
pub fn name(&self) -> Result<String> {
pub fn display_name(&self) -> Result<String> {
create_name(self)
}

/// Returns the name of this expression as it should appear in a schema. This name
/// will not include any CAST expressions.
#[deprecated(since = "14.0.0", note = "please use `display_name` instead")]
pub fn name(&self) -> Result<String> {
self.display_name()
}

/// Returns a full and complete string representation of this expression.
pub fn canonical_name(&self) -> String {
format!("{}", self)
Expand Down Expand Up @@ -1186,7 +1193,7 @@ mod test {
assert_eq!(expected, expr.canonical_name());
assert_eq!(expected, format!("{}", expr));
assert_eq!(expected, format!("{:?}", expr));
assert_eq!(expected, expr.name()?);
assert_eq!(expected, expr.display_name()?);
Ok(())
}

Expand All @@ -1202,7 +1209,7 @@ mod test {
assert_eq!(expected_canonical, format!("{:?}", expr));
// note that CAST intentionally has a name that is different from its `Display`
// representation. CAST does not change the name of expressions.
assert_eq!("Float32(1.23)", expr.name()?);
assert_eq!("Float32(1.23)", expr.display_name()?);
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ impl ExprSchemable for Expr {
)),
_ => Ok(DFField::new(
None,
&self.name()?,
&self.display_name()?,
self.get_type(input_schema)?,
self.nullable(input_schema)?,
)),
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -841,7 +841,7 @@ pub(crate) fn validate_unique_names<'a>(
) -> Result<()> {
let mut unique_names = HashMap::new();
expressions.into_iter().enumerate().try_for_each(|(position, expr)| {
let name = expr.name()?;
let name = expr.display_name()?;
match unique_names.get(&name) {
None => {
unique_names.insert(name, (position, expr));
Expand Down
4 changes: 2 additions & 2 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ pub fn columnize_expr(e: Expr, input_schema: &DFSchema) -> Expr {
Expr::Alias(Box::new(columnize_expr(*inner_expr, input_schema)), name)
}
Expr::ScalarSubquery(_) => e.clone(),
_ => match e.name() {
_ => match e.display_name() {
Ok(name) => match input_schema.field_with_unqualified_name(&name) {
Ok(field) => Expr::Column(field.qualified_column()),
// expression not provided as input, do not convert to a column reference
Expand Down Expand Up @@ -728,7 +728,7 @@ pub fn expr_as_column_expr(expr: &Expr, plan: &LogicalPlan) -> Result<Expr> {
let field = plan.schema().field_from_column(col)?;
Ok(Expr::Column(field.qualified_column()))
}
_ => Ok(Expr::Column(Column::from_name(expr.name()?))),
_ => Ok(Expr::Column(Column::from_name(expr.display_name()?))),
}
}

Expand Down
Loading

0 comments on commit 4cb8ac0

Please sign in to comment.