Skip to content

Commit

Permalink
Support inlining view / dataframes logical plan (apache#3923)
Browse files Browse the repository at this point in the history
* Inline TableScans for views and dataframes

* Inline TableScans for views and dataframes

* Inline TableScans for views and dataframes

* WIP

* WIP

* WIP

* WIP

* WIP

* WIP

* fmt

* doc

* Fix test

* Simplify

* Fix

* Rename test source

* Use plan instead of projected schema

* Docs

* Use SubqueryAlias

* Revert "Use SubqueryAlias"

This reverts commit 207c2a9.

* WIP

* Fix issue

* Clippy

* Format
  • Loading branch information
Dandandan committed Nov 5, 2022
1 parent 4f49b9c commit 0534b67
Show file tree
Hide file tree
Showing 10 changed files with 228 additions and 6 deletions.
14 changes: 12 additions & 2 deletions benchmarks/expected-plans/q15.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,18 @@ Sort: supplier.s_suppkey ASC NULLS LAST
Inner Join: revenue0.total_revenue = __sq_1.__value
Inner Join: supplier.s_suppkey = revenue0.supplier_no
TableScan: supplier projection=[s_suppkey, s_name, s_address, s_phone]
TableScan: revenue0 projection=[supplier_no, total_revenue]
Projection: supplier_no, total_revenue, alias=revenue0
Projection: lineitem.l_suppkey AS supplier_no, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue
Projection: lineitem.l_suppkey, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)
Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]
Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587")
TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate]
Projection: MAX(revenue0.total_revenue) AS __value, alias=__sq_1
Aggregate: groupBy=[[]], aggr=[[MAX(revenue0.total_revenue)]]
TableScan: revenue0 projection=[total_revenue]
Projection: total_revenue, alias=revenue0
Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue
Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)
Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]
Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587")
TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate]
EmptyRelation
12 changes: 10 additions & 2 deletions datafusion/core/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,10 @@ impl TableProvider for DataFrame {
self
}

/// Returns the logical plan held by this `DataFrame`.
///
/// Always `Some`: the value is a borrow of the `plan` field.
fn get_logical_plan(&self) -> Option<&LogicalPlan> {
Some(&self.plan)
}

fn supports_filter_pushdown(
&self,
_filter: &Expr,
Expand Down Expand Up @@ -1337,8 +1341,12 @@ mod tests {
\n Limit: skip=0, fetch=1\
\n Sort: t1.c1 ASC NULLS FIRST, t1.c2 ASC NULLS FIRST, t1.c3 ASC NULLS FIRST, t2.c1 ASC NULLS FIRST, t2.c2 ASC NULLS FIRST, t2.c3 ASC NULLS FIRST, fetch=1\
\n Inner Join: t1.c1 = t2.c1\
\n TableScan: t1 projection=[c1, c2, c3]\
\n TableScan: t2 projection=[c1, c2, c3]",
\n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3, alias=t1\
\n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3\
\n TableScan: aggregate_test_100 projection=[c1, c2, c3]\
\n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3, alias=t2\
\n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3\
\n TableScan: aggregate_test_100 projection=[c1, c2, c3]",
format!("{:?}", df_renamed.to_logical_plan()?)
);

Expand Down
6 changes: 6 additions & 0 deletions datafusion/core/src/datasource/datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::any::Any;
use std::sync::Arc;

use async_trait::async_trait;
use datafusion_expr::LogicalPlan;
pub use datafusion_expr::{TableProviderFilterPushDown, TableType};

use crate::arrow::datatypes::SchemaRef;
Expand All @@ -47,6 +48,11 @@ pub trait TableProvider: Sync + Send {
None
}

/// Get the logical plan of this table, if available.
///
/// The default implementation returns `None`; implementors that are
/// defined by a plan can override it to return that plan.
fn get_logical_plan(&self) -> Option<&LogicalPlan> {
None
}

/// Create an ExecutionPlan that will scan the table.
/// The table provider will usually be responsible for grouping
/// the source data into partitions that can be efficiently
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/default_table_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ impl TableSource for DefaultTableSource {
) -> datafusion_common::Result<TableProviderFilterPushDown> {
self.table_provider.supports_filter_pushdown(filter)
}

/// Forwards to the wrapped table provider's `get_logical_plan` and returns
/// its result unchanged.
fn get_logical_plan(&self) -> Option<&datafusion_expr::LogicalPlan> {
self.table_provider.get_logical_plan()
}
}

/// Wrap TableProvider in TableSource
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ impl TableProvider for ViewTable {
self
}

