[SPARK-42660][SQL] Infer filters for Join produced by IN and EXISTS clause (RewritePredicateSubquery rule) #40266
Conversation
cc @cloud-fan
cc: @wangyum @peter-toth
This change makes sense to me and the new plans look OK to me.
I had a change like this before: #22778.
Ah OK, thanks @wangyum! It looks like the very same discussion has come up before: #22778 (comment)
@wangyum @peter-toth Thanks for pointing out the previous attempts. It does seem moving
In this PR #17520, they tried to put RewritePredicateSubquery right after
If this seems right to you guys, I can update this PR to move
Looks like there are a few failures after moving the rule (22e7886). @mskapilks, do you think you can look into those failures?
Yup, I am working on them. I had a wrong SPARK_HOME setup, so I missed the plan changes.
@@ -1158,12 +1158,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
   var joinExec = assertJoin((
     "select * from testData where key not in (select a from testData2)",
     classOf[BroadcastHashJoinExec]))
-  assert(joinExec.asInstanceOf[BroadcastHashJoinExec].isNullAwareAntiJoin)
+  assert(!joinExec.asInstanceOf[BroadcastHashJoinExec].isNullAwareAntiJoin)
These two queries don't need NWAJ now due to more inferred filters.
Can you please elaborate on this a bit more?
Plan for this query before this change:
Join LeftAnti, ((key#13 = a#23) OR isnull((key#13 = a#23)))
:- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false, true) AS value#14]
: +- ExternalRDD [obj#12]
+- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23]
+- ExternalRDD [obj#22]
New plan:
Join LeftAnti, (key#13 = a#23)
:- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false, true) AS value#14]
: +- ExternalRDD [obj#12]
+- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23]
+- ExternalRDD [obj#22]
The isnull((key#13 = a#23)) condition got removed by the NullPropagation rule (as now all optimization rules run after the subquery rewrite). So the join no longer gets converted to a null-aware anti join, since that only happens when the condition has the shape seen in the previous plan: LeftAnti(condition: Or(EqualTo(a, b), IsNull(EqualTo(a, b)))).
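To make the null-aware anti join discussion concrete, here is a small sketch (plain Python, not Spark code; the `not_in` helper is illustrative) of the SQL three-valued logic behind `key NOT IN (subquery)`, and why the `isnull` branch of the anti-join condition disappears when neither column can be NULL:

```python
# Hypothetical model of `key NOT IN values` under SQL three-valued logic.
# Returns True / False / None (None = unknown, so the row is filtered out).
def not_in(key, values):
    if key is None:
        # NULL NOT IN (non-empty set) is unknown; NOT IN empty set is true.
        return None if values else True
    unknown = False
    for v in values:
        if v is None:
            unknown = True          # comparison with NULL is unknown
        elif v == key:
            return False            # definite match: row must be dropped
    return None if unknown else True

# With nullable data the anti-join condition must be null-aware,
# i.e. (key = a) OR isnull(key = a):
assert not_in(1, [2, None]) is None   # unknown, row dropped
assert not_in(1, [2, 3]) is True      # a plain anti join would suffice here
assert not_in(2, [2, 3]) is False

# When neither side can be NULL (e.g. Scala Int columns), isnull(key = a)
# is always false, so a rule like NullPropagation can simplify the
# condition to key = a, and a plain (non null-aware) LeftAnti join is enough.
```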
Hm, then I think we need to fix the test query (and not the expected result), as not in can't be rewritten to a simple (not null-aware) BroadcastHashJoinExec if we don't know key's and a's nullability. I think the problem here is that we use TestData and TestData2, where key and a are Ints and not Integers.
+- * ColumnarToRow (42)
   +- Scan parquet spark_catalog.default.customer_demographics (41)
+- * Filter (46)
   +- * SortMergeJoin ExistenceJoin(exists#1) (45)
Seems pushdown is not happening? Need to check this
Seems PushLeftSemiLeftAntiThroughJoin and PushDownLeftSemiAntiJoin don't consider ExistenceJoin. Might need to update these rules, or do predicate pushdown before the subquery rewrite (though that may not be ideal)?
Filter (46)'s condition is exists#2 OR exists#1, which can't be pushed down. But that's OK, as it is basically the same as the old Filter (30) was.
In the new plan the order of the joins is a bit different, but I'm not sure the new plan would be worse. Actually we have 3 SMJ + 5 BHJ now whereas we had 4 + 4...
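For readers unfamiliar with ExistenceJoin: here is a small sketch (plain Python; the function and names are illustrative, not Spark's API) of why a disjunctive EXISTS predicate like `exists#2 OR exists#1` forces an ExistenceJoin plus a Filter above it, rather than a pushable LeftSemi join:

```python
# Hypothetical model of ExistenceJoin semantics. Unlike LeftSemi (which
# filters rows), ExistenceJoin keeps every left row and adds a boolean
# "exists" flag, so a disjunction over several EXISTS predicates can be
# evaluated later in a single Filter above the joins.
def existence_join(left_rows, right_keys, key_fn):
    right = set(right_keys)
    return [(row, key_fn(row) in right) for row in left_rows]

left = [("a", 1), ("b", 2), ("c", 3)]
out = existence_join(left, [1, 3], key_fn=lambda r: r[1])
# Every left row survives; the flags feed the OR filter above the join.
assert out == [(("a", 1), True), (("b", 2), False), (("c", 3), True)]
```

This is also why the filter itself can't be pushed below the join: the flag only exists after the join has run.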
Can you please run a TPCDS benchmark to make sure we don't introduce a performance regression?
More failures. Seems this might take real effort to make it work, as with other rule modifications.
Why was the latest commit (b1ed7be) needed?
@mskapilks, do you have any update on this? I can take over this PR and investigate the idea further if you don't have time for it.
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
What changes were proposed in this pull request?
We should run InferFiltersFromConstraints again after running the RewritePredicateSubquery rule. RewritePredicateSubquery rewrites IN and EXISTS queries to LEFT SEMI / LEFT ANTI joins, but we don't infer filters for these newly generated joins. We noticed in TPCH 1TB q21 that, by inferring filters for these new joins, one lineitem table scan can be avoided as a ReusedExchange gets introduced. Previously, due to a mismatch in filter predicates, reuse was not happening.

Why are the changes needed?
Can improve query performance.

Does this PR introduce any user-facing change?
No.

How was this patch tested?
PlanStability tests.
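The idea described above can be sketched as follows (plain Python, assumed names, not Spark internals): the subquery rewrite produces an equi-join, the equality lets us infer IsNotNull filters for both sides, and once both sides carry the same pushed-down filters their exchanges become identical and reusable:

```python
# Hypothetical sketch of filter inference on a join produced by subquery
# rewrite: `WHERE key IN (SELECT a FROM t2)` becomes LeftSemi(key = a),
# and from the equality we can infer IsNotNull(key) / IsNotNull(a).
def infer_not_null_filters(equi_join_keys):
    """Infer an IsNotNull filter for every attribute in an equi-join condition."""
    filters = []
    for left_attr, right_attr in equi_join_keys:
        filters.append(("IsNotNull", left_attr))
        filters.append(("IsNotNull", right_attr))
    return filters

# LeftSemi join produced by the rewrite, on condition key = a:
inferred = infer_not_null_filters([("key", "a")])
assert ("IsNotNull", "key") in inferred
assert ("IsNotNull", "a") in inferred

# If two subtrees now scan the same table with the same inferred filters,
# the exchanges match and one scan can be served by a ReusedExchange
# (modeled here as simple structural equality of the scan descriptions).
scan_left = ("lineitem", frozenset(infer_not_null_filters([("l_orderkey", "l_orderkey")])))
scan_right = ("lineitem", frozenset(infer_not_null_filters([("l_orderkey", "l_orderkey")])))
assert scan_left == scan_right
```

Without the second InferFiltersFromConstraints run, only one side would carry the filter, the scan descriptions would differ, and reuse would not trigger.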