Skip to content

Commit 32b63ff

Browse files
authored
Stop copying LogicalPlan and Exprs in ScalarSubqueryToJoin (#10489)
1 parent 87169f0 commit 32b63ff

File tree

1 file changed

+54
-29
lines changed

1 file changed

+54
-29
lines changed

datafusion/optimizer/src/scalar_subquery_to_join.rs

Lines changed: 54 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use datafusion_common::alias::AliasGenerator;
2929
use datafusion_common::tree_node::{
3030
Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter,
3131
};
32-
use datafusion_common::{plan_err, Column, Result, ScalarValue};
32+
use datafusion_common::{internal_err, plan_err, Column, Result, ScalarValue};
3333
use datafusion_expr::expr_rewriter::create_col_from_scalar_expr;
3434
use datafusion_expr::logical_plan::{JoinType, Subquery};
3535
use datafusion_expr::utils::conjunction;
@@ -50,7 +50,7 @@ impl ScalarSubqueryToJoin {
5050
/// # Arguments
5151
/// * `predicate` - A conjunction to split and search
5252
///
53-
/// Returns a tuple (subqueries, rewrite expression)
53+
/// Returns a tuple (subqueries, alias)
5454
fn extract_subquery_exprs(
5555
&self,
5656
predicate: &Expr,
@@ -71,19 +71,36 @@ impl ScalarSubqueryToJoin {
7171
impl OptimizerRule for ScalarSubqueryToJoin {
7272
fn try_optimize(
7373
&self,
74-
plan: &LogicalPlan,
75-
config: &dyn OptimizerConfig,
74+
_plan: &LogicalPlan,
75+
_config: &dyn OptimizerConfig,
7676
) -> Result<Option<LogicalPlan>> {
77+
internal_err!("Should have called ScalarSubqueryToJoin::rewrite")
78+
}
79+
80+
fn supports_rewrite(&self) -> bool {
81+
true
82+
}
83+
84+
fn rewrite(
85+
&self,
86+
plan: LogicalPlan,
87+
config: &dyn OptimizerConfig,
88+
) -> Result<Transformed<LogicalPlan>> {
7789
match plan {
7890
LogicalPlan::Filter(filter) => {
91+
// Optimization: skip the rest of the rule and its copies if
92+
// there are no scalar subqueries
93+
if !contains_scalar_subquery(&filter.predicate) {
94+
return Ok(Transformed::no(LogicalPlan::Filter(filter)));
95+
}
96+
7997
let (subqueries, mut rewrite_expr) = self.extract_subquery_exprs(
8098
&filter.predicate,
8199
config.alias_generator(),
82100
)?;
83101

84102
if subqueries.is_empty() {
85-
// regular filter, no subquery exists clause here
86-
return Ok(None);
103+
return internal_err!("Expected subqueries not found in filter");
87104
}
88105

89106
// iterate through all subqueries in predicate, turning each into a left join
@@ -94,16 +111,13 @@ impl OptimizerRule for ScalarSubqueryToJoin {
94111
{
95112
if !expr_check_map.is_empty() {
96113
rewrite_expr = rewrite_expr
97-
.clone()
98114
.transform_up(|expr| {
99-
if let Expr::Column(col) = &expr {
100-
if let Some(map_expr) =
101-
expr_check_map.get(&col.name)
102-
{
103-
Ok(Transformed::yes(map_expr.clone()))
104-
} else {
105-
Ok(Transformed::no(expr))
106-
}
115+
// replace column references with entry in map, if it exists
116+
if let Some(map_expr) = expr
117+
.try_as_col()
118+
.and_then(|col| expr_check_map.get(&col.name))
119+
{
120+
Ok(Transformed::yes(map_expr.clone()))
107121
} else {
108122
Ok(Transformed::no(expr))
109123
}
@@ -113,15 +127,21 @@ impl OptimizerRule for ScalarSubqueryToJoin {
113127
cur_input = optimized_subquery;
114128
} else {
115129
// if we can't handle all of the subqueries then bail for now
116-
return Ok(None);
130+
return Ok(Transformed::no(LogicalPlan::Filter(filter)));
117131
}
118132
}
119133
let new_plan = LogicalPlanBuilder::from(cur_input)
120134
.filter(rewrite_expr)?
121135
.build()?;
122-
Ok(Some(new_plan))
136+
Ok(Transformed::yes(new_plan))
123137
}
124138
LogicalPlan::Projection(projection) => {
139+
// Optimization: skip the rest of the rule and its copies if
140+
// there are no scalar subqueries
141+
if !projection.expr.iter().any(contains_scalar_subquery) {
142+
return Ok(Transformed::no(LogicalPlan::Projection(projection)));
143+
}
144+
125145
let mut all_subqueryies = vec![];
126146
let mut expr_to_rewrite_expr_map = HashMap::new();
127147
let mut subquery_to_expr_map = HashMap::new();
@@ -135,8 +155,7 @@ impl OptimizerRule for ScalarSubqueryToJoin {
135155
expr_to_rewrite_expr_map.insert(expr, rewrite_exprs);
136156
}
137157
if all_subqueryies.is_empty() {
138-
// regular projection, no subquery exists clause here
139-
return Ok(None);
158+
return internal_err!("Expected subqueries not found in projection");
140159
}
141160
// iterate through all subqueries in predicate, turning each into a left join
142161
let mut cur_input = projection.input.as_ref().clone();
@@ -153,14 +172,13 @@ impl OptimizerRule for ScalarSubqueryToJoin {
153172
let new_expr = rewrite_expr
154173
.clone()
155174
.transform_up(|expr| {
156-
if let Expr::Column(col) = &expr {
157-
if let Some(map_expr) =
175+
// replace column references with entry in map, if it exists
176+
if let Some(map_expr) =
177+
expr.try_as_col().and_then(|col| {
158178
expr_check_map.get(&col.name)
159-
{
160-
Ok(Transformed::yes(map_expr.clone()))
161-
} else {
162-
Ok(Transformed::no(expr))
163-
}
179+
})
180+
{
181+
Ok(Transformed::yes(map_expr.clone()))
164182
} else {
165183
Ok(Transformed::no(expr))
166184
}
@@ -172,7 +190,7 @@ impl OptimizerRule for ScalarSubqueryToJoin {
172190
}
173191
} else {
174192
// if we can't handle all of the subqueries then bail for now
175-
return Ok(None);
193+
return Ok(Transformed::no(LogicalPlan::Projection(projection)));
176194
}
177195
}
178196

@@ -190,10 +208,10 @@ impl OptimizerRule for ScalarSubqueryToJoin {
190208
let new_plan = LogicalPlanBuilder::from(cur_input)
191209
.project(proj_exprs)?
192210
.build()?;
193-
Ok(Some(new_plan))
211+
Ok(Transformed::yes(new_plan))
194212
}
195213

196-
_ => Ok(None),
214+
plan => Ok(Transformed::no(plan)),
197215
}
198216
}
199217

@@ -206,6 +224,13 @@ impl OptimizerRule for ScalarSubqueryToJoin {
206224
}
207225
}
208226

227+
/// Returns true if the expression has a scalar subquery somewhere in it
228+
/// false otherwise
229+
fn contains_scalar_subquery(expr: &Expr) -> bool {
230+
expr.exists(|expr| Ok(matches!(expr, Expr::ScalarSubquery(_))))
231+
.expect("Inner is always Ok")
232+
}
233+
209234
struct ExtractScalarSubQuery {
210235
sub_query_info: Vec<(Subquery, String)>,
211236
alias_gen: Arc<AliasGenerator>,

0 commit comments

Comments
 (0)