
Commit 4718d59

amaliujia authored and cloud-fan committed
[SPARK-39548][SQL] CreateView command with a window clause query hits a wrong "window definition not found" issue
### What changes were proposed in this pull request?

1. In the inline CTE code path, fix a bug where the top-down style unresolved window expression check mis-classifies a defined window expression as undefined.
2. Move the unresolved window expression check on `Project` into `CheckAnalysis`.

### Why are the changes needed?

This bug causes a correct query to fail.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

UT

Closes #36947 from amaliujia/improvewindow.

Authored-by: Rui Wang <rui.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 9513393 commit 4718d59
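
For context (not part of the commit itself), below is a minimal, self-contained sketch of the query shape this PR fixes: a temporary view whose body is a CTE chain that uses WINDOW clauses. It is adapted from the test added to SQLQuerySuite further down; the view name `demo_view`, the object name, and the local-mode session setup are illustrative assumptions. Before this fix, analyzing such a view definition through the inline CTE path raised a spurious "window definition not found" error even though `w1` and `w2` are defined; with the fix, the view is created and the final SELECT returns a single row.

```scala
// Sketch only: query shape adapted from the SQLQuerySuite test added in this commit.
// The view/object names and the local session are illustrative, not from the patch.
import org.apache.spark.sql.SparkSession

object Spark39548Sketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("SPARK-39548").getOrCreate()

    // A CTE chain with WINDOW clauses, wrapped in a temporary view definition.
    spark.sql(
      """CREATE OR REPLACE TEMPORARY VIEW demo_view AS
        |WITH step_1 AS (
        |  SELECT *, min(a) OVER w2 AS min_a_over_w2
        |  FROM (SELECT 1 AS a, 2 AS b, 3 AS c)
        |  WINDOW w2 AS (PARTITION BY b ORDER BY c)
        |), step_2 AS (
        |  SELECT *, max(e) OVER w1 AS max_e_over_w1
        |  FROM (SELECT 1 AS e, 2 AS f, 3 AS g)
        |  JOIN step_1 ON true
        |  WINDOW w1 AS (PARTITION BY f ORDER BY g)
        |)
        |SELECT * FROM step_2
        |""".stripMargin)

    spark.sql("SELECT * FROM demo_view").show()
    spark.stop()
  }
}
```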

File tree

3 files changed (+33, -10 lines)


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 1 addition & 9 deletions
@@ -451,7 +451,7 @@ class Analyzer(override val catalogManager: CatalogManager)
    * Substitute child plan with WindowSpecDefinitions.
    */
   object WindowsSubstitution extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDownWithPruning(
+    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning(
       _.containsAnyPattern(WITH_WINDOW_DEFINITION, UNRESOLVED_WINDOW_EXPRESSION), ruleId) {
       // Lookup WindowSpecDefinitions. This rule works with unresolved children.
       case WithWindowDefinition(windowDefinitions, child) => child.resolveExpressions {
@@ -460,14 +460,6 @@ class Analyzer(override val catalogManager: CatalogManager)
             throw QueryCompilationErrors.windowSpecificationNotDefinedError(windowName))
           WindowExpression(c, windowSpecDefinition)
       }
-
-      case p @ Project(projectList, _) =>
-        projectList.foreach(_.transformDownWithPruning(
-          _.containsPattern(UNRESOLVED_WINDOW_EXPRESSION), ruleId) {
-          case UnresolvedWindowExpression(_, windowSpec) =>
-            throw QueryCompilationErrors.windowSpecificationNotDefinedError(windowSpec.name)
-        })
-        p
     }
   }
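
One way to read the direction change above (an interpretation, not stated in the patch): after CTE inlining, one `WithWindowDefinition` can end up nested inside another, and `resolveOperatorsUpWithPruning` substitutes the inner definition (for example `w2` from `step_1`) before the outer definition's `resolveExpressions` pass walks the same subtree, so the outer pass never looks up an inner, already-defined name in the wrong definition map. The toy sketch below is not Catalyst code; it only mimics that ordering effect, with `sys.error` standing in for `windowSpecificationNotDefinedError`.

```scala
// Toy sketch, not Catalyst code: why substituting nested window definitions bottom-up
// avoids the spurious "not defined" error. A Node stands in for WithWindowDefinition:
// `defs` are the window names it defines, `refs` are window references at its level
// (Left = unresolved, Right = substituted), `children` are nested definitions.
object BottomUpVsTopDown {
  case class Node(defs: Set[String], refs: List[Either[String, String]], children: List[Node])

  // Mimics the WithWindowDefinition handler: substitute every still-unresolved reference
  // in the whole subtree against *this* node's definitions, failing on unknown names.
  def substituteSubtree(n: Node, defs: Set[String]): Node = {
    val refs = n.refs.map {
      case Left(name) if defs(name) => Right(name)
      case Left(name)               => sys.error(s"Window specification $name is not defined")
      case resolved                 => resolved
    }
    Node(n.defs, refs, n.children.map(substituteSubtree(_, defs)))
  }

  // Old order (top-down): the outer definition walks the subtree first and trips over
  // the inner, not-yet-substituted reference.
  def topDown(n: Node): Node = {
    val substituted = substituteSubtree(n, n.defs)
    Node(substituted.defs, substituted.refs, substituted.children.map(topDown))
  }

  // New order (bottom-up): inner definitions are substituted first, so the outer pass
  // only ever sees references that belong to its own scope.
  def bottomUp(n: Node): Node = {
    val resolvedChildren = n.children.map(bottomUp)
    substituteSubtree(Node(n.defs, n.refs, resolvedChildren), n.defs)
  }

  def main(args: Array[String]): Unit = {
    // Outer scope defines w1, the nested scope defines w2, the shape of the SPARK-39548 view.
    val plan = Node(Set("w1"), List(Left("w1")),
      List(Node(Set("w2"), List(Left("w2")), Nil)))

    println(bottomUp(plan))                 // both w1 and w2 end up substituted
    println(scala.util.Try(topDown(plan)))  // Failure: "Window specification w2 is not defined"
  }
}
```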

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 9 additions & 1 deletion
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.optimizer.{BooleanSimplification, Decorrela
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
+import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_WINDOW_EXPRESSION
 import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, StringUtils, TypeUtils}
 import org.apache.spark.sql.connector.catalog.{LookupCatalog, SupportsPartitionManagement}
 import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -231,7 +232,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
           failAnalysis("grouping_id() can only be used with GroupingSets/Cube/Rollup")

         case e: Expression if e.children.exists(_.isInstanceOf[WindowFunction]) &&
-            !e.isInstanceOf[WindowExpression] =>
+            !e.isInstanceOf[WindowExpression] && e.resolved =>
           val w = e.children.find(_.isInstanceOf[WindowFunction]).get
           failAnalysis(s"Window function $w requires an OVER clause.")

@@ -542,6 +543,13 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
             s"""Only a single table generating function is allowed in a SELECT clause, found:
                | ${exprs.map(_.sql).mkString(",")}""".stripMargin)

+        case p @ Project(projectList, _) =>
+          projectList.foreach(_.transformDownWithPruning(
+            _.containsPattern(UNRESOLVED_WINDOW_EXPRESSION)) {
+            case UnresolvedWindowExpression(_, windowSpec) =>
+              throw QueryCompilationErrors.windowSpecificationNotDefinedError(windowSpec.name)
+          })
+
         case j: Join if !j.duplicateResolved =>
           val conflictingAttributes = j.left.outputSet.intersect(j.right.outputSet)
           failAnalysis(
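
A plausible reason the per-`Project` guard relocates here (again an interpretation): inside `WindowsSubstitution` running bottom-up, the guard could fire on a `Project` before its parent `WithWindowDefinition` had substituted it, whereas in `CheckAnalysis` it runs once on the fully analyzed plan. Either way, a reference to a window name that no WINDOW clause defines should still be rejected at analysis time. A hedged sketch of that expectation (the query, names, and session setup are assumptions, not taken from the patch):

```scala
// Illustrative only: with the check moved to CheckAnalysis, a reference to a window
// name that no WINDOW clause defines should still be rejected during analysis.
import org.apache.spark.sql.{AnalysisException, SparkSession}

object UndefinedWindowStillFails {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("undefined-window").getOrCreate()
    try {
      // `w_missing` is never defined, so analysis is expected to fail here.
      spark.sql("SELECT max(a) OVER w_missing FROM VALUES (1), (2) AS t(a)").collect()
    } catch {
      case e: AnalysisException => println(s"Rejected as expected: ${e.getMessage}")
    } finally {
      spark.stop()
    }
  }
}
```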

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 23 additions & 0 deletions
@@ -4449,6 +4449,29 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
         """.stripMargin),
       Seq(Row(2), Row(1)))
   }
+
+  test("SPARK-39548: CreateView will make queries go into inline CTE code path thus" +
+    "trigger a mis-clarified `window definition not found` issue") {
+    sql(
+      """
+        |create or replace temporary view test_temp_view as
+        |with step_1 as (
+        |select * , min(a) over w2 as min_a_over_w2 from
+        |(select 1 as a, 2 as b, 3 as c) window w2 as (partition by b order by c)) , step_2 as
+        |(
+        |select *, max(e) over w1 as max_a_over_w1
+        |from (select 1 as e, 2 as f, 3 as g)
+        |join step_1 on true
+        |window w1 as (partition by f order by g)
+        |)
+        |select *
+        |from step_2
+        |""".stripMargin)
+
+    checkAnswer(
+      sql("select * from test_temp_view"),
+      Row(1, 2, 3, 1, 2, 3, 1, 1))
+  }
 }

 case class Foo(bar: Option[String])
