Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stacker and recursive #13310

Merged
merged 8 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ pbjson = { version = "0.7.0" }
prost = "0.13.1"
prost-derive = "0.13.1"
rand = "0.8"
recursive = "0.1.1"
regex = "1.8"
rstest = "0.23.0"
serde_json = "1"
Expand Down
47 changes: 47 additions & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ object_store = { workspace = true, optional = true }
parquet = { workspace = true, optional = true, default-features = true }
paste = "1.0.15"
pyo3 = { version = "0.22.0", optional = true }
recursive = { workspace = true }
sqlparser = { workspace = true }
tokio = { workspace = true }

Expand Down
20 changes: 20 additions & 0 deletions datafusion/common/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! [`TreeNode`] for visiting and rewriting expression and plan trees

use recursive::recursive;
use std::sync::Arc;

use crate::Result;
Expand Down Expand Up @@ -123,6 +124,7 @@ pub trait TreeNode: Sized {
/// TreeNodeVisitor::f_up(ChildNode2)
/// TreeNodeVisitor::f_up(ParentNode)
/// ```
#[recursive]
fn visit<'n, V: TreeNodeVisitor<'n, Node = Self>>(
&'n self,
visitor: &mut V,
Expand Down Expand Up @@ -172,6 +174,7 @@ pub trait TreeNode: Sized {
/// TreeNodeRewriter::f_up(ChildNode2)
/// TreeNodeRewriter::f_up(ParentNode)
/// ```
#[recursive]
fn rewrite<R: TreeNodeRewriter<Node = Self>>(
self,
rewriter: &mut R,
Expand All @@ -194,6 +197,7 @@ pub trait TreeNode: Sized {
&'n self,
mut f: F,
) -> Result<TreeNodeRecursion> {
#[recursive]
fn apply_impl<'n, N: TreeNode, F: FnMut(&'n N) -> Result<TreeNodeRecursion>>(
node: &'n N,
f: &mut F,
Expand Down Expand Up @@ -228,6 +232,7 @@ pub trait TreeNode: Sized {
self,
mut f: F,
) -> Result<Transformed<Self>> {
#[recursive]
fn transform_down_impl<N: TreeNode, F: FnMut(N) -> Result<Transformed<N>>>(
node: N,
f: &mut F,
Expand All @@ -251,6 +256,7 @@ pub trait TreeNode: Sized {
self,
mut f: F,
) -> Result<Transformed<Self>> {
#[recursive]
fn transform_up_impl<N: TreeNode, F: FnMut(N) -> Result<Transformed<N>>>(
node: N,
f: &mut F,
Expand Down Expand Up @@ -365,6 +371,7 @@ pub trait TreeNode: Sized {
mut f_down: FD,
mut f_up: FU,
) -> Result<Transformed<Self>> {
#[recursive]
fn transform_down_up_impl<
N: TreeNode,
FD: FnMut(N) -> Result<Transformed<N>>,
Expand Down Expand Up @@ -2079,4 +2086,17 @@ pub(crate) mod tests {

Ok(())
}

#[test]
fn test_large_tree() {
let mut item = TestTreeNode::new_leaf("initial".to_string());
for i in 0..3000 {
item = TestTreeNode::new(vec![item], format!("parent-{}", i));
}

let mut visitor =
TestVisitor::new(Box::new(visit_continue), Box::new(visit_continue));

item.visit(&mut visitor).unwrap();
}
}
1 change: 1 addition & 0 deletions datafusion/expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ datafusion-functions-window-common = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
indexmap = { workspace = true }
paste = "^1.0"
recursive = { workspace = true }
serde_json = { workspace = true }
sqlparser = { workspace = true }
strum = { version = "0.26.1", features = ["derive"] }
Expand Down
7 changes: 7 additions & 0 deletions datafusion/expr/src/logical_plan/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use crate::{
LogicalPlan, Partitioning, Projection, RecursiveQuery, Repartition, Sort, Subquery,
SubqueryAlias, TableScan, Union, Unnest, UserDefinedLogicalNode, Values, Window,
};
use recursive::recursive;
use std::ops::Deref;
use std::sync::Arc;

Expand Down Expand Up @@ -745,6 +746,7 @@ impl LogicalPlan {

/// Visits a plan similarly to [`Self::visit`], including subqueries that
/// may appear in expressions such as `IN (SELECT ...)`.
#[recursive]
pub fn visit_with_subqueries<V: for<'n> TreeNodeVisitor<'n, Node = Self>>(
&self,
visitor: &mut V,
Expand All @@ -761,6 +763,7 @@ impl LogicalPlan {
/// Similarly to [`Self::rewrite`], rewrites this node and its inputs using `f`,
/// including subqueries that may appear in expressions such as `IN (SELECT
/// ...)`.
#[recursive]
pub fn rewrite_with_subqueries<R: TreeNodeRewriter<Node = Self>>(
self,
rewriter: &mut R,
Expand All @@ -779,6 +782,7 @@ impl LogicalPlan {
&self,
mut f: F,
) -> Result<TreeNodeRecursion> {
#[recursive]
fn apply_with_subqueries_impl<
F: FnMut(&LogicalPlan) -> Result<TreeNodeRecursion>,
>(
Expand Down Expand Up @@ -814,6 +818,7 @@ impl LogicalPlan {
self,
mut f: F,
) -> Result<Transformed<Self>> {
#[recursive]
fn transform_down_with_subqueries_impl<
F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
>(
Expand All @@ -839,6 +844,7 @@ impl LogicalPlan {
self,
mut f: F,
) -> Result<Transformed<Self>> {
#[recursive]
fn transform_up_with_subqueries_impl<
F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
>(
Expand Down Expand Up @@ -866,6 +872,7 @@ impl LogicalPlan {
mut f_down: FD,
mut f_up: FU,
) -> Result<Transformed<Self>> {
#[recursive]
fn transform_down_up_with_subqueries_impl<
FD: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
FU: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
Expand Down
1 change: 1 addition & 0 deletions datafusion/optimizer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ indexmap = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
paste = "1.0.14"
recursive = { workspace = true }
regex = { workspace = true }
regex-syntax = "0.8.0"

Expand Down
2 changes: 2 additions & 0 deletions datafusion/optimizer/src/analyzer/subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use crate::analyzer::check_plan;
use crate::utils::collect_subquery_cols;
use recursive::recursive;

use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{plan_err, Result};
Expand Down Expand Up @@ -128,6 +129,7 @@ fn check_correlations_in_subquery(inner_plan: &LogicalPlan) -> Result<()> {
}

// Recursively check the unsupported outer references in the sub query plan.
#[recursive]
fn check_inner_plan(inner_plan: &LogicalPlan, can_contain_outer_ref: bool) -> Result<()> {
if !can_contain_outer_ref && inner_plan.contains_outer_reference() {
return plan_err!("Accessing outer reference columns is not allowed in the plan");
Expand Down
2 changes: 2 additions & 0 deletions datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::fmt::Debug;
use std::sync::Arc;

use crate::{OptimizerConfig, OptimizerRule};
use recursive::recursive;

use crate::optimizer::ApplyOrder;
use crate::utils::NamePreserver;
Expand Down Expand Up @@ -531,6 +532,7 @@ impl OptimizerRule for CommonSubexprEliminate {
None
}

#[recursive]
fn rewrite(
&self,
plan: LogicalPlan,
Expand Down
5 changes: 3 additions & 2 deletions datafusion/optimizer/src/eliminate_cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
// under the License.

//! [`EliminateCrossJoin`] converts `CROSS JOIN` to `INNER JOIN` if join predicates are available.
use std::sync::Arc;

use crate::{OptimizerConfig, OptimizerRule};
use recursive::recursive;
use std::sync::Arc;

use crate::join_key_set::JoinKeySet;
use datafusion_common::tree_node::{Transformed, TreeNode};
Expand Down Expand Up @@ -80,6 +80,7 @@ impl OptimizerRule for EliminateCrossJoin {
true
}

#[recursive]
fn rewrite(
&self,
plan: LogicalPlan,
Expand Down
7 changes: 4 additions & 3 deletions datafusion/optimizer/src/optimize_projections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@

mod required_indices;

use std::collections::HashSet;
use std::sync::Arc;

use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
use recursive::recursive;
use std::collections::HashSet;
use std::sync::Arc;

use datafusion_common::{
get_required_group_by_exprs_indices, internal_datafusion_err, internal_err, Column,
Expand Down Expand Up @@ -110,6 +110,7 @@ impl OptimizerRule for OptimizeProjections {
/// columns.
/// - `Ok(None)`: Signal that the given logical plan did not require any change.
/// - `Err(error)`: An error occurred during the optimization process.
#[recursive]
fn optimize_projections(
plan: LogicalPlan,
config: &dyn OptimizerConfig,
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-optimizer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ datafusion-expr-common = { workspace = true, default-features = true }
datafusion-physical-expr = { workspace = true }
datafusion-physical-plan = { workspace = true }
itertools = { workspace = true }
recursive = { workspace = true }

[dev-dependencies]
datafusion-functions-aggregate = { workspace = true }
Expand Down
5 changes: 3 additions & 2 deletions datafusion/physical-optimizer/src/aggregate_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
// under the License.

//! Utilizing exact statistics from sources to avoid scanning data
use std::sync::Arc;

use datafusion_common::config::ConfigOptions;
use datafusion_common::scalar::ScalarValue;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
Expand All @@ -27,6 +25,8 @@ use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
use datafusion_physical_plan::projection::ProjectionExec;
use datafusion_physical_plan::udaf::{AggregateFunctionExpr, StatisticsArgs};
use datafusion_physical_plan::{expressions, ExecutionPlan};
use recursive::recursive;
use std::sync::Arc;

use crate::PhysicalOptimizerRule;

Expand All @@ -42,6 +42,7 @@ impl AggregateStatistics {
}

impl PhysicalOptimizerRule for AggregateStatistics {
#[recursive]
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
Expand Down
1 change: 1 addition & 0 deletions datafusion/sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ datafusion-common = { workspace = true, default-features = true }
datafusion-expr = { workspace = true }
indexmap = { workspace = true }
log = { workspace = true }
recursive = { workspace = true }
regex = { workspace = true }
sqlparser = { workspace = true }
strum = { version = "0.26.1", features = ["derive"] }
Expand Down
Loading