@@ -25,6 +25,7 @@ use datafusion_common::{
2525 Result ,
2626} ;
2727use datafusion_expr:: { Expr , LogicalPlan , Projection , Sort } ;
28+ use sqlparser:: ast:: Ident ;
2829
2930/// Normalize the schema of a union plan to remove qualifiers from the schema fields and sort expressions.
3031///
@@ -137,14 +138,25 @@ pub(super) fn rewrite_plan_for_sort_on_non_projected_fields(
137138 let inner_exprs = inner_p
138139 . expr
139140 . iter ( )
140- . map ( |f| {
141- if let Expr :: Alias ( alias) = f {
141+ . enumerate ( )
142+ . map ( |( i, f) | match f {
143+ Expr :: Alias ( alias) => {
142144 let a = Expr :: Column ( alias. name . clone ( ) . into ( ) ) ;
143145 map. insert ( a. clone ( ) , f. clone ( ) ) ;
144146 a
145- } else {
147+ }
148+ Expr :: Column ( _) => {
149+ map. insert (
150+ Expr :: Column ( inner_p. schema . field ( i) . name ( ) . into ( ) ) ,
151+ f. clone ( ) ,
152+ ) ;
146153 f. clone ( )
147154 }
155+ _ => {
156+ let a = Expr :: Column ( inner_p. schema . field ( i) . name ( ) . into ( ) ) ;
157+ map. insert ( a. clone ( ) , f. clone ( ) ) ;
158+ a
159+ }
148160 } )
149161 . collect :: < Vec < _ > > ( ) ;
150162
@@ -155,9 +167,17 @@ pub(super) fn rewrite_plan_for_sort_on_non_projected_fields(
155167 }
156168 }
157169
158- if collects. iter ( ) . collect :: < HashSet < _ > > ( )
159- == inner_exprs. iter ( ) . collect :: < HashSet < _ > > ( )
160- {
170+ // Compare outer collects Expr::to_string with inner collected transformed values
171+ // alias -> alias column
172+ // column -> remain
173+ // others, extract schema field name
174+ let outer_collects = collects. iter ( ) . map ( Expr :: to_string) . collect :: < HashSet < _ > > ( ) ;
175+ let inner_collects = inner_exprs
176+ . iter ( )
177+ . map ( Expr :: to_string)
178+ . collect :: < HashSet < _ > > ( ) ;
179+
180+ if outer_collects == inner_collects {
161181 let mut sort = sort. clone ( ) ;
162182 let mut inner_p = inner_p. clone ( ) ;
163183
@@ -175,3 +195,80 @@ pub(super) fn rewrite_plan_for_sort_on_non_projected_fields(
175195 None
176196 }
177197}
198+
199+ // This logic is to work out the columns and inner query for SubqueryAlias plan for both types of
200+ // subquery
201+ // - `(SELECT column_a as a from table) AS A`
202+ // - `(SELECT column_a from table) AS A (a)`
203+ //
204+ // A roundtrip example for table alias with columns
205+ //
206+ // query: SELECT id FROM (SELECT j1_id from j1) AS c (id)
207+ //
208+ // LogicPlan:
209+ // Projection: c.id
210+ // SubqueryAlias: c
211+ // Projection: j1.j1_id AS id
212+ // Projection: j1.j1_id
213+ // TableScan: j1
214+ //
215+ // Before introducing this logic, the unparsed query would be `SELECT c.id FROM (SELECT j1.j1_id AS
216+ // id FROM (SELECT j1.j1_id FROM j1)) AS c`.
217+ // The query is invalid as `j1.j1_id` is not a valid identifier in the derived table
218+ // `(SELECT j1.j1_id FROM j1)`
219+ //
220+ // With this logic, the unparsed query will be:
221+ // `SELECT c.id FROM (SELECT j1.j1_id FROM j1) AS c (id)`
222+ //
223+ // Caveat: this won't handle the case like `select * from (select 1, 2) AS a (b, c)`
224+ // as the parser gives a wrong plan which has mismatch `Int(1)` types: Literal and
225+ // Column in the Projections. Once the parser side is fixed, this logic should work
226+ pub ( super ) fn subquery_alias_inner_query_and_columns (
227+ subquery_alias : & datafusion_expr:: SubqueryAlias ,
228+ ) -> ( & LogicalPlan , Vec < Ident > ) {
229+ let plan: & LogicalPlan = subquery_alias. input . as_ref ( ) ;
230+
231+ let LogicalPlan :: Projection ( outer_projections) = plan else {
232+ return ( plan, vec ! [ ] ) ;
233+ } ;
234+
235+ // check if it's projection inside projection
236+ let Some ( inner_projection) = find_projection ( outer_projections. input . as_ref ( ) ) else {
237+ return ( plan, vec ! [ ] ) ;
238+ } ;
239+
240+ let mut columns: Vec < Ident > = vec ! [ ] ;
241+ // check if the inner projection and outer projection have a matching pattern like
242+ // Projection: j1.j1_id AS id
243+ // Projection: j1.j1_id
244+ for ( i, inner_expr) in inner_projection. expr . iter ( ) . enumerate ( ) {
245+ let Expr :: Alias ( ref outer_alias) = & outer_projections. expr [ i] else {
246+ return ( plan, vec ! [ ] ) ;
247+ } ;
248+
249+ // inner projection schema fields store the projection name which is used in outer
250+ // projection expr
251+ let inner_expr_string = match inner_expr {
252+ Expr :: Column ( _) => inner_expr. to_string ( ) ,
253+ _ => inner_projection. schema . field ( i) . name ( ) . clone ( ) ,
254+ } ;
255+
256+ if outer_alias. expr . to_string ( ) != inner_expr_string {
257+ return ( plan, vec ! [ ] ) ;
258+ } ;
259+
260+ columns. push ( outer_alias. name . as_str ( ) . into ( ) ) ;
261+ }
262+
263+ ( outer_projections. input . as_ref ( ) , columns)
264+ }
265+
266+ fn find_projection ( logical_plan : & LogicalPlan ) -> Option < & Projection > {
267+ match logical_plan {
268+ LogicalPlan :: Projection ( p) => Some ( p) ,
269+ LogicalPlan :: Limit ( p) => find_projection ( p. input . as_ref ( ) ) ,
270+ LogicalPlan :: Distinct ( p) => find_projection ( p. input ( ) . as_ref ( ) ) ,
271+ LogicalPlan :: Sort ( p) => find_projection ( p. input . as_ref ( ) ) ,
272+ _ => None ,
273+ }
274+ }
0 commit comments