@@ -26,17 +26,18 @@ use crate::utils::replace_qualified_name;
2626use crate :: { OptimizerConfig , OptimizerRule } ;
2727
2828use datafusion_common:: alias:: AliasGenerator ;
29- use datafusion_common:: tree_node:: { TransformedResult , TreeNode } ;
30- use datafusion_common:: { plan_err, Result } ;
29+ use datafusion_common:: tree_node:: { Transformed , TransformedResult , TreeNode } ;
30+ use datafusion_common:: { internal_err , plan_err, Result } ;
3131use datafusion_expr:: expr:: { Exists , InSubquery } ;
3232use datafusion_expr:: expr_rewriter:: create_col_from_scalar_expr;
3333use datafusion_expr:: logical_plan:: { JoinType , Subquery } ;
34- use datafusion_expr:: utils:: { conjunction, split_conjunction} ;
34+ use datafusion_expr:: utils:: { conjunction, split_conjunction, split_conjunction_owned } ;
3535use datafusion_expr:: {
3636 exists, in_subquery, not_exists, not_in_subquery, BinaryExpr , Expr , Filter ,
3737 LogicalPlan , LogicalPlanBuilder , Operator ,
3838} ;
3939
40+ use datafusion_expr:: logical_plan:: tree_node:: unwrap_arc;
4041use log:: debug;
4142
4243/// Optimizer rule for rewriting predicate(IN/EXISTS) subquery to left semi/anti joins
@@ -49,6 +50,16 @@ impl DecorrelatePredicateSubquery {
4950 Self :: default ( )
5051 }
5152
/// Applies this rule's `rewrite` to the plan held inside `subquery` and
/// returns the same `Subquery` value with its plan replaced by the result.
///
/// The subquery is taken by value so its plan can be handed to
/// `unwrap_arc` and then to `rewrite` without an explicit `.clone()` here.
/// Any error returned by `rewrite` is propagated with `?`.
// NOTE(review): `unwrap_arc` presumably extracts the plan from the `Arc`
// (copying only if it is shared) - confirm against its definition.
53+ fn rewrite_subquery (
54+ & self ,
55+ mut subquery : Subquery ,
56+ config : & dyn OptimizerConfig ,
57+ ) -> Result < Subquery > {
58+ subquery. subquery =
// Only the `.data` field of the rewrite result is used; whether the inner
// plan was actually transformed is not reported to the caller.
59+ Arc :: new ( self . rewrite ( unwrap_arc ( subquery. subquery ) , config) ?. data ) ;
60+ Ok ( subquery)
61+ }
62+
5263 /// Finds expressions that have the predicate subqueries (and recurses when found)
5364 ///
5465 /// # Arguments
@@ -59,40 +70,32 @@ impl DecorrelatePredicateSubquery {
5970 /// Returns a tuple (subqueries, non-subquery expressions)
6071 fn extract_subquery_exprs (
6172 & self ,
62- predicate : & Expr ,
73+ predicate : Expr ,
6374 config : & dyn OptimizerConfig ,
6475 ) -> Result < ( Vec < SubqueryInfo > , Vec < Expr > ) > {
65- let filters = split_conjunction ( predicate) ; // TODO: add ExistenceJoin to support disjunctions
76+ let filters = split_conjunction_owned ( predicate) ; // TODO: add ExistenceJoin to support disjunctions
6677
6778 let mut subqueries = vec ! [ ] ;
6879 let mut others = vec ! [ ] ;
69- for it in filters. iter ( ) {
80+ for it in filters. into_iter ( ) {
7081 match it {
7182 Expr :: InSubquery ( InSubquery {
7283 expr,
7384 subquery,
7485 negated,
7586 } ) => {
76- let subquery_plan = self
77- . try_optimize ( & subquery. subquery , config) ?
78- . map ( Arc :: new)
79- . unwrap_or_else ( || subquery. subquery . clone ( ) ) ;
80- let new_subquery = subquery. with_plan ( subquery_plan) ;
87+ let new_subquery = self . rewrite_subquery ( subquery, config) ?;
8188 subqueries. push ( SubqueryInfo :: new_with_in_expr (
8289 new_subquery,
83- ( * * expr) . clone ( ) ,
84- * negated,
90+ * expr,
91+ negated,
8592 ) ) ;
8693 }
8794 Expr :: Exists ( Exists { subquery, negated } ) => {
88- let subquery_plan = self
89- . try_optimize ( & subquery. subquery , config) ?
90- . map ( Arc :: new)
91- . unwrap_or_else ( || subquery. subquery . clone ( ) ) ;
92- let new_subquery = subquery. with_plan ( subquery_plan) ;
93- subqueries. push ( SubqueryInfo :: new ( new_subquery, * negated) ) ;
95+ let new_subquery = self . rewrite_subquery ( subquery, config) ?;
96+ subqueries. push ( SubqueryInfo :: new ( new_subquery, negated) ) ;
9497 }
95- _ => others. push ( ( * it ) . clone ( ) ) ,
98+ expr => others. push ( expr ) ,
9699 }
97100 }
98101
@@ -103,62 +106,85 @@ impl DecorrelatePredicateSubquery {
103106impl OptimizerRule for DecorrelatePredicateSubquery {
/// Borrowing entry point of the rule.
///
/// After this change the method ignores both arguments (renamed to `_plan`
/// and `_config`) and always returns an internal error telling the caller to
/// use `DecorrelatePredicateSubquery::rewrite` instead.
// The lines marked `-` in this span are the start of the removed borrowing
// implementation; the rest of its removed body appears further down in the
// diff, interleaved with the added methods.
104107 fn try_optimize (
105108 & self ,
106- plan : & LogicalPlan ,
107- config : & dyn OptimizerConfig ,
109+ _plan : & LogicalPlan ,
110+ _config : & dyn OptimizerConfig ,
108111 ) -> Result < Option < LogicalPlan > > {
109- match plan {
110- LogicalPlan :: Filter ( filter) => {
111- let ( subqueries, mut other_exprs) =
112- self . extract_subquery_exprs ( & filter. predicate , config) ?;
113- if subqueries. is_empty ( ) {
114- // regular filter, no subquery exists clause here
115- return Ok ( None ) ;
116- }
// New body: unconditional error, no work is done on the borrowed plan.
112+ internal_err ! ( "Should have called DecorrelatePredicateSubquery::rewrite" )
113+ }
117114
118- // iterate through all exists clauses in predicate, turning each into a join
119- let mut cur_input = filter. input . as_ref ( ) . clone ( ) ;
120- for subquery in subqueries {
121- if let Some ( plan) =
122- build_join ( & subquery, & cur_input, config. alias_generator ( ) ) ?
123- {
124- cur_input = plan;
125- } else {
126- // If the subquery can not be converted to a Join, reconstruct the subquery expression and add it to the Filter
127- let sub_query_expr = match subquery {
128- SubqueryInfo {
129- query,
130- where_in_expr : Some ( expr) ,
131- negated : false ,
132- } => in_subquery ( expr, query. subquery . clone ( ) ) ,
133- SubqueryInfo {
134- query,
135- where_in_expr : Some ( expr) ,
136- negated : true ,
137- } => not_in_subquery ( expr, query. subquery . clone ( ) ) ,
138- SubqueryInfo {
139- query,
140- where_in_expr : None ,
141- negated : false ,
142- } => exists ( query. subquery . clone ( ) ) ,
143- SubqueryInfo {
144- query,
145- where_in_expr : None ,
146- negated : true ,
147- } => not_exists ( query. subquery . clone ( ) ) ,
148- } ;
149- other_exprs. push ( sub_query_expr) ;
150- }
151- }
/// Always returns `true`.
// NOTE(review): presumably this is how the optimizer learns it should call
// the owned `rewrite` method rather than `try_optimize` - confirm against
// the `OptimizerRule` trait documentation.
115+ fn supports_rewrite ( & self ) -> bool {
116+ true
117+ }
152118
153- let expr = conjunction ( other_exprs) ;
154- if let Some ( expr) = expr {
155- let new_filter = Filter :: try_new ( expr, Arc :: new ( cur_input) ) ?;
156- cur_input = LogicalPlan :: Filter ( new_filter) ;
157- }
158- Ok ( Some ( cur_input) )
/// Owned-plan entry point: rewrites a `LogicalPlan::Filter` whose predicate
/// has `IN` / `EXISTS` subquery conjuncts, trying to turn each one into a
/// join via `build_join`.
///
/// # Returns
/// * `Transformed::no(plan)` when `plan` is not a `Filter`, or when none of
///   the conjuncts produced by `split_conjunction` is an `Expr::InSubquery`
///   or `Expr::Exists`.
/// * `Transformed::yes(new_plan)` otherwise.
///
/// # Errors
/// Propagates errors from `extract_subquery_exprs`, `build_join` and
/// `Filter::try_new`, and returns an internal error if the extraction step
/// yields no subqueries even though the pre-check found one.
// NOTE(review): `Transformed::yes` is returned even if every `build_join`
// call yields `None`; in that case the plan is rebuilt as a `Filter` over
// the same input with reconstructed subquery expressions.
119+ fn rewrite (
120+ & self ,
121+ plan : LogicalPlan ,
122+ config : & dyn OptimizerConfig ,
123+ ) -> Result < Transformed < LogicalPlan > > {
124+ let LogicalPlan :: Filter ( filter) = plan else {
125+ return Ok ( Transformed :: no ( plan) ) ;
126+ } ;
127+
// Pre-check on a borrowed predicate, so that a filter without subquery
// conjuncts is handed back whole, before it is taken apart below.
128+ // if there are no subqueries in the predicate, return the original plan
129+ let has_subqueries = split_conjunction ( & filter. predicate )
130+ . iter ( )
131+ . any ( |expr| matches ! ( expr, Expr :: InSubquery ( _) | Expr :: Exists ( _) ) ) ;
132+ if !has_subqueries {
133+ return Ok ( Transformed :: no ( LogicalPlan :: Filter ( filter) ) ) ;
134+ }
135+
// Take ownership of the predicate and the input; any other `Filter` fields
// are discarded by the `..` pattern.
136+ let Filter {
137+ predicate, input, ..
138+ } = filter;
139+ let ( subqueries, mut other_exprs) =
140+ self . extract_subquery_exprs ( predicate, config) ?;
// Defensive check: the pre-check above saw at least one subquery conjunct,
// so an empty list here is reported as an internal error.
141+ if subqueries. is_empty ( ) {
142+ return internal_err ! (
143+ "can not find expected subqueries in DecorrelatePredicateSubquery"
144+ ) ;
145+ }
146+
// `cur_input` is the running plan: it starts as the filter's input and is
// replaced by each join that `build_join` manages to produce.
147+ // iterate through all exists clauses in predicate, turning each into a join
148+ let mut cur_input = unwrap_arc ( input) ;
149+ for subquery in subqueries {
150+ if let Some ( plan) =
151+ build_join ( & subquery, & cur_input, config. alias_generator ( ) ) ?
152+ {
153+ cur_input = plan;
154+ } else {
155+ // If the subquery can not be converted to a Join, reconstruct the subquery expression and add it to the Filter
// The four arms cover IN / EXISTS, each negated or not; `query.subquery` is
// passed on by value, with no `.clone()` call.
156+ let sub_query_expr = match subquery {
157+ SubqueryInfo {
158+ query,
159+ where_in_expr : Some ( expr) ,
160+ negated : false ,
161+ } => in_subquery ( expr, query. subquery ) ,
162+ SubqueryInfo {
163+ query,
164+ where_in_expr : Some ( expr) ,
165+ negated : true ,
166+ } => not_in_subquery ( expr, query. subquery ) ,
167+ SubqueryInfo {
168+ query,
169+ where_in_expr : None ,
170+ negated : false ,
171+ } => exists ( query. subquery ) ,
172+ SubqueryInfo {
173+ query,
174+ where_in_expr : None ,
175+ negated : true ,
176+ } => not_exists ( query. subquery ) ,
177+ } ;
178+ other_exprs. push ( sub_query_expr) ;
159179 }
// The next line carries a `-` marker: it is a removed line of the old
// `try_optimize` body, not part of this function.
160- _ => Ok ( None ) ,
161180 }
181+
// Re-apply whatever predicates remain (non-subquery conjuncts plus any
// reconstructed subquery expressions). `conjunction` yields an `Option`;
// presumably `None` means `other_exprs` was empty, in which case no `Filter`
// is added on top of `cur_input`.
182+ let expr = conjunction ( other_exprs) ;
183+ if let Some ( expr) = expr {
184+ let new_filter = Filter :: try_new ( expr, Arc :: new ( cur_input) ) ?;
185+ cur_input = LogicalPlan :: Filter ( new_filter) ;
186+ }
187+ Ok ( Transformed :: yes ( cur_input) )
162188 }
163189
164190 fn name ( & self ) -> & str {
0 commit comments