
[SPARK-42660][SQL] Infer filters for Join produced by IN and EXISTS clause (RewritePredicateSubquery rule) #40266


Closed
wants to merge 6 commits

Conversation

mskapilks
Contributor

What changes were proposed in this pull request?

We should run InferFiltersFromConstraints again after the RewritePredicateSubquery rule. RewritePredicateSubquery rewrites IN and EXISTS queries to LEFT SEMI/LEFT ANTI joins, but we don't infer filters for these newly generated joins. On TPCH 1TB q21 we noticed that inferring filters for these new joins eliminates one lineitem table scan, because a ReusedExchange gets introduced. Previously, due to a mismatch in filter predicates, reuse was not happening.
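
For illustration, a minimal runnable sketch of the kind of query involved (the data, view names, and object name here are made up for this example; only the Spark APIs are real):

import org.apache.spark.sql.SparkSession

object RewriteDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("demo").getOrCreate()
    import spark.implicits._

    Seq((1, "a"), (2, "b")).toDF("key", "value").createOrReplaceTempView("t1")
    Seq(1, 3).toDF("a").createOrReplaceTempView("t2")

    val df = spark.sql("SELECT * FROM t1 WHERE key IN (SELECT a FROM t2)")
    // The optimized plan contains "Join LeftSemi, (key = a)" produced by
    // RewritePredicateSubquery; the idea of this PR is to also run
    // InferFiltersFromConstraints on such joins after the rewrite.
    println(df.queryExecution.optimizedPlan)
    spark.stop()
  }
}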

Why are the changes needed?

Can improve query performance.

Does this PR introduce any user-facing change?

No

How was this patch tested?

PlanStability test

@github-actions github-actions bot added the SQL label Mar 3, 2023
@mskapilks
Contributor Author

TPCH q21 plan change

Before / After plan comparison (screenshots omitted)

@mskapilks mskapilks changed the title [SPARK-42660] Infer filters for Join produced by IN and EXISTS clause (RewritePredicateSubquery rule) [SPARK-42660][SQL] Infer filters for Join produced by IN and EXISTS clause (RewritePredicateSubquery rule) Mar 3, 2023
@mskapilks
Contributor Author

cc @cloud-fan

@mskapilks
Contributor Author

cc: @wangyum @peter-toth

@peter-toth
Contributor

This change makes sense to me and the new plans look OK to me.
However, InferFiltersFromConstraints seemingly has a dedicated place in the optimizer: there are two special batches, Operator Optimization before Inferring Filters and Operator Optimization after Inferring Filters, placed before and after the rule to make sure the inferred filters are optimized. It also seems like the RewriteSubquery batch slowly becomes larger and larger with rules from those batches (see SPARK-39511, SPARK-22662, SPARK-36280), and now you want to add InferFiltersFromConstraints too. So I wonder if RewritePredicateSubquery is at the right place, or what else would make sense to be executed after RewritePredicateSubquery? Maybe rerunning a full operatorOptimizationBatch would make sense, despite the cost?

@wangyum
Member

wangyum commented Mar 10, 2023

I had a change like this before: #22778.

@peter-toth
Contributor

I had a change like this before: #22778.

Ah, OK, thanks @wangyum! It looks like the very same discussion has come up before: #22778 (comment)

@mskapilks
Contributor Author

@wangyum @peter-toth Thanks for pointing out the previous attempts.

It does seem that moving the RewritePredicateSubquery rule is the right way, so that in the future we don't add any more rules to that batch (RewriteSubquery).

In PR #17520 they tried to put RewritePredicateSubquery right after the Subquery batch (of OptimizeSubqueries), so that operatorOptimizationBatch runs after it. They also added a rule to push LeftSemi/LeftAnti through joins, but that was added in 3.0 by SPARK-19712. So now we only need to change the rule's position.

If this seems right to you, I can update this PR to move RewritePredicateSubquery after the Subquery batch.

@peter-toth
Contributor

Looks like there are a few failures after moving the rule (22e7886). @mskapilks, do you think you can look into those failures?

@mskapilks
Contributor Author

Looks like there are a few failures after moving the rule (22e7886). @mskapilks, do you think you can look into those failures?

Yup, I am working on them. I had a wrong SPARK_HOME setup, so I missed the plan changes.

@@ -1158,12 +1158,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
     var joinExec = assertJoin((
       "select * from testData where key not in (select a from testData2)",
       classOf[BroadcastHashJoinExec]))
-    assert(joinExec.asInstanceOf[BroadcastHashJoinExec].isNullAwareAntiJoin)
+    assert(!joinExec.asInstanceOf[BroadcastHashJoinExec].isNullAwareAntiJoin)
Contributor Author

These two queries don't need a null-aware anti join (NAAJ) now due to more inferred filters.

Contributor

Can you please elaborate on this a bit more?

Contributor Author

Plan for this query before this change:

Join LeftAnti, ((key#13 = a#23) OR isnull((key#13 = a#23)))
:- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false, true) AS value#14]
:  +- ExternalRDD [obj#12]
+- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23]
   +- ExternalRDD [obj#22]

New plan:

Join LeftAnti, (key#13 = a#23)
:- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false, true) AS value#14]
:  +- ExternalRDD [obj#12]
+- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23]
   +- ExternalRDD [obj#22]

The isnull((key#13 = a#23)) condition got removed by the NullPropagation rule (as now all optimization rules run after the subquery rewrite).

So now the join does not get converted to a null-aware anti join, as that only happens when a condition of the shape LeftAnti(condition: Or(EqualTo(a=b), IsNull(EqualTo(a=b)))) exists (code).
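
For reference, a simplified, compilable paraphrase of that condition shape (a sketch assuming spark-catalyst on the classpath; the helper name is made up, and the real matcher in Spark is more involved):

import org.apache.spark.sql.catalyst.expressions.{EqualTo, Expression, IsNull, Or}

object NaajShape {
  // Simplified: true only for conditions of the shape
  // Or(EqualTo(l, r), IsNull(EqualTo(l, r))), the shape that single-column
  // null-aware anti join planning looks for. Once NullPropagation drops the
  // IsNull branch (legal here because key and a are non-nullable), the
  // condition no longer matches this shape.
  def looksLikeNaajCondition(cond: Expression): Boolean = cond match {
    case Or(e1 @ EqualTo(_, _), IsNull(e2 @ EqualTo(_, _))) => e1.semanticEquals(e2)
    case _ => false
  }
}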

Contributor

Hm, then I think we need to fix the test query (and not the expected result), as NOT IN can't be rewritten to a simple (not null-aware) BroadcastHashJoinExec if we don't know key's and a's nullability. I think the problem here is that we use TestData and TestData2, where key and a are Ints and not Integers.
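
To make the nullability point concrete, a small hypothetical demo (PrimKey and BoxedKey are made-up names, not Spark's TestData classes):

import org.apache.spark.sql.SparkSession

// A primitive Int field yields nullable = false in the derived schema,
// while java.lang.Integer yields nullable = true. Only the nullable case
// forces the null-aware anti join shape for NOT IN.
case class PrimKey(key: Int)
case class BoxedKey(key: java.lang.Integer)

object NullabilityDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    import spark.implicits._
    println(Seq(PrimKey(1)).toDS().schema)  // key: IntegerType, nullable = false
    println(Seq(BoxedKey(1)).toDS().schema) // key: IntegerType, nullable = true
    spark.stop()
  }
}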

+- * ColumnarToRow (42)
+- Scan parquet spark_catalog.default.customer_demographics (41)
+- * Filter (46)
+- * SortMergeJoin ExistenceJoin(exists#1) (45)
Contributor Author

Seems pushdown is not happening? Need to check this.

Contributor Author

Seems PushLeftSemiLeftAntiThroughJoin / PushDownLeftSemiAntiJoin don't consider ExistenceJoin. Might need to update these rules, or do predicate pushdown before the subquery rewrite (which may not be ideal)?
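
To illustrate the suspicion, a simplified guess at the shape of such a guard (the helper is hypothetical and this is not the actual rule code; only the JoinType classes are real):

import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, JoinType, LeftAnti, LeftSemi}

object PushdownGuard {
  // A match on LeftSemi | LeftAnti never fires for ExistenceJoin, which is
  // its own JoinType subclass, so joins produced as ExistenceJoin would be
  // skipped by pushdown rules guarded this way.
  def handledBySemiAntiPushdown(jt: JoinType): Boolean = jt match {
    case LeftSemi | LeftAnti => true
    case _: ExistenceJoin    => false // would need explicit support
    case _                   => false
  }
}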

Contributor

Filter (46)'s condition is exists#2 OR exists#1, which can't be pushed down. But that's OK, as it is basically the same as the old Filter (30).
In the new plan the order of the joins is a bit different, but I'm not sure the new plan would be worse. Actually, we have 3 SMJ + 5 BHJ now, whereas we had 4 + 4...

Contributor

Can you please run a TPCDS benchmark to make sure we don't introduce a performance regression?

@mskapilks
Contributor Author

More failures. Seems this might take real effort to make it work, like other rule modifications.

@peter-toth
Contributor

More failures. Seems this might take real effort to make it work, like other rule modifications.

Why was the latest commit (b1ed7be) needed?

@peter-toth
Contributor

peter-toth commented Apr 21, 2023

@mskapilks, do you have any update on this? I can take over this PR and investigate the idea further if you don't have time for it.

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Jul 31, 2023
@github-actions github-actions bot closed this Aug 1, 2023