Skip to content

Commit c005a37

Browse files
vladimirg-db and cloud-fan
authored and committed
[SPARK-51385][SQL] Normalize out projection added in DeduplicateRelations for union child output deduplication
### What changes were proposed in this pull request?

Strip away the extra projection added by `DeduplicateRelations` when comparing logical plans.

`DeduplicateRelations` puts one extra `Project` on the right branch of `Union` when the outputs of children are conflicting. This is a hack for streaming relations. Unfortunately this logic is generalized and the extra projection is used for simple cases like views:

```
CREATE VIEW IF NOT EXISTS v1 AS SELECT * FROM VALUES (1, 2);
SELECT * FROM (
  SELECT col1, col2 FROM v1
  UNION ALL
  SELECT col1, col2 FROM v1
);
```

Single-pass Analyzer should not produce this projection, because it assigns expression IDs in a single pass, so we strip it in `NormalizePlan` to correctly compare the plans.

### Why are the changes needed?

This is to make sure that single-pass and fixed-point Analyzed plans are the same.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#50148 from vladimirg-db/vladimir-golubev_data/normalize-artificial-project-in-union.

Authored-by: Vladimir Golubev <vladimir.golubev@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent a6600af commit c005a37

File tree

3 files changed

+18
-74
lines changed

3 files changed

+18
-74
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -22,9 +22,12 @@ import scala.collection.mutable
2222
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSet, Expression, NamedExpression, OuterReference, SubqueryExpression}
2323
import org.apache.spark.sql.catalyst.plans.logical._
2424
import org.apache.spark.sql.catalyst.rules.Rule
25+
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
2526
import org.apache.spark.sql.catalyst.trees.TreePattern._
2627

2728
object DeduplicateRelations extends Rule[LogicalPlan] {
29+
val PROJECT_FOR_EXPRESSION_ID_DEDUPLICATION =
30+
TreeNodeTag[Unit]("project_for_expression_id_deduplication")
2831

2932
type ExprIdMap = mutable.HashMap[Class[_], mutable.HashSet[Long]]
3033

@@ -67,7 +70,9 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
6770
val projectList = child.output.map { attr =>
6871
Alias(attr, attr.name)()
6972
}
70-
Project(projectList, child)
73+
val project = Project(projectList, child)
74+
project.setTagValue(DeduplicateRelations.PROJECT_FOR_EXPRESSION_ID_DEDUPLICATION, ())
75+
project
7176
}
7277
}
7378
u.copy(children = newChildren)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/UnionResolver.scala

Lines changed: 6 additions & 72 deletions
Original file line number | Diff line number | Diff line change
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.analysis.{
2626
TypeCoercionBase
2727
}
2828
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, ExprId}
29-
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Project, Union}
29+
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union}
3030
import org.apache.spark.sql.errors.QueryCompilationErrors
3131
import org.apache.spark.sql.types.{DataType, MetadataBuilder}
3232

