Commit c6be207

Ngone51 authored and cloud-fan committed
[SPARK-32616][SQL] Window operators should be added determinedly
### What changes were proposed in this pull request?

Use `LinkedHashMap` instead of `immutable.Map` to hold the `Window` expressions in `ExtractWindowExpressions.addWindow`.

### Why are the changes needed?

This is a bug fix for apache#29270. In that PR, the plans generated on Jenkins (especially for queries q47, q49, and q57) consistently failed to match the golden plans generated on my laptop.

This happens because `ExtractWindowExpressions.addWindow` currently uses an `immutable.Map` to hold the `Window` expressions, keyed by `(spec.partitionSpec, spec.orderSpec, WindowFunctionType.functionType(expr))`, and converts the map to a `Seq` at the end. The `Seq` is then used to add Window operators on top of the child plan. However, for the same query, the order of the Window expressions inside the `Seq` is not deterministic when the expression IDs change (which can affect the key). As a result, the same query can produce different plans because the Window operators are added in a nondeterministic order.

Therefore, we use `LinkedHashMap`, which records the insertion order of its entries, to make the order in which the operators are added deterministic.

### Does this PR introduce _any_ user-facing change?

Maybe yes: users now always see the same plan for the same query with multiple Window operators.

### How was this patch tested?

It is really hard to build a reproducible demo. I tested manually with apache#29270 and it looks good.

Closes apache#29432 from Ngone51/fix-addWindow.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
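The ordering issue described in the message can be illustrated outside Spark. The sketch below is only an approximation under stated assumptions: `SpecKey`, `OrderingSketch`, and `keysForRun` are hypothetical names, and `SpecKey` merely stands in for a grouping key whose hash depends on an expression ID. It compares the iteration order of a `groupBy` result (a hash-based map once it holds more than a handful of keys) with that of a `mutable.LinkedHashMap`.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a window spec key whose hash depends on an expression id.
final case class SpecKey(exprId: Long, column: String)

object OrderingSketch {
  // Groups with groupBy; the resulting order follows the map's internal layout,
  // which depends on the keys' hash codes rather than on insertion order.
  def groupByOrder(keys: Seq[SpecKey]): List[String] =
    keys.groupBy(identity).toList.map(_._1.column)

  // Groups with a LinkedHashMap; iteration follows first-insertion order of the keys.
  def linkedOrder(keys: Seq[SpecKey]): List[String] = {
    val grouped = mutable.LinkedHashMap.empty[SpecKey, mutable.ArrayBuffer[SpecKey]]
    keys.foreach { k =>
      grouped.getOrElseUpdate(k, mutable.ArrayBuffer.empty[SpecKey]) += k
    }
    grouped.iterator.map(_._1.column).toList
  }

  // The same logical query analysed twice; only the (hypothetical) expression ids differ.
  def keysForRun(idOffset: Long): Seq[SpecKey] =
    Seq("a", "b", "c", "d", "e", "f").zipWithIndex.map {
      case (column, i) => SpecKey(idOffset + i, column)
    }
}
```

Calling `OrderingSketch.linkedOrder` on `keysForRun(0L)` and `keysForRun(1000L)` yields the columns in the same first-appearance order both times, whereas `groupByOrder` only guarantees the same set of keys; its order may or may not change between the two runs.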
1 parent 10edeaf commit c6be207

1 file changed: +11 -3 lines changed
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 11 additions & 3 deletions
@@ -2552,6 +2552,8 @@ class Analyzer(
    * [[Window]] operator and inserts it into the plan tree.
    */
   object ExtractWindowExpressions extends Rule[LogicalPlan] {
+    type Spec = (Seq[Expression], Seq[SortOrder], WindowFunctionType)
+
     private def hasWindowFunction(exprs: Seq[Expression]): Boolean =
       exprs.exists(hasWindowFunction)

@@ -2696,8 +2698,11 @@ class Analyzer(
         }.asInstanceOf[NamedExpression]
       }

+      // SPARK-32616: Use a linked hash map to maintains the insertion order of the Window
+      // operators, so the query with multiple Window operators can have the determined plan.
+      val groupedWindowExpressions = mutable.LinkedHashMap.empty[Spec, ArrayBuffer[NamedExpression]]
       // Second, we group extractedWindowExprBuffer based on their Partition and Order Specs.
-      val groupedWindowExpressions = extractedWindowExprBuffer.groupBy { expr =>
+      extractedWindowExprBuffer.foreach { expr =>
         val distinctWindowSpec = expr.collect {
           case window: WindowExpression => window.windowSpec
         }.distinct
@@ -2713,9 +2718,12 @@ class Analyzer(
             s"Please file a bug report with this error message, stack trace, and the query.")
         } else {
           val spec = distinctWindowSpec.head
-          (spec.partitionSpec, spec.orderSpec, WindowFunctionType.functionType(expr))
+          val specKey = (spec.partitionSpec, spec.orderSpec, WindowFunctionType.functionType(expr))
+          val windowExprs = groupedWindowExpressions
+            .getOrElseUpdate(specKey, new ArrayBuffer[NamedExpression])
+          windowExprs += expr
         }
-      }.toSeq
+      }

       // Third, we aggregate them by adding each Window operator for each Window Spec and then
       // setting this to the child of the next Window operator.
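The grouping step changed in these hunks feeds the third step named in the trailing comments, where each group becomes a Window operator stacked on the previous one. The sketch below is a simplified model of that flow, not Spark's code: `Plan`, `Leaf`, `WindowOp`, and `AddWindowSketch` are hypothetical names, and specs and expressions are plain strings rather than Catalyst expressions.

```scala
import scala.collection.mutable

// Hypothetical, simplified plan nodes; these are not Spark's LogicalPlan classes.
sealed trait Plan
final case class Leaf(name: String) extends Plan
final case class WindowOp(spec: String, exprs: List[String], child: Plan) extends Plan

object AddWindowSketch {
  // Each input pair is (window spec, expression name); both are plain strings here.
  def addWindows(windowExprs: Seq[(String, String)], child: Plan): Plan = {
    // Group expressions by spec while remembering the order in which specs first appear.
    val grouped = mutable.LinkedHashMap.empty[String, mutable.ArrayBuffer[String]]
    windowExprs.foreach { case (spec, expr) =>
      grouped.getOrElseUpdate(spec, mutable.ArrayBuffer.empty[String]) += expr
    }
    // Wrap the current plan once per spec, so each new operator's child is the previous one.
    grouped.foldLeft(child) { case (current, (spec, exprs)) =>
      WindowOp(spec, exprs.toList, current)
    }
  }
}
```

Because the fold visits the groups in insertion order, the nesting of the resulting operators depends only on the order of the input expressions, which is the property the commit aims for.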