/// Returns the logical plan stored in this view.
///
/// Always `Some`: the value is a borrow of the `logical_plan` field.
fn get_logical_plan(&self) -> Option<&LogicalPlan> {
Some(&self.logical_plan)
}

/// Returns the view's schema as a new `Arc` handle to `table_schema`
/// (the schema itself is shared, not copied).
fn schema(&self) -> SchemaRef {
Arc::clone(&self.table_schema)
}
Expand Down
7 changes: 6 additions & 1 deletion datafusion/expr/src/table_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::Expr;
use crate::{Expr, LogicalPlan};
use arrow::datatypes::SchemaRef;
use std::any::Any;

Expand Down Expand Up @@ -76,4 +76,9 @@ pub trait TableSource: Sync + Send {
) -> datafusion_common::Result<TableProviderFilterPushDown> {
Ok(TableProviderFilterPushDown::Unsupported)
}

/// Get the logical plan of this table source, if available.
///
/// The default implementation returns `None`.
fn get_logical_plan(&self) -> Option<&LogicalPlan> {
None
}
}
180 changes: 180 additions & 0 deletions datafusion/optimizer/src/inline_table_scan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Optimizer rule that replaces `TableScan`s over sources that expose their own
//! `LogicalPlan` (such as DataFrames and views) with that plan, inlined,
//! to support further optimization
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::Result;
use datafusion_expr::{
logical_plan::LogicalPlan, utils::from_plan, Expr, LogicalPlanBuilder, TableScan,
};

/// Optimization rule that inlines a `TableScan` whose source provides a
/// [`LogicalPlan`] (for example a DataFrame or a view).
///
/// The rule carries no state, so `InlineTableScan::new()` and
/// `InlineTableScan::default()` are interchangeable.
#[derive(Default)]
pub struct InlineTableScan;

impl InlineTableScan {
    /// Creates a new instance of the rule.
    pub fn new() -> Self {
        Self
    }
}

/// Recursively replaces every inlinable `TableScan` in `plan` with the
/// logical plan exposed by its source.
///
/// A scan is inlined only when it has no projection, no filters and no fetch
/// limit, and its source returns a plan from `get_logical_plan`. The inlined
/// plan is itself processed recursively and then wrapped in a wildcard
/// projection aliased to the scan's table name.
///
/// Every other node is rebuilt from its own expressions and its recursively
/// processed inputs.
///
/// # Errors
///
/// Propagates any error raised while building the aliasing projection or
/// while reconstructing a node from its new inputs.
fn inline_table_scan(plan: &LogicalPlan) -> Result<LogicalPlan> {
    match plan {
        // Match only on scans without filter / projection / fetch.
        // Views and DataFrames won't have those added during the early stage
        // of planning. The projection must be checked explicitly: the
        // wildcard projection built below exposes every column of the
        // sub-plan, so inlining a scan that selects a subset of columns
        // would change the output schema of this node.
        LogicalPlan::TableScan(TableScan {
            source,
            table_name,
            projection: None,
            filters,
            fetch: None,
            ..
        }) if filters.is_empty() => {
            if let Some(sub_plan) = source.get_logical_plan() {
                // The sub-plan may itself scan further inlinable sources.
                let inlined = inline_table_scan(sub_plan)?;
                // Alias the result to the scan's table name.
                LogicalPlanBuilder::from(inlined)
                    .project_with_alias(
                        vec![Expr::Wildcard],
                        Some(table_name.to_string()),
                    )?
                    .build()
            } else {
                // No plan available, return the table scan as is
                Ok(plan.clone())
            }
        }

        // Rest: recurse into all inputs and rebuild the node around them
        _ => {
            let new_inputs = plan
                .inputs()
                .iter()
                .map(|input| inline_table_scan(input))
                .collect::<Result<Vec<_>>>()?;

            from_plan(plan, &plan.expressions(), &new_inputs)
        }
    }
}

impl OptimizerRule for InlineTableScan {
    /// Identifier of this rule.
    fn name(&self) -> &str {
        "inline_table_scan"
    }

    /// Applies the inlining rewrite to `plan`.
    ///
    /// The optimizer configuration is not consulted by this rule.
    fn optimize(
        &self,
        plan: &LogicalPlan,
        _optimizer_config: &mut OptimizerConfig,
    ) -> Result<LogicalPlan> {
        let inlined = inline_table_scan(plan)?;
        Ok(inlined)
    }
}

// Unit tests for the `InlineTableScan` rule, using two in-file table sources:
// one without a logical plan and one that exposes a plan over the other.
#[cfg(test)]
mod tests {
use std::{sync::Arc, vec};

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder, TableSource};

