Skip to content

Commit bccefa5

Browse files
rednaxelafxcloud-fan
authored andcommitted
[SPARK-26352][SQL][FOLLOWUP-2.3] Fix missing sameOutput in branch-2.3
## What changes were proposed in this pull request? This is the branch-2.3 equivalent of #23330. After #23303 was merged to branch-2.3/2.4, the builds on those branches were broken due to missing a `LogicalPlan.sameOutput` function which came from #22713 only available on master. This PR is to follow-up with the broken 2.3/2.4 branches and make a copy of the new `LogicalPlan.sameOutput` into `ReorderJoin` to make it locally available. ## How was this patch tested? Fix the build of 2.3/2.4. Closes #23333 from rednaxelafx/branch-2.3. Authored-by: Kris Mok <rednaxelafx@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 1576bd7 commit bccefa5

File tree

2 files changed

+33
-3
lines changed

2 files changed

+33
-3
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,14 +99,29 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
9999
createOrderedJoin(input, conditions)
100100
}
101101

102-
if (p.sameOutput(reordered)) {
102+
if (sameOutput(p, reordered)) {
103103
reordered
104104
} else {
105105
// Reordering the joins have changed the order of the columns.
106106
// Inject a projection to make sure we restore to the expected ordering.
107107
Project(p.output, reordered)
108108
}
109109
}
110+
111+
/**
112+
* Returns true iff output of both plans are semantically the same, ie.:
113+
* - they contain the same number of `Attribute`s;
114+
* - references are the same;
115+
* - the order is equal too.
116+
* NOTE: this is copied over from SPARK-25691 from master.
117+
*/
118+
def sameOutput(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = {
119+
val output1 = plan1.output
120+
val output2 = plan2.output
121+
output1.length == output2.length && output1.zip(output2).forall {
122+
case (a1, a2) => a1.semanticEquals(a2)
123+
}
124+
}
110125
}
111126

112127
/**

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -300,13 +300,28 @@ class JoinReorderSuite extends PlanTest with StatsEstimationTestBase {
300300
val optimized = Optimize.execute(analyzed)
301301
val expected = groundTruthBestPlan.analyze
302302

303-
assert(analyzed.sameOutput(expected)) // if this fails, the expected plan itself is incorrect
304-
assert(analyzed.sameOutput(optimized))
303+
assert(sameOutput(analyzed, expected)) // if this fails, the expected plan itself is incorrect
304+
assert(sameOutput(analyzed, optimized))
305305

306306
compareJoinOrder(optimized, expected)
307307
}
308308

309309
private def outputsOf(plans: LogicalPlan*): Seq[Attribute] = {
310310
plans.map(_.output).reduce(_ ++ _)
311311
}
312+
313+
/**
314+
* Returns true iff output of both plans are semantically the same, ie.:
315+
* - they contain the same number of `Attribute`s;
316+
* - references are the same;
317+
* - the order is equal too.
318+
* NOTE: this is copied over from SPARK-25691 from master.
319+
*/
320+
def sameOutput(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = {
321+
val output1 = plan1.output
322+
val output2 = plan2.output
323+
output1.length == output2.length && output1.zip(output2).forall {
324+
case (a1, a2) => a1.semanticEquals(a2)
325+
}
326+
}
312327
}

0 commit comments

Comments
 (0)