[SPARK-28386][SQL] Cannot resolve ORDER BY columns with GROUP BY and HAVING #44352


Closed · wants to merge 6 commits
@@ -2690,6 +2690,15 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
}
s.copy(order = newSortOrder, child = newChild)
})

case s @ Sort(_, _, f @ Filter(cond, agg: Aggregate))
if agg.resolved && cond.resolved && s.order.forall(_.resolved) =>
resolveOperatorWithAggregate(s.order.map(_.child), agg, (newExprs, newChild) => {
val newSortOrder = s.order.zip(newExprs).map {
case (sortOrder, expr) => sortOrder.copy(child = expr)
}
s.copy(order = newSortOrder, child = f.copy(child = newChild))
})
}

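For context, a hedged reproduction sketch of the query shape this new Sort-over-Filter-over-Aggregate case handles. The `hav` data below is inferred from the golden-file outputs added in this PR, and the local-mode session setup is illustrative, not part of the patch:

```scala
import org.apache.spark.sql.SparkSession

object Spark28386Repro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("SPARK-28386").getOrCreate()
    import spark.implicits._

    // Matches the sums in the expected output: sum(one) = 6, sum(two) = 2, sum(three) = 3.
    Seq(("one", 1), ("two", 2), ("three", 3), ("one", 5))
      .toDF("k", "v")
      .createOrReplaceTempView("hav")

    // ORDER BY an aggregate function combined with GROUP BY and HAVING:
    // per the JIRA title, this shape previously failed to resolve.
    spark.sql("SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2 ORDER BY avg(v)").show()

    spark.stop()
  }
}
```
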
@@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LogicalPlan, Project, Sort}
import org.apache.spark.sql.connector.catalog.CatalogManager

/**
@@ -28,10 +28,11 @@ import org.apache.spark.sql.connector.catalog.CatalogManager
* includes metadata columns as well.
* 2. Resolves the column to a literal function which is allowed to be invoked without braces, e.g.
* `SELECT col, current_date FROM t`.
* 3. If the child plan is Aggregate, resolves the column to [[TempResolvedColumn]] with the output
* of Aggregate's child plan. This is to allow Sort to host grouping expressions and aggregate
* functions, which can be pushed down to the Aggregate later. For example,
* `SELECT max(a) FROM t GROUP BY b ORDER BY min(a)`.
Contributor: Shall we retain the original example?

* 3. If the child plan is Aggregate or Filter(_, Aggregate), resolves the column to
* [[TempResolvedColumn]] with the output of Aggregate's child plan.
* This is to allow Sort to host grouping expressions and aggregate functions, which can
* be pushed down to the Aggregate later. For example,
* `SELECT max(a) FROM t GROUP BY b HAVING max(a) > 1 ORDER BY min(a)`.
* 4. Resolves the column to [[AttributeReference]] with the output of a descendant plan node.
* Spark will propagate the missing attributes from the descendant plan node to the Sort node.
* This is to allow users to ORDER BY columns that are not in the SELECT clause, which is
@@ -51,7 +52,10 @@ class ResolveReferencesInSort(val catalogManager: CatalogManager)

def apply(s: Sort): LogicalPlan = {
val resolvedBasic = s.order.map(resolveExpressionByPlanOutput(_, s.child))
val resolvedWithAgg = resolvedBasic.map(resolveColWithAgg(_, s.child))
val resolvedWithAgg = s.child match {
case Filter(_, agg: Aggregate) => resolvedBasic.map(resolveColWithAgg(_, agg))
case _ => resolvedBasic.map(resolveColWithAgg(_, s.child))
}
val (missingAttrResolved, newChild) = resolveExprsAndAddMissingAttrs(resolvedWithAgg, s.child)
val orderByAllResolved = resolveOrderByAll(
s.global, newChild, missingAttrResolved.map(_.asInstanceOf[SortOrder]))
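
To make the new dispatch easier to see, here is a minimal toy sketch with simplified stand-in classes (not Spark's actual plan nodes): when a HAVING filter sits between the Sort and the Aggregate, column resolution now looks through the Filter at the underlying Aggregate.

```scala
// Simplified stand-ins for Spark's logical plan nodes; illustrative only.
sealed trait Plan
case class Aggregate(grouping: Seq[String], aggregates: Seq[String]) extends Plan
case class Filter(condition: String, child: Plan) extends Plan
case class Sort(order: Seq[String], child: Plan) extends Plan

// Mirrors the s.child match in ResolveReferencesInSort above.
def aggregateForSortResolution(sortChild: Plan): Option[Aggregate] = sortChild match {
  case Filter(_, agg: Aggregate) => Some(agg) // look through the HAVING filter
  case agg: Aggregate            => Some(agg) // Sort directly on top of Aggregate
  case _                         => None      // leave other children to other rules
}
```
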
@@ -179,3 +179,32 @@ Filter (c1#x = 1)
+- Aggregate [c1#x], [c1#x]
+- SubqueryAlias t
+- LocalRelation [c1#x, c2#x]


-- !query
SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2 ORDER BY sum(v)
-- !query analysis
Sort [sum(v)#xL ASC NULLS FIRST], true
+- Filter (sum(v)#xL > cast(2 as bigint))
+- Aggregate [k#x], [k#x, sum(v#x) AS sum(v)#xL]
+- SubqueryAlias hav
+- View (`hav`, [k#x,v#x])
+- Project [cast(k#x as string) AS k#x, cast(v#x as int) AS v#x]
+- Project [k#x, v#x]
+- SubqueryAlias hav
+- LocalRelation [k#x, v#x]


-- !query
SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2 ORDER BY avg(v)
-- !query analysis
Project [k#x, sum(v)#xL]
+- Sort [avg(v#x)#x ASC NULLS FIRST], true
+- Filter (sum(v)#xL > cast(2 as bigint))
+- Aggregate [k#x], [k#x, sum(v#x) AS sum(v)#xL, avg(v#x) AS avg(v#x)#x]
+- SubqueryAlias hav
+- View (`hav`, [k#x,v#x])
+- Project [cast(k#x as string) AS k#x, cast(v#x as int) AS v#x]
+- Project [k#x, v#x]
+- SubqueryAlias hav
+- LocalRelation [k#x, v#x]
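
A toy sketch (illustrative names, not Spark internals) of why the second plan above gains a Project node: an ORDER BY aggregate that is missing from the SELECT list gets appended to the Aggregate's output, and a Project on top restores the user-visible schema.

```scala
// Simplified model of an Aggregate's output list; illustrative only.
case class Agg(grouping: Seq[String], output: Seq[String])

def hostOrderByAggregates(agg: Agg, orderExprs: Seq[String]): (Agg, Seq[String]) = {
  val missing = orderExprs.filterNot(agg.output.contains)
  // The extended Aggregate computes the extra expressions; the returned
  // projection list keeps only the original SELECT-list columns.
  (agg.copy(output = agg.output ++ missing), agg.output)
}

// hostOrderByAggregates(Agg(Seq("k"), Seq("k", "sum(v)")), Seq("avg(v)"))
// yields an Aggregate producing [k, sum(v), avg(v)] plus a Project [k, sum(v)],
// matching the analyzed plan for the ORDER BY avg(v) query above.
```
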
@@ -102,12 +102,11 @@ Project [udf(b)#x, udf(c)#x]
SELECT udf(b), udf(c) FROM test_having
GROUP BY b, c HAVING udf(b) = 3 ORDER BY udf(b), udf(c)
-- !query analysis
Project [udf(b)#x, udf(c)#x]
Contributor: Do you know why the plan changed?

pan3793 (Member, Author) on Dec 15, 2023:

I think the previous resolution matched item 4 of the ResolveReferencesInSort comments:

  4. Resolves the column to [[AttributeReference]] with the output of a descendant plan node.
     Spark will propagate the missing attributes from the descendant plan node to the Sort node.
     This is to allow users to ORDER BY columns that are not in the SELECT clause, which is
     widely supported in other SQL dialects. For example, SELECT a FROM t ORDER BY b.

With this patch, it should match item 3:

  3. If the child plan is Aggregate or Filter(_, Aggregate), resolves the column to
     [[TempResolvedColumn]] with the output of Aggregate's child plan.
     This is to allow Sort to host grouping expressions and aggregate functions, which can
     be pushed down to the Aggregate later. For example,
     SELECT max(a) FROM t GROUP BY b HAVING max(a) > 1 ORDER BY min(a).

cloud-fan (Contributor) on Dec 15, 2023:

Oh, so the plan is actually more efficient now?

pan3793 (Member, Author):

I think so; the plan shows it eliminates some unnecessary column propagation across operators.

Contributor:

This is the analyzed plan; shouldn't the optimized plan be the same with this PR?

pan3793 (Member, Author):

Both the analyzed plan and the optimized plan changed :)

before:

== Analyzed Logical Plan ==
udf(b): int, udf(c): double
Project [udf(b)#24, udf(c)#25]
+- Sort [udf(b#21) ASC NULLS FIRST, udf(cast(c#22 as double)) ASC NULLS FIRST], true
   +- Filter (udf(b)#24 = 3)
      +- Aggregate [b#21, c#22], [udf(b#21) AS udf(b)#24, udf(cast(c#22 as double)) AS udf(c)#25, b#21, c#22]
         +- SubqueryAlias spark_catalog.default.test_having
            +- Relation spark_catalog.default.test_having[a#20,b#21,c#22,d#23] parquet

== Optimized Logical Plan ==
Project [udf(b)#24, udf(c)#25]
+- Sort [udf(b#21) ASC NULLS FIRST, udf(cast(c#22 as double)) ASC NULLS FIRST], true
   +- Aggregate [b#21, c#22], [udf(b#21) AS udf(b)#24, udf(cast(c#22 as double)) AS udf(c)#25, b#21, c#22]
      +- Project [b#21, c#22]
         +- Filter (isnotnull(b#21) AND (udf(b#21) = 3))
            +- Relation spark_catalog.default.test_having[a#20,b#21,c#22,d#23] parquet

after:

== Analyzed Logical Plan ==
udf(b): int, udf(c): double
Sort [udf(b)#9 ASC NULLS FIRST, udf(c)#10 ASC NULLS FIRST], true
+- Filter (udf(b)#9 = 3)
   +- Aggregate [b#6, c#7], [udf(b#6) AS udf(b)#9, udf(cast(c#7 as double)) AS udf(c)#10]
      +- SubqueryAlias spark_catalog.default.test_having
         +- Relation spark_catalog.default.test_having[a#5,b#6,c#7,d#8] parquet

== Optimized Logical Plan ==
Sort [udf(b)#9 ASC NULLS FIRST, udf(c)#10 ASC NULLS FIRST], true
+- Aggregate [b#6, c#7], [udf(b#6) AS udf(b)#9, udf(cast(c#7 as double)) AS udf(c)#10]
   +- Project [b#6, c#7]
      +- Filter (isnotnull(b#6) AND (udf(b#6) = 3))
         +- Relation spark_catalog.default.test_having[a#5,b#6,c#7,d#8] parquet
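
For reference, a sketch of how to produce dumps like these yourself (assumes the test_having table and the udf function registered by the test harness are in scope); Dataset.explain with the "extended" mode prints the analyzed and optimized plans together:

```scala
spark.sql(
  """SELECT udf(b), udf(c) FROM test_having
    |GROUP BY b, c HAVING udf(b) = 3 ORDER BY udf(b), udf(c)""".stripMargin
).explain("extended")
```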

+- Sort [cast(udf(cast(b#x as string)) as int) ASC NULLS FIRST, cast(udf(cast(c#x as string)) as string) ASC NULLS FIRST], true
+- Filter (udf(b)#x = 3)
+- Aggregate [b#x, c#x], [cast(udf(cast(b#x as string)) as int) AS udf(b)#x, cast(udf(cast(c#x as string)) as string) AS udf(c)#x, b#x, c#x]
+- SubqueryAlias spark_catalog.default.test_having
+- Relation spark_catalog.default.test_having[a#x,b#x,c#x,d#x] parquet
Sort [udf(b)#x ASC NULLS FIRST, udf(c)#x ASC NULLS FIRST], true
+- Filter (udf(b)#x = 3)
+- Aggregate [b#x, c#x], [cast(udf(cast(b#x as string)) as int) AS udf(b)#x, cast(udf(cast(c#x as string)) as string) AS udf(c)#x]
+- SubqueryAlias spark_catalog.default.test_having
+- Relation spark_catalog.default.test_having[a#x,b#x,c#x,d#x] parquet


6 changes: 6 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/having.sql
@@ -33,3 +33,9 @@ SELECT c1 FROM VALUES (1, 2) as t(c1, c2) GROUP BY GROUPING SETS(t.c1) HAVING t.c1 = 1;
SELECT c1 FROM VALUES (1, 2) as t(c1, c2) GROUP BY CUBE(t.c1) HAVING t.c1 = 1;
SELECT c1 FROM VALUES (1, 2) as t(c1, c2) GROUP BY ROLLUP(t.c1) HAVING t.c1 = 1;
SELECT c1 FROM VALUES (1, 2) as t(c1, c2) GROUP BY t.c1 HAVING t.c1 = 1;

-- SPARK-28386: Resolve ORDER BY agg function with HAVING clause, while the agg function is present in the SELECT list
SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2 ORDER BY sum(v);

-- SPARK-28386: Resolve ORDER BY agg function with HAVING clause, while the agg function is not present in the SELECT list
SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2 ORDER BY avg(v);
18 changes: 18 additions & 0 deletions sql/core/src/test/resources/sql-tests/results/having.sql.out
@@ -134,3 +134,21 @@ SELECT c1 FROM VALUES (1, 2) as t(c1, c2) GROUP BY t.c1 HAVING t.c1 = 1
struct<c1:int>
-- !query output
1


-- !query
SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2 ORDER BY sum(v)
-- !query schema
struct<k:string,sum(v):bigint>
-- !query output
three 3
one 6


-- !query
SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2 ORDER BY avg(v)
-- !query schema
struct<k:string,sum(v):bigint>
-- !query output
one 6
three 3
@@ -248,15 +248,15 @@ Input [2]: [ca_state#18, count#22]
Keys [1]: [ca_state#18]
Functions [1]: [count(1)]
Aggregate Attributes [1]: [count(1)#23]
Results [3]: [ca_state#18 AS state#24, count(1)#23 AS cnt#25, ca_state#18]
Results [2]: [ca_state#18 AS state#24, count(1)#23 AS cnt#25]

(44) Filter [codegen id : 14]
Input [3]: [state#24, cnt#25, ca_state#18]
Input [2]: [state#24, cnt#25]
Condition : (cnt#25 >= 10)

(45) TakeOrderedAndProject
Input [3]: [state#24, cnt#25, ca_state#18]
Arguments: 100, [cnt#25 ASC NULLS FIRST, ca_state#18 ASC NULLS FIRST], [state#24, cnt#25]
Input [2]: [state#24, cnt#25]
Arguments: 100, [cnt#25 ASC NULLS FIRST, state#24 ASC NULLS FIRST], [state#24, cnt#25]

===== Subqueries =====

@@ -1,4 +1,4 @@
TakeOrderedAndProject [cnt,ca_state,state]
TakeOrderedAndProject [cnt,state]
WholeStageCodegen (14)
Filter [cnt]
HashAggregate [ca_state,count] [count(1),state,cnt,count]
@@ -218,15 +218,15 @@ Input [2]: [ca_state#2, count#22]
Keys [1]: [ca_state#2]
Functions [1]: [count(1)]
Aggregate Attributes [1]: [count(1)#23]
Results [3]: [ca_state#2 AS state#24, count(1)#23 AS cnt#25, ca_state#2]
Results [2]: [ca_state#2 AS state#24, count(1)#23 AS cnt#25]

(38) Filter [codegen id : 8]
Input [3]: [state#24, cnt#25, ca_state#2]
Input [2]: [state#24, cnt#25]
Condition : (cnt#25 >= 10)

(39) TakeOrderedAndProject
Input [3]: [state#24, cnt#25, ca_state#2]
Arguments: 100, [cnt#25 ASC NULLS FIRST, ca_state#2 ASC NULLS FIRST], [state#24, cnt#25]
Input [2]: [state#24, cnt#25]
Arguments: 100, [cnt#25 ASC NULLS FIRST, state#24 ASC NULLS FIRST], [state#24, cnt#25]

===== Subqueries =====

@@ -1,4 +1,4 @@
TakeOrderedAndProject [cnt,ca_state,state]
TakeOrderedAndProject [cnt,state]
WholeStageCodegen (8)
Filter [cnt]
HashAggregate [ca_state,count] [count(1),state,cnt,count]