@@ -52,8 +52,6 @@ class UnionResolver(resolver: Resolver, expressionResolver: ExpressionResolver)
5252
* for partially resolved subtrees from DataFrame programs.
5353
* - Resolve each child in the context of a) New [[NameScope]] b) New [[ExpressionIdAssigner]]
5454
* mapping. Collect child outputs to coerce them later.
55-
* - Perform projection-based expression ID deduplication if required. This is a hack to stay
56-
* compatible with fixed-point [[Analyzer]].
5755
* - Perform individual output deduplication to handle the distinct union case described in
5856
* [[performIndividualOutputExpressionIdDeduplication]] scaladoc.
5957
* - Validate that child outputs have same length or throw "NUM_COLUMNS_MISMATCH" otherwise.
@@ -68,10 +66,10 @@ class UnionResolver(resolver: Resolver, expressionResolver: ExpressionResolver)
6866
* - Return the resolved [[Union]] with new children.
6967
*/
7068
override def resolve(unresolvedUnion: Union): Union = {
71-
val (oldOutput, oldChildOutputs) = if (unresolvedUnion.resolved) {
72-
(Some(unresolvedUnion.output), Some(unresolvedUnion.children.map(_.output)))
69+
val oldOutput = if (unresolvedUnion.resolved) {
70+
Some(unresolvedUnion.output)
7371
} else {
74-
(None, None)
72+
None
7573
}
7674

7775
val (resolvedChildren, childOutputs) = unresolvedUnion.children.zipWithIndex.map {
@@ -84,16 +82,10 @@ class UnionResolver(resolver: Resolver, expressionResolver: ExpressionResolver)
8482
}
8583
}.unzip
8684

87-
val (projectBasedDeduplicatedChildren, projectBasedDeduplicatedChildOutputs) =
88-
performProjectionBasedExpressionIdDeduplication(
89-
resolvedChildren,
90-
childOutputs,
91-
oldChildOutputs
92-
)
9385
val (deduplicatedChildren, deduplicatedChildOutputs) =
9486
performIndividualOutputExpressionIdDeduplication(
95-
projectBasedDeduplicatedChildren,
96-
projectBasedDeduplicatedChildOutputs
87+
resolvedChildren,
88+
childOutputs
9789
)
9890

9991
val (newChildren, newChildOutputs) = if (needToCoerceChildOutputs(deduplicatedChildOutputs)) {
@@ -117,64 +109,6 @@ class UnionResolver(resolver: Resolver, expressionResolver: ExpressionResolver)
117109
unresolvedUnion.copy(children = newChildren)
118110
}
119111

120-
/**
121-
* Fixed-point [[Analyzer]] uses [[DeduplicateRelations]] rule to handle duplicate expression IDs
122-
* in multi-child operator outputs. For [[Union]]s it uses a "projection-based deduplication",
123-
* i.e. places another [[Project]] operator with new [[Alias]]es on the right child if duplicate
124-
* expression IDs detected. New [[Alias]] "covers" the original attribute with new expression ID.
125-
* This is done for all child operators except [[LeafNode]]s.
126-
*
127-
* We don't need this operation in single-pass [[Resolver]], since we have
128-
* [[ExpressionIdAssigner]] for expression ID deduplication, but perform it nevertheless to stay
129-
* compatible with fixed-point [[Analyzer]]. Since new outputs are already deduplicated by
130-
* [[ExpressionIdAssigner]], we check the _old_ outputs for duplicates and place a [[Project]]
131-
* only if old outputs are available (i.e. we are dealing with a resolved subtree from
132-
* DataFrame program).
133-
*/
134-
private def performProjectionBasedExpressionIdDeduplication(
135-
children: Seq[LogicalPlan],
136-
childOutputs: Seq[Seq[Attribute]],
137-
oldChildOutputs: Option[Seq[Seq[Attribute]]]
138-
): (Seq[LogicalPlan], Seq[Seq[Attribute]]) = {
139-
oldChildOutputs match {
140-
case Some(oldChildOutputs) =>
141-
val oldExpressionIds = new HashSet[ExprId]
142-
143-
children
144-
.zip(childOutputs)
145-
.zip(oldChildOutputs)
146-
.map {
147-
case ((child: LeafNode, output), _) =>
148-
(child, output)
149-
150-
case ((child, output), oldOutput) =>
151-
val oldOutputExpressionIds = new HashSet[ExprId]
152-
153-
val hasConflicting = oldOutput.exists { oldAttribute =>
154-
oldOutputExpressionIds.add(oldAttribute.exprId)
155-
oldExpressionIds.contains(oldAttribute.exprId)
156-
}
157-
158-
if (hasConflicting) {
159-
val newExpressions = output.map { attribute =>
160-
Alias(attribute, attribute.name)()
161-
}
162-
(
163-
Project(projectList = newExpressions, child = child),
164-
newExpressions.map(_.toAttribute)
165-
)
166-
} else {
167-
oldExpressionIds.addAll(oldOutputExpressionIds)
168-
169-
(child, output)
170-
}
171-
}
172-
.unzip
173-
case _ =>
174-
(children, childOutputs)
175-
}
176-
}
177-
178112
/**
179113
* Deduplicate expression IDs at the scope of each individual child output. This is necessary to
180114
* handle the following case:

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.plans
1919

2020
import java.util.HashMap
2121

22-
import org.apache.spark.sql.catalyst.analysis.GetViewColumnByNameAndOrdinal
22+
import org.apache.spark.sql.catalyst.analysis.{DeduplicateRelations, GetViewColumnByNameAndOrdinal}
2323
import org.apache.spark.sql.catalyst.expressions._
2424
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
2525
import org.apache.spark.sql.catalyst.optimizer.ReplaceExpressions
@@ -145,6 +145,11 @@ object NormalizePlan extends PredicateHelper {
145145
.sortBy(_.hashCode())
146146
.reduce(And)
147147
Join(left, right, newJoinType, Some(newCondition), hint)
148+
case project: Project
149+
if project
150+
.getTagValue(DeduplicateRelations.PROJECT_FOR_EXPRESSION_ID_DEDUPLICATION)
151+
.isDefined =>
152+
project.child
148153
case Project(projectList, child) =>
149154
val projList = projectList
150155
.map { e =>

0 commit comments

Comments
 (0)