Skip to content

Commit 87fc62a

Browse files
committed
[SPARK-25691][SQL] Use semantic equality in order to compare attributes
1 parent 2eaf058 commit 87fc62a

File tree

4 files changed

+23
-12
lines changed

4 files changed

+23
-12
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1179,7 +1179,7 @@ class Analyzer(
11791179
if (!s.resolved || s.missingInput.nonEmpty) && child.resolved =>
11801180
val (newOrder, newChild) = resolveExprsAndAddMissingAttrs(order, child)
11811181
val ordering = newOrder.map(_.asInstanceOf[SortOrder])
1182-
if (child.output == newChild.output) {
1182+
if (child.sameOutput(newChild)) {
11831183
s.copy(order = ordering)
11841184
} else {
11851185
// Add missing attributes and then project them away.
@@ -1189,7 +1189,7 @@ class Analyzer(
11891189

11901190
case f @ Filter(cond, child) if (!f.resolved || f.missingInput.nonEmpty) && child.resolved =>
11911191
val (newCond, newChild) = resolveExprsAndAddMissingAttrs(Seq(cond), child)
1192-
if (child.output == newChild.output) {
1192+
if (child.sameOutput(newChild)) {
11931193
f.copy(condition = newCond.head)
11941194
} else {
11951195
// Add missing attributes and then project them away.
@@ -2087,7 +2087,7 @@ class Analyzer(
20872087
// todo: It's hard to write a general rule to pull out nondeterministic expressions
20882088
// from LogicalPlan, currently we only do it for UnaryNode which has the same output
20892089
// schema as its child.
2090-
case p: UnaryNode if p.output == p.child.output && p.expressions.exists(!_.deterministic) =>
2090+
case p: UnaryNode if p.sameOutput(p.child) && p.expressions.exists(!_.deterministic) =>
20912091
val nondeterToAttr = getNondeterToAttr(p.expressions)
20922092
val newPlan = p.transformExpressions { case e =>
20932093
nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ import org.apache.spark.sql.internal.SQLConf
4949
*/
5050
case class AliasViewChild(conf: SQLConf) extends Rule[LogicalPlan] with CastSupport {
5151
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
52-
case v @ View(desc, output, child) if child.resolved && output != child.output =>
52+
case v @ View(desc, output, child) if child.resolved && !v.sameOutput(child) =>
5353
val resolver = conf.resolver
5454
val queryColumnNames = desc.viewQueryColumnNames
5555
val queryOutput = if (queryColumnNames.nonEmpty) {
@@ -70,7 +70,7 @@ case class AliasViewChild(conf: SQLConf) extends Rule[LogicalPlan] with CastSupp
7070
}
7171
// Map the attributes in the query output to the attributes in the view output by index.
7272
val newOutput = output.zip(queryOutput).map {
73-
case (attr, originAttr) if attr != originAttr =>
73+
case (attr, originAttr) if !attr.semanticEquals(originAttr) =>
7474
// The dataType of the output attributes may not be the same as that of the view
7575
// output, so we should cast the attribute to the dataType of the view output attribute.
7676
// Will throw an AnalysisException if the cast can't be performed or might truncate.
@@ -112,8 +112,8 @@ object EliminateView extends Rule[LogicalPlan] {
112112
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
113113
// The child should have the same output attributes as the View operator, so we simply
114114
// remove the View operator.
115-
case View(_, output, child) =>
116-
assert(output == child.output,
115+
case v @ View(_, output, child) =>
116+
assert(v.sameOutput(child),
117117
s"The output of the child ${child.output.mkString("[", ",", "]")} is different from the " +
118118
s"view output ${output.mkString("[", ",", "]")}")
119119
child

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,7 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
405405
*/
406406
object RemoveRedundantProject extends Rule[LogicalPlan] {
407407
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
408-
case p @ Project(_, child) if p.output == child.output => child
408+
case p @ Project(_, child) if p.sameOutput(child) => child
409409
}
410410
}
411411

@@ -530,9 +530,6 @@ object PushProjectionThroughUnion extends Rule[LogicalPlan] with PredicateHelper
530530
* p2 is usually inserted by this rule and useless, p1 could prune the columns anyway.
531531
*/
532532
object ColumnPruning extends Rule[LogicalPlan] {
533-
private def sameOutput(output1: Seq[Attribute], output2: Seq[Attribute]): Boolean =
534-
output1.size == output2.size &&
535-
output1.zip(output2).forall(pair => pair._1.semanticEquals(pair._2))
536533

537534
def apply(plan: LogicalPlan): LogicalPlan = removeProjectBeforeFilter(plan transform {
538535
// Prunes the unused columns from project list of Project/Aggregate/Expand
@@ -607,7 +604,7 @@ object ColumnPruning extends Rule[LogicalPlan] {
607604
case w: Window if w.windowExpressions.isEmpty => w.child
608605

609606
// Eliminate no-op Projects
610-
case p @ Project(_, child) if sameOutput(child.output, p.output) => child
607+
case p @ Project(_, child) if child.sameOutput(p) => child
611608

612609
// Can't prune the columns on LeafNode
613610
case p @ Project(_, _: LeafNode) => p

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,20 @@ abstract class LogicalPlan
130130
* Returns the output ordering that this plan generates.
131131
*/
132132
def outputOrdering: Seq[SortOrder] = Nil
133+
134+
/**
135+
* Returns true iff `other`'s output is semantically the same, i.e.:
136+
* - it contains the same number of `Attribute`s;
137+
* - references are the same;
138+
* - the order is equal too.
139+
*/
140+
def sameOutput(other: LogicalPlan): Boolean = {
141+
val thisOutput = this.output
142+
val otherOutput = other.output
143+
thisOutput.length == otherOutput.length && thisOutput.zip(otherOutput).forall {
144+
case (a1, a2) => a1.semanticEquals(a2)
145+
}
146+
}
133147
}
134148

135149
/**

0 commit comments

Comments
 (0)