use crate::{inline_table_scan::InlineTableScan, OptimizerConfig, OptimizerRule};

/// Source with a single non-nullable `Int64` column `a`.
/// It does not override `get_logical_plan`.
pub struct RawTableSource {}

impl TableSource for RawTableSource {
fn as_any(&self) -> &dyn std::any::Any {
self
}

fn schema(&self) -> arrow::datatypes::SchemaRef {
Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]))
}

fn supports_filter_pushdown(
&self,
_filter: &datafusion_expr::Expr,
) -> datafusion_common::Result<datafusion_expr::TableProviderFilterPushDown>
{
Ok(datafusion_expr::TableProviderFilterPushDown::Inexact)
}
}

/// Source that exposes a logical plan: a scan of table `y` over a
/// `RawTableSource`. Its schema has the same single column `a`.
pub struct CustomSource {
plan: LogicalPlan,
}
impl CustomSource {
/// Builds the source with its inner plan (scan of `y`, no projection).
fn new() -> Self {
Self {
plan: LogicalPlanBuilder::scan("y", Arc::new(RawTableSource {}), None)
.unwrap()
.build()
.unwrap(),
}
}
}
impl TableSource for CustomSource {
fn as_any(&self) -> &dyn std::any::Any {
self
}

fn supports_filter_pushdown(
&self,
_filter: &datafusion_expr::Expr,
) -> datafusion_common::Result<datafusion_expr::TableProviderFilterPushDown>
{
Ok(datafusion_expr::TableProviderFilterPushDown::Exact)
}

fn schema(&self) -> arrow::datatypes::SchemaRef {
Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]))
}

// Returning the stored plan is what makes scans of this source inlinable.
fn get_logical_plan(&self) -> Option<&LogicalPlan> {
Some(&self.plan)
}
}

/// A filter over a scan of `x` (a `CustomSource`) is expected to become a
/// filter over a projection aliased `x` on top of the scan of `y`, with the
/// plan's schema unchanged.
#[test]
fn inline_table_scan() {
let rule = InlineTableScan::new();

let source = Arc::new(CustomSource::new());

let scan = LogicalPlanBuilder::scan("x".to_string(), source, None).unwrap();

// The filter refers to the column through the outer table name `x`.
let plan = scan.filter(col("x.a").eq(lit(1))).unwrap().build().unwrap();

let optimized_plan = rule
.optimize(&plan, &mut OptimizerConfig::new())
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
let expected = "\
Filter: x.a = Int32(1)\
\n Projection: y.a, alias=x\
\n TableScan: y";

assert_eq!(formatted_plan, expected);
// Inlining must not alter the schema seen by the rest of the plan.
assert_eq!(plan.schema(), optimized_plan.schema());
}
}
1 change: 1 addition & 0 deletions datafusion/optimizer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub mod eliminate_limit;
pub mod expr_simplifier;
pub mod filter_null_join_keys;
pub mod filter_push_down;
pub mod inline_table_scan;
pub mod limit_push_down;
pub mod optimizer;
pub mod projection_push_down;
Expand Down
2 changes: 2 additions & 0 deletions datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::eliminate_filter::EliminateFilter;
use crate::eliminate_limit::EliminateLimit;
use crate::filter_null_join_keys::FilterNullJoinKeys;
use crate::filter_push_down::FilterPushDown;
use crate::inline_table_scan::InlineTableScan;
use crate::limit_push_down::LimitPushDown;
use crate::projection_push_down::ProjectionPushDown;
use crate::reduce_cross_join::ReduceCrossJoin;
Expand Down Expand Up @@ -148,6 +149,7 @@ impl Optimizer {
/// Create a new optimizer using the recommended list of rules
pub fn new(config: &OptimizerConfig) -> Self {
let mut rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
Arc::new(InlineTableScan::new()),
Arc::new(TypeCoercion::new()),
Arc::new(SimplifyExpressions::new()),
Arc::new(UnwrapCastInComparison::new()),
Expand Down
4 changes: 3 additions & 1 deletion datafusion/optimizer/src/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,9 @@ fn optimize_plan(
}

/// Returns `true` when both projections carry the same alias and the same
/// expressions in the same order.
///
/// The alias takes part in the comparison, so two projections over identical
/// expressions but with different aliases are not considered equal.
fn projection_equal(p: &Projection, p2: &Projection) -> bool {
    // Comparing the expression lists directly checks both length and
    // element-wise equality.
    p.alias == p2.alias && p.expr == p2.expr
}

#[cfg(test)]
Expand Down

0 comments on commit 0534b67

Please sign in to comment.