[SPARK-28148][SQL] Repartition after join is not optimized away #27096
Conversation
Jenkins, ok to test.
Test build #116187 has finished for PR 27096 at commit
Thank you for making a PR, @bmarcott.
Also ping @viirya.
Currently EnsureRequirements ensures that children's distribution/ordering satisfies the parent's requirement. That is, it checks whether a child's distribution/ordering is good enough for its parent op; if not, it adds a shuffle/sort on top of the child.
The cases I've seen fail to optimize so far all involve incorrect or mismatched distribution/ordering output/requirements on the child/parent sides.
For this part in EnsureRequirements:
def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
  // TODO: remove this after we create a physical operator for `RepartitionByExpression`.
  case operator @ ShuffleExchangeExec(upper: HashPartitioning, child, _) =>
    child.outputPartitioning match {
      case lower: HashPartitioning if upper.semanticEquals(lower) => child
      case _ => operator
    }
}
Reading the TODO, it looks like there is supposed to be a physical RepartitionByExpression that can skip the shuffle if its child already satisfies the required distribution?
This PR targets a different scenario, as the repartition/sort is added explicitly by the user. We have optimizations in the Optimizer that deal with the logical plan, for example removing a Sort on top of an already-sorted child plan.
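For context, a minimal sketch (not from this PR) of that kind of logical-plan optimization, where the Optimizer drops a Sort whose child is already sorted the same way; exact rule names and behavior may vary by Spark version:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq((2, "b"), (1, "a")).toDF("id", "name")

// The outer orderBy duplicates the inner one; the optimized logical plan is
// expected to contain a single Sort node rather than two.
df.orderBy("id").orderBy("id").explain(true)
```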
So maybe EnsureRequirements is not a good place to do that optimization? Maybe we need a separate rule for that? cc @cloud-fan
(The @viirya suggestion sounds reasonable to me.)
An issue is that at the logical phase we don't know the physical partitioning/sorting info (e.g. SMJ), so we can't optimize the user-specified sort/repartition well at the logical phase. I see 2 options here:
Thanks for taking a look! Yeah, it felt a little hacky for optimization code to be in a rule called EnsureRequirements, although then again, the new shuffles/sorts were introduced by it. I'd like someone familiar with the overall planner design to suggest whether I should go with the 1st or 2nd option.
Any more suggestions?
Can we try option 1? Seems like we need to do some experiments here. I'm not sure which option is better without seeing the concrete code.
+1, how about adding a rule named like
I'll update soon with the new rule.
Force-pushed from ff573e8 to 1916ef9.
I started to try including #26946, but it feels a little messy. Notice that the numPartitions of the previous shuffle is maintained, but the distribution/partitioning is changed: if we try to remove this code from EnsureRequirements, it'll create a new shuffle node with
Test build #116740 has finished for PR 27096 at commit
In general, shuffles are added by Spark, and Spark can pick the best # of partitions. However, for user-specified shuffles (df.repartition), the semantics should be to respect the user request and keep the # of partitions.
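A small sketch of that distinction (names and values are illustrative, not from the PR): the join's own exchanges use spark.sql.shuffle.partitions, while an explicit repartition carries the user's partition count, which should be preserved:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df1 = Seq((1, "a"), (2, "b")).toDF("id", "v1")
val df2 = Seq((1, "x"), (2, "y")).toDF("id", "v2")

// Spark picks the partition count for the join's own exchanges, but the explicit
// repartition(100, ...) is a user request, so 100 partitions should be kept.
val joined = df1.join(df2, Seq("id")).repartition(100, $"id")
assert(joined.rdd.getNumPartitions == 100)
```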
Test build #116772 has finished for PR 27096 at commit
Force-pushed from 2517bea to 9e895bd.
Updated with a new SparkPlan rule and added a test which makes sure a user's repartition with a different numPartitions is not eliminated (we don't want to change the expected numPartitions). Review when you get a chance :)
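A hypothetical sketch of what such a test could look like (not the actual test added in this PR; names are made up, and it assumes a typical Spark SQL suite mixing in SharedSparkSession and SQLTestUtils for withSQLConf):

```scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.internal.SQLConf

test("user repartition with a different numPartitions is kept (sketch)") {
  withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "5") {
    val df1 = spark.range(10).toDF("key")
    val df2 = spark.range(10).toDF("key")

    // The join shuffles into 5 partitions; the explicit repartition asks for 7,
    // so it must survive planning even though the partitioning column matches.
    val joined = df1.join(df2, "key").repartition(7, col("key"))
    assert(joined.rdd.getNumPartitions === 7)
  }
}
```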
Test build #116821 has finished for PR 27096 at commit
Test build #116820 has finished for PR 27096 at commit
Test build #116815 has finished for PR 27096 at commit
retest this please
Do the tests run post-merge with master, or do I need to rebase to pick up fix #27242?
retest this please
Test build #117038 has finished for PR 27096 at commit
Force-pushed from 9e895bd to 3ecdccb.
Rebased on latest master.
Test build #117198 has finished for PR 27096 at commit
Any remaining comments/concerns?
Test build #117211 has finished for PR 27096 at commit
@cloud-fan and/or others, please review when convenient.
Mind taking another look?
Retest this please.
Test build #117679 has finished for PR 27096 at commit
@cloud-fan @viirya
plan.transformUp {
  case operator @ ShuffleExchangeExec(upper: HashPartitioning, child, _) =>
    child.outputPartitioning match {
      case lower: HashPartitioning if upper.semanticEquals(lower) => child
Why doesn't this apply to RangePartitioning? Is it just that we assume repartition() would only do HashPartitioning?
And what happens if someone does df1.join(df2, Seq("id"), "left").repartition(100, df("some_other_column")).repartition(20, df1("id")) or df1.join(df2, Seq("id"), "left").sortWithinPartitions(df1("some_other_column")).sortWithinPartitions(df1("id"))? We should be able to optimize that out too, right? It would be nice to make this rule more general and cover a wider range of cases.
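For context, a quick illustration (not from the PR, assuming a DataFrame df1 with an "id" column) of which partitionings the user-facing APIs produce; a fully general rule would need to reason about both:

```scala
// Illustrative only: repartition(n, cols...) produces HashPartitioning, while
// repartitionByRange(n, cols...) produces RangePartitioning.
val hashed = df1.repartition(10, df1("id"))          // HashPartitioning(id, 10)
val ranged = df1.repartitionByRange(10, df1("id"))   // RangePartitioning(id ASC, 10)
```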
Thanks for providing feedback. Let me look into your specific examples and think a little more about it.
@maryannxue
This PR focuses on removing unnecessary sorting and shuffling after a join, which potentially includes its own ShuffleExchangeExec with HashPartitioning. Both cases you mentioned are already optimized properly: the shuffling on "some_other_column" is removed and all sortWithinPartitions are removed (due to previous optimizations in the logical plan, plus the optimizations introduced here).
I wouldn't mind generalizing to all Partitioning types of the ShuffleExchangeExec, but I am not sure how to compare two partitioning types for equality. You can see the special case for HashPartitioning in this PR.
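For reference, a rough sketch (not the PR's code) of how the HashPartitioning special case can be checked, comparing the partition count and the partitioning expressions semantically; other Partitioning types would need their own notion of equivalence, which is the open question above:

```scala
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}

// Sketch only: an explicit repartition's HashPartitioning is redundant when the
// child already hash-partitions on semantically equal expressions with the same
// number of partitions.
def sameHashPartitioning(required: HashPartitioning, actual: Partitioning): Boolean =
  actual match {
    case h: HashPartitioning =>
      required.numPartitions == h.numPartitions &&
        required.expressions.length == h.expressions.length &&
        required.expressions.zip(h.expressions).forall { case (a, b) => a.semanticEquals(b) }
    case _ => false
  }
```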
@maryannxue ping
Force-pushed from bb794db to 9db29e6.
Fixed the latest conflicts and rebased on latest master. @cloud-fan could you take a look when you get a chance?
Test build #121654 has finished for PR 27096 at commit
@cloud-fan @viirya could you help review or add a suggested reviewer here?
@cloud-fan @viirya friendly ping
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
What changes were proposed in this pull request?
Extra shuffling was not eliminated after inner joins because they produce a PartitioningCollection partitioning, while the current logic only matched on HashPartitioning.
Nothing was present in EnsureRequirements to eliminate a parent sort (within partitions) that was unnecessary when the same sort order had already been introduced by a sort-merge join.
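A rough sketch of the first point (illustrative only, not the PR's actual diff): because a join reports a PartitioningCollection, the check has to look inside the collection rather than matching only a top-level HashPartitioning.

```scala
import org.apache.spark.sql.catalyst.plans.physical.{
  HashPartitioning, Partitioning, PartitioningCollection}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// Sketch only: the shuffle is redundant if any member of the child's
// PartitioningCollection semantically matches the requested HashPartitioning
// (semanticEquals also distinguishes differing numPartitions, so a user-requested
// partition count that differs keeps the shuffle in place).
def matchesHashPartitioning(required: HashPartitioning, actual: Partitioning): Boolean =
  actual match {
    case h: HashPartitioning => required.semanticEquals(h)
    case PartitioningCollection(partitionings) =>
      partitionings.exists(matchesHashPartitioning(required, _))
    case _ => false
  }

def removeRedundantRepartition(plan: SparkPlan): SparkPlan = plan.transformUp {
  case exchange @ ShuffleExchangeExec(required: HashPartitioning, child, _) =>
    if (matchesHashPartitioning(required, child.outputPartitioning)) child else exchange
}
```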
Copied from the JIRA:
Partitioning & sorting is usually retained after a join.
However here: partitions persist through a left join, but the sort doesn't.
Also here: partitions do not persist through an inner join.
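For concreteness, a small repro sketch along the lines of the JIRA description (data and column names are made up; broadcast joins are disabled so a SortMergeJoin is planned):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._
// Disable broadcast joins so the inner join is planned as a SortMergeJoin.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

val df1 = Seq((1, "a"), (2, "b")).toDF("id", "v1")
val df2 = Seq((1, "x"), (2, "y")).toDF("id", "v2")

// After the inner join, the output is already hash-partitioned and sorted on "id",
// so the explicit repartition/sortWithinPartitions below should not add another
// Exchange/Sort; before this change the extra Exchange appeared because the join
// reports a PartitioningCollection rather than a plain HashPartitioning.
df1.join(df2, Seq("id"))
  .repartition($"id")
  .sortWithinPartitions($"id")
  .explain()
```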