[SPARK-36290][SQL] Pull out join condition #33522
wangyum wants to merge 20 commits into apache:master from wangyum:SPARK-36290
Conversation
# Conflicts:
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q72.sf100/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q72/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q72.sf100/explain.txt
#   sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q72/explain.txt
Kubernetes integration test starting
Kubernetes integration test status success
Test build #141646 has finished for PR 33522 at commit
# Conflicts:
#   sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
    private val y = testRelation1.subquery('y)
    test("Pull out join keys evaluation(String expressions)") {
      val joinType = Inner

can we just inline it? the join type doesn't matter anyway.
ditto for other places in this test suite
      val joinType = Inner
      Seq(Upper("y.d".attr), Substring("y.d".attr, 1, 5)).foreach { udf =>
        val originalQuery = x.join(y, joinType, Option("x.a".attr === udf))
          .select("x.a".attr, "y.e".attr)

we don't need to use qualified column names. There is no name conflict.
ditto for other places in this test suite

Removed it. But the UDF case still needs the qualified name, otherwise:

== FAIL: Plans do not match ===
'Project [a#0, e#0]                                    'Project [a#0, e#0]
!+- 'Join Inner, (a#0 = upper(y.d)#0)                  +- 'Join Inner, (upper(d)#0 = a#0)
 :- Project [a#0, b#0, c#0]                            :- Project [a#0, b#0, c#0]
 :  +- LocalRelation <empty>, [a#0, b#0, c#0]          :  +- LocalRelation <empty>, [a#0, b#0, c#0]
! +- Project [d#0, e#0, upper(d#0) AS upper(y.d)#0]    +- Project [d#0, e#0, upper(d#0) AS upper(d)#0]
    +- LocalRelation <empty>, [d#0, e#0]               +- LocalRelation <empty>, [d#0, e#0]
    test("Pull out join keys evaluation(null expressions)") {
      val joinType = Inner
      val udf = Coalesce(Seq("x.b".attr, "x.c".attr))

Can't we merge this test case with the one above?

Seq(Upper("d".attr), Substring("d".attr, 1, 5), Coalesce(Seq("b".attr, "c".attr)))...

Move this test to SQLQuerySuite to test inferring more IsNotNull:
spark/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala (lines 4231 to 4253 in 4aebe0a)
    }

    test("Pull out EqualNullSafe join condition") {
      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
        case p: BatchEvalPythonExec => p
      }
-     assert(pythonEvals.size == 2)
+     assert(pythonEvals.size == 4)

This looks like a big problem. Can you investigate why we run the Python UDF two more times?

It is because we infer two isnotnull(cast(pythonUDF0 as int)) filters:
== Optimized Logical Plan ==
Project [a#225, b#226, c#236, d#237]
+- Join Inner, (CAST(udf(cast(a as string)) AS INT)#250 = CAST(udf(cast(c as string)) AS INT)#251)
:- Project [_1#220 AS a#225, _2#221 AS b#226, cast(pythonUDF0#253 as int) AS CAST(udf(cast(a as string)) AS INT)#250]
: +- BatchEvalPython [udf(cast(_1#220 as string))], [pythonUDF0#253]
: +- Project [_1#220, _2#221]
: +- Filter isnotnull(cast(pythonUDF0#252 as int))
: +- BatchEvalPython [udf(cast(_1#220 as string))], [pythonUDF0#252]
: +- LocalRelation [_1#220, _2#221]
+- Project [_1#231 AS c#236, _2#232 AS d#237, cast(pythonUDF0#255 as int) AS CAST(udf(cast(c as string)) AS INT)#251]
+- BatchEvalPython [udf(cast(_1#231 as string))], [pythonUDF0#255]
+- Project [_1#231, _2#232]
+- Filter isnotnull(cast(pythonUDF0#254 as int))
+- BatchEvalPython [udf(cast(_1#231 as string))], [pythonUDF0#254]
+- LocalRelation [_1#231, _2#232]
Before this PR:
== Optimized Logical Plan ==
Project [a#225, b#226, c#236, d#237]
+- Join Inner, (cast(pythonUDF0#250 as int) = cast(pythonUDF0#251 as int))
:- BatchEvalPython [udf(cast(a#225 as string))], [pythonUDF0#250]
: +- Project [_1#220 AS a#225, _2#221 AS b#226]
: +- LocalRelation [_1#220, _2#221]
+- BatchEvalPython [udf(cast(c#236 as string))], [pythonUDF0#251]
+- Project [_1#231 AS c#236, _2#232 AS d#237]
+- LocalRelation [_1#231, _2#232]
OK, so it's because filter pushdown can lead to extra expression evaluation?

Yes. If we disable spark.sql.constraintPropagation.enabled, the plan is:
== Optimized Logical Plan ==
Project [a#225, b#226, c#236, d#237]
+- Join Inner, (CAST(udf(cast(a as string)) AS INT)#250 = CAST(udf(cast(c as string)) AS INT)#251)
:- Project [_1#220 AS a#225, _2#221 AS b#226, cast(pythonUDF0#252 as int) AS CAST(udf(cast(a as string)) AS INT)#250]
: +- BatchEvalPython [udf(cast(_1#220 as string))], [pythonUDF0#252]
: +- LocalRelation [_1#220, _2#221]
+- Project [_1#231 AS c#236, _2#232 AS d#237, cast(pythonUDF0#253 as int) AS CAST(udf(cast(c as string)) AS INT)#251]
+- BatchEvalPython [udf(cast(_1#231 as string))], [pythonUDF0#253]
+- LocalRelation [_1#231, _2#232]
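The interaction described above can be illustrated with a toy model in plain Python (not Spark code; `expensive_udf` and the row lists are made up): when the inferred isnotnull filter is pushed below the Project that computes the pulled-out key, the expensive expression runs once in the filter and again in the projection.

```python
# Count how many times the "UDF" is evaluated, standing in for BatchEvalPython.
calls = {"n": 0}

def expensive_udf(v):
    calls["n"] += 1
    return v.upper()

rows = ["a", "b"]

# Filter pushed below the Project: the UDF runs in the filter AND the project.
calls["n"] = 0
kept = [r for r in rows if expensive_udf(r) is not None]   # 2 evaluations
projected = [expensive_udf(r) for r in kept]               # 2 more
pushed_down_evals = calls["n"]                             # 4 in total

# Filter applied on top of the projected value: one evaluation per row.
calls["n"] = 0
projected_once = [expensive_udf(r) for r in rows]          # 2 evaluations
kept_once = [v for v in projected_once if v is not None]
no_pushdown_evals = calls["n"]                             # 2 in total
```

This mirrors the assertion change in the test: two BatchEvalPython nodes per side instead of one, i.e. 4 instead of 2.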
@wangyum @cloud-fan It seems we can just put the rule after InferFiltersFromConstraints, and actually after the earlyScanPushDownRules. The Python UDF test passed in my PR, see #36874.
I think pulling out the join condition has 3 advantages:
- It reduces complex join key evaluations from 3 to 2 for sort-merge join (SMJ).
- It infers additional filters, which can sometimes avoid data skew. For example: [SPARK-31809][SQL] Infer IsNotNull from some special equality join keys #28642
- It avoids other rules having to handle this case themselves. For example: https://github.com/apache/spark/blob/dee7396204e2f6e7346e220867953fc74cd4253d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushPartialAggregationThroughJoin.scala#L325-L327
It has two disadvantages:
- It increases complex join key evaluations from 1 to 2 for broadcast hash join (BHJ).
- It may increase the shuffle data size, for example when the join key is concat(col1, col2, col3, col4 ...).
Personally, I think this rule is valuable. We have been using it for half a year.
"Increase complex join key evaluations from 1 to 2 for BHJ."
We can check whether the pulled-out side can be broadcast, so this should not be a blocker?

"It may increase the shuffle data size."
This is really a trade-off. One conservative option may be to only pull out the complex keys whose underlying attributes are not in the final output, so we avoid the extra shuffle data as far as possible. For example:

SELECT a FROM t1 JOIN t2 ON t1.a = t2.x + 1;

And a config should be introduced to enable or disable it easily.
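The conservative option suggested above can be sketched as a small predicate (illustrative names only, not Spark APIs): pull out a complex join key only when the attributes it references are not part of the final output, so the pulled-out column replaces shuffled data instead of adding to it.

```python
def should_pull_out(key_references, final_output):
    """key_references: attributes used by the complex join key.
    final_output: attributes the query ultimately selects."""
    return not (set(key_references) & set(final_output))

# SELECT a FROM t1 JOIN t2 ON t1.a = t2.x + 1
# The key t2.x + 1 references only x, which is not in the output: pull it out.
ok = should_pull_out({"x"}, {"a"})            # True
# If the query also selected t2.x, pulling out would shuffle both x and x + 1.
not_ok = should_pull_out({"x"}, {"a", "x"})   # False
```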
        p.isInstanceOf[WholeStageCodegenExec] &&
          p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[BroadcastHashJoinExec]).isDefined)
      val broadcastHashJoin = df.queryExecution.executedPlan.find {
        case WholeStageCodegenExec(ProjectExec(_, _: BroadcastHashJoinExec)) => true

Wondering why we now need an extra project after the join here? Can we remove it? The join does not seem to have a complex key here.

There is a cast:
== Optimized Logical Plan ==
Project [id#6L, k#2, v#3]
+- Join Inner, (id#6L = CAST(k AS BIGINT)#14L), rightHint=(strategy=broadcast)
:- Range (0, 10, step=1, splits=Some(2))
+- Project [k#2, v#3, cast(k#2 as bigint) AS CAST(k AS BIGINT)#14L]
+- Filter isnotnull(cast(k#2 as bigint))
+- LogicalRDD [k#2, v#3], false
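The point in the plan above can be stated as a tiny illustrative check (plain Python, keys modeled as strings, not Spark code): any join key that is not a bare attribute reference counts as "complex" and gets pulled out, and cast(k as bigint) is such a key, which is why the extra Project appears even though the key looks simple.

```python
def is_simple_attribute(key):
    # Toy model: a bare column name has no function-call shape.
    return "(" not in key

simple = is_simple_attribute("id")                    # True: no pull-out needed
complex_key = is_simple_attribute("cast(k as bigint)")  # False: pulled out
```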
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
What changes were proposed in this pull request?
Similar to PullOutGroupingExpressions, this PR adds a new rule (PullOutJoinCondition) to pull out the join condition. Otherwise, an expression in the join condition may be evaluated three times (in ShuffleExchangeExec, SortExec, and the join itself). For example:
Before this PR:
After this PR:
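The before/after plans are omitted above. As a toy model of the rewrite (plain Python, not the actual Catalyst rule; the tuple encoding and alias names are made up for illustration): each non-attribute join key is replaced with an alias computed once in a Project below the join, so the exchange, the sort, and the join all compare a plain attribute.

```python
def pull_out_condition(left_key, right_key):
    """Each key is either a plain attribute name (str) or a tuple
    ("expr", description) standing in for a complex expression."""
    projections = {}                  # alias -> pulled-out expression

    def replace(key, tag):
        if isinstance(key, str):      # already a simple attribute: keep it
            return key
        alias = f"_pulledoutjoinkey_{tag}"
        projections[alias] = key      # evaluate once, below the join
        return alias

    new_cond = (replace(left_key, "l"), replace(right_key, "r"))
    return new_cond, projections

# upper(d) = a  becomes  _pulledoutjoinkey_l = a, with upper(d) computed
# in a Project under the join's left side.
cond, projs = pull_out_condition(("expr", "upper(d)"), "a")
```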
Why are the changes needed?
Improve query performance.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Unit test and benchmark test:
Benchmark result: