
[SPARK-28148][SQL] Repartition after join is not optimized away #27096


Closed
wants to merge 5 commits into from

Conversation

bmarcott
Contributor

@bmarcott bmarcott commented Jan 5, 2020

What changes were proposed in this pull request?

Extra shuffling was not eliminated after inner joins because they expose a PartitioningCollection output partitioning, while the existing logic only matched HashPartitioning.

Nothing was present in EnsureRequirements to eliminate a parent sort (within partitions) that was unnecessary because the same sort order had already been introduced by the SortMergeJoin.
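
For illustration, here is a rough sketch of the idea (the rule object and helper names are made up; the actual change is a small extension of the existing check in EnsureRequirements): in addition to plain HashPartitioning, look inside the PartitioningCollection that joins expose.

import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, PartitioningCollection}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// Sketch only, not the exact patch.
object RemoveRedundantShuffleSketch extends Rule[SparkPlan] {
  // True if `output` already provides the hash partitioning the shuffle would create.
  private def alreadyPartitioned(output: Partitioning, target: HashPartitioning): Boolean =
    output match {
      case lower: HashPartitioning => target.semanticEquals(lower)
      // Joins (e.g. SortMergeJoinExec) expose a PartitioningCollection; any member counts.
      case PartitioningCollection(partitionings) =>
        partitionings.exists(alreadyPartitioned(_, target))
      case _ => false
    }

  def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
    case exchange @ ShuffleExchangeExec(target: HashPartitioning, child, _) =>
      if (alreadyPartitioned(child.outputPartitioning, target)) child else exchange
  }
}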

Copied from the JIRA:
Partitioning & sorting are usually retained after a join.

spark.conf.set('spark.sql.shuffle.partitions', '42')

df1 = spark.range(5000000, numPartitions=5)
df2 = spark.range(10000000, numPartitions=5)
df3 = spark.range(20000000, numPartitions=5)

# Reuse previous partitions & sort.
df1.join(df2, on='id').join(df3, on='id').explain()
# == Physical Plan ==
# *(8) Project [id#367L]
# +- *(8) SortMergeJoin [id#367L], [id#374L], Inner
#    :- *(5) Project [id#367L]
#    :  +- *(5) SortMergeJoin [id#367L], [id#369L], Inner
#    :     :- *(2) Sort [id#367L ASC NULLS FIRST], false, 0
#    :     :  +- Exchange hashpartitioning(id#367L, 42)
#    :     :     +- *(1) Range (0, 5000000, step=1, splits=5)
#    :     +- *(4) Sort [id#369L ASC NULLS FIRST], false, 0
#    :        +- Exchange hashpartitioning(id#369L, 42)
#    :           +- *(3) Range (0, 10000000, step=1, splits=5)
#    +- *(7) Sort [id#374L ASC NULLS FIRST], false, 0
#       +- Exchange hashpartitioning(id#374L, 42)
#          +- *(6) Range (0, 20000000, step=1, splits=5)

However, here partitions persist through the left join but the sort does not:

df1.join(df2, on='id', how='left').repartition('id').sortWithinPartitions('id').explain()
# == Physical Plan ==
# *(5) Sort [id#367L ASC NULLS FIRST], false, 0
# +- *(5) Project [id#367L]
#    +- SortMergeJoin [id#367L], [id#369L], LeftOuter
#       :- *(2) Sort [id#367L ASC NULLS FIRST], false, 0
#       :  +- Exchange hashpartitioning(id#367L, 42)
#       :     +- *(1) Range (0, 5000000, step=1, splits=5)
#       +- *(4) Sort [id#369L ASC NULLS FIRST], false, 0
#          +- Exchange hashpartitioning(id#369L, 42)
#             +- *(3) Range (0, 10000000, step=1, splits=5)

And here, partitions do not persist through the inner join:

df1.join(df2, on='id').repartition('id').sortWithinPartitions('id').explain()
# == Physical Plan ==
# *(6) Sort [id#367L ASC NULLS FIRST], false, 0
# +- Exchange hashpartitioning(id#367L, 42)
#    +- *(5) Project [id#367L]
#       +- *(5) SortMergeJoin [id#367L], [id#369L], Inner
#          :- *(2) Sort [id#367L ASC NULLS FIRST], false, 0
#          :  +- Exchange hashpartitioning(id#367L, 42)
#          :     +- *(1) Range (0, 5000000, step=1, splits=5)
#          +- *(4) Sort [id#369L ASC NULLS FIRST], false, 0
#             +- Exchange hashpartitioning(id#369L, 42)
#                +- *(3) Range (0, 10000000, step=1, splits=5)

@bmarcott bmarcott changed the title SPARK-28148: repartition after join is not optimized away [SPARK-28148] [CORE]: repartition after join is not optimized away Jan 5, 2020
@bmarcott bmarcott changed the title [SPARK-28148] [CORE]: repartition after join is not optimized away [SPARK-28148][CORE]: repartition after join is not optimized away Jan 5, 2020
@dbtsai
Member

dbtsai commented Jan 6, 2020

Jenkins, ok to test.

@dbtsai dbtsai changed the title [SPARK-28148][CORE]: repartition after join is not optimized away [SPARK-28148][SQL] Repartition after join is not optimized away Jan 6, 2020
@SparkQA

SparkQA commented Jan 7, 2020

Test build #116187 has finished for PR 27096 at commit ff573e8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member

Thank you for making a PR, @bmarcott.

@dbtsai
Member

dbtsai commented Jan 7, 2020

also ping @viirya

Member

@viirya viirya left a comment


Currently EnsureRequirements ensures that a child's distribution/ordering satisfies its parent's requirement. That is, it checks whether a child's distribution/ordering is good enough for its parent op; if not, it adds a shuffle/sort on top of the child.

The cases that fail to be optimized, as far as I've seen, come down to mismatched distribution/ordering outputs and requirements between the child and parent sides.

For this part in EnsureRequirements:

def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
  // TODO: remove this after we create a physical operator for `RepartitionByExpression`.
  case operator @ ShuffleExchangeExec(upper: HashPartitioning, child, _) =>
    child.outputPartitioning match {
      case lower: HashPartitioning if upper.semanticEquals(lower) => child
      case _ => operator
    }
}

Reading the TODO, it looks like there is supposed to be a physical RepartitionByExpression operator that can skip the shuffle if its child already satisfies the required distribution?

This PR targets a different scenario, as the repartition/sort is added explicitly by the user. We have optimizations in the Optimizer that deal with the logical plan, for example removing a Sort on top of an already-sorted child plan.

So maybe EnsureRequirements is not a good place to do that optimization? Maybe we need a separate rule for it? cc @cloud-fan

@maropu
Member

maropu commented Jan 9, 2020

(The @viirya suggestion sounds reasonable to me.)

@cloud-fan
Contributor

An issue is that at the logical phase we don't know the physical partitioning/sorting info (e.g. from SMJ), so we can't optimize the user-specified sort/repartition well there.

I see 2 options here:

  1. run an extra physical rule after EnsureRequirements to remove useless shuffles
  2. do as the TODO says: create a new physical node for RepartitionByExpression, then we can go through the main logic in the ensureDistributionAndOrdering method instead of a small hack in the apply method.

@bmarcott
Contributor Author

bmarcott commented Jan 9, 2020

Thanks for taking a look!
Yes, the reason it is here is that the shuffle/sorting is introduced by EnsureRequirements itself, making the user-added sorts/shuffles unnecessary.

Yeah, it felt a little hacky for optimization code to be in a rule called EnsureRequirements. Then again, the new shuffles/sorts were introduced by it.

I'd like someone familiar with the overall planner design to suggest whether I should go with the 1st or 2nd option.
For the 2nd option, won't I need to create a new physical node for both the repartition and the sort, each of which is kind of a dummy physical node that relies on EnsureRequirements to add the necessary sorts/partitioning based on requiredChildDistribution and requiredChildOrdering?
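
Purely as an illustration of what such a dummy node could look like (the node name, the use of ClusteredDistribution with an exact numPartitions, and the pass-through overrides are all assumptions, not actual Spark code):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, Partitioning}
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}

// Hypothetical "dummy" node: it only declares a requirement and lets
// EnsureRequirements decide whether a shuffle is actually needed below it.
case class RepartitionByExpressionExec(
    partitionExprs: Seq[Expression],
    numPartitions: Int,
    child: SparkPlan) extends UnaryExecNode {

  override def output: Seq[Attribute] = child.output
  override def outputPartitioning: Partitioning = child.outputPartitioning
  override def outputOrdering: Seq[SortOrder] = child.outputOrdering

  // EnsureRequirements adds a ShuffleExchangeExec under this node only when the
  // child's outputPartitioning does not already satisfy this distribution.
  override def requiredChildDistribution: Seq[Distribution] =
    ClusteredDistribution(partitionExprs, Some(numPartitions)) :: Nil

  // At runtime the node itself does nothing; any required shuffle sits beneath it.
  override protected def doExecute(): RDD[InternalRow] = child.execute()
}

A sort variant would similarly declare only requiredChildOrdering.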

@bmarcott
Contributor Author

Any more suggestions?
I'm starting to think the new physical node is a bit of a hack as well, just to reuse the code in ensureDistributionAndOrdering. We wouldn't want this "no-op" SparkPlan to show up in the plan, right?

@cloud-fan
Contributor

Can we try option 1? It seems like we need to do some experiments here; I'm not sure which option is better without seeing the concrete code.

@ulysses-you
Contributor

+1. How about adding a rule named something like PruneShuffleAndSort (a rough sketch follows after the list below)?
At the least it should include:

// TODO: remove this after we create a physical operator for `RepartitionByExpression`.
case operator @ ShuffleExchangeExec(upper: HashPartitioning, child, _) =>
  child.outputPartitioning match {
    case lower: HashPartitioning if upper.semanticEquals(lower) => child
    case _ => operator
  }
  1. this PR's code

  2. PR-26946
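
A rough sketch of what such a rule could look like, wrapping the case above plus a sort-pruning case (the exact conditions, e.g. around numPartitions and global sorts, are assumptions and may differ from the final patch):

import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{SortExec, SparkPlan}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

case class PruneShuffleAndSort() extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
    // Drop a shuffle whose child already produces an equivalent hash partitioning.
    case op @ ShuffleExchangeExec(upper: HashPartitioning, child, _) =>
      child.outputPartitioning match {
        case lower: HashPartitioning if upper.semanticEquals(lower) => child
        case _ => op
      }
    // Drop a per-partition sort whose child already emits rows in the required order.
    case SortExec(sortOrder, false, child, _)
        if SortOrder.orderingSatisfies(child.outputOrdering, sortOrder) =>
      child
  }
}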

@bmarcott
Contributor Author

bmarcott commented Jan 14, 2020

I'll update soon with the new rule PruneShuffleAndSort.
I'm iffy on including PR-26946.
EnsureRequirements should probably only introduce shuffles/ordering when it needs to, rather than adding them blindly and relying on a later rule to prune them. Or maybe it is better to include it so the rule applies to any case (I haven't thought deeply about the cases here)?

@bmarcott bmarcott force-pushed the nmarcott-reuse-sort-order branch from ff573e8 to 1916ef9 on January 15, 2020, 02:55
@bmarcott
Contributor Author

bmarcott commented Jan 15, 2020

I started to try including #26946, but it feels a little messy.

Notice that the numPartitions of the previous shuffle is maintained, but the distribution/partitioning is changed:

case (ShuffleExchangeExec(partitioning, child, _), distribution: OrderedDistribution) =>
  ShuffleExchangeExec(distribution.createPartitioning(partitioning.numPartitions), child)

If we try to remove this code from EnsureRequirements, it will create a new shuffle node with defaultNumPreShufflePartitions. I'm not sure it makes sense to have a general rule that, when a shuffle with range partitioning has another shuffle as its child, eliminates the child but reuses its numPartitions.

@SparkQA

SparkQA commented Jan 15, 2020

Test build #116740 has finished for PR 27096 at commit 1916ef9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class PruneShuffleAndSort() extends Rule[SparkPlan]

@cloud-fan
Contributor

In general, shuffles are added by Spark and Spark can pick the best # of partitions. However, for user-specified shuffles (df.repartition), the semantics should be to respect the user's request and keep the # of partitions.
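
A small runnable illustration of that semantics (the dataset and numbers are made up; this is not the PR's test):

import org.apache.spark.sql.SparkSession

object RepartitionSemanticsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("repartition-semantics").getOrCreate()
    import spark.implicits._

    spark.conf.set("spark.sql.shuffle.partitions", "42")
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")  // force a sort-merge join

    val df1 = spark.range(1000000).toDF("id")
    val df2 = spark.range(2000000).toDF("id")
    val joined = df1.join(df2, Seq("id"))  // SMJ output is already hash-partitioned on id (42 partitions)

    // Same keys, same (default) partition count: this shuffle is a pruning candidate.
    joined.repartition($"id").explain()

    // The user asked for exactly 10 partitions, which differs from the child's 42:
    // this shuffle must be kept so the request is honored.
    joined.repartition(10, $"id").explain()

    spark.stop()
  }
}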

@SparkQA

SparkQA commented Jan 15, 2020

Test build #116772 has finished for PR 27096 at commit 85003be.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@bmarcott bmarcott force-pushed the nmarcott-reuse-sort-order branch from 2517bea to 9e895bd on January 16, 2020, 06:13
@bmarcott
Contributor Author

Updated with a new SparkPlan rule and added a test which makes sure a user's repartition with a different numPartitions is not eliminated (we don't want to change the expected numPartitions).

Review when you get a chance :).

@SparkQA

SparkQA commented Jan 16, 2020

Test build #116821 has finished for PR 27096 at commit 9e895bd.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 16, 2020

Test build #116820 has finished for PR 27096 at commit 2517bea.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 16, 2020

Test build #116815 has finished for PR 27096 at commit 00b082a.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@bmarcott
Contributor Author

retest this please

@bmarcott
Contributor Author

Do the tests run post-merge with master, or do I need to rebase to pick up fix #27242?

@maropu
Member

maropu commented Jan 20, 2020

retest this please

@SparkQA

SparkQA commented Jan 20, 2020

Test build #117038 has finished for PR 27096 at commit 9e895bd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@bmarcott bmarcott force-pushed the nmarcott-reuse-sort-order branch from 9e895bd to 3ecdccb on January 21, 2020, 19:04
@bmarcott
Contributor Author

bmarcott commented Jan 21, 2020

rebased on latest master

@SparkQA

SparkQA commented Jan 21, 2020

Test build #117198 has finished for PR 27096 at commit 3ecdccb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@bmarcott
Contributor Author

bmarcott commented Jan 22, 2020

any remaining comments/concerns?

@SparkQA

SparkQA commented Jan 22, 2020

Test build #117211 has finished for PR 27096 at commit bb794db.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@bmarcott
Contributor Author

@cloud-fan and/or others please review when convenient

@bmarcott
Contributor Author

mind taking another look?
@cloud-fan
@viirya

@dongjoon-hyun
Member

Retest this please.

@SparkQA

SparkQA commented Jan 31, 2020

Test build #117679 has finished for PR 27096 at commit bb794db.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@bmarcott
Contributor Author

@cloud-fan @viirya
bump 😃

plan.transformUp {
  case operator @ ShuffleExchangeExec(upper: HashPartitioning, child, _) =>
    child.outputPartitioning match {
      case lower: HashPartitioning if upper.semanticEquals(lower) => child
Contributor


Why doesn't this apply to RangePartitioning? Is it just that we assume repartition() would only do HashPartitioning?
And what happens if someone does df1.join(df2, Seq("id"), "left").repartition(100, df("some_other_column")).repartition(20, df1("id")) or df1.join(df2, Seq("id"), "left").sortWithinPartitions(df1("some_other_column")).sortWithinPartitions(df1("id"))? We should be able to optimize those out too, right? It would be nice to make this rule more general and cover a wider range of cases.

Contributor Author


Thanks for providing feedback.
Let me take a look at your specific examples and think a little more about it.

Contributor Author

@bmarcott bmarcott Feb 24, 2020


@maryannxue
This PR focuses on removing unnecessary sorting and shuffling after a join, which potentially includes the join's own ShuffleExchangeExec with HashPartitioning. Both cases you mentioned are already optimized properly: the shuffling on "some_other_column" is removed and all sortWithinPartitions are removed (due to previous optimizations in the logical plan, plus the optimizations introduced here).

I wouldn't mind generalizing to all Partitioning types of the ShuffleExchangeExec, but I am not sure how to compare two partitioning types for equality. You can see the special case for HashPartitioning in this PR.
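
Purely as an illustration of one possible direction, not something this PR does: partitionings that are also catalyst Expressions (HashPartitioning, RangePartitioning) can at least be compared pairwise with semanticEquals; comparing across different Partitioning types remains the open question.

import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning}

object PartitioningComparison {
  // Illustrative helper only.
  def samePartitioning(a: Partitioning, b: Partitioning): Boolean = (a, b) match {
    case (x: HashPartitioning, y: HashPartitioning)   => x.semanticEquals(y)
    case (x: RangePartitioning, y: RangePartitioning) => x.semanticEquals(y)
    case _                                            => false
  }
}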


@bmarcott bmarcott force-pushed the nmarcott-reuse-sort-order branch from bb794db to 9db29e6 on April 23, 2020, 06:11
@bmarcott
Contributor Author

Fixed the latest conflicts and rebased on latest master.

@cloud-fan could you take a look when you get a chance?

@SparkQA

SparkQA commented Apr 23, 2020

Test build #121654 has finished for PR 27096 at commit 9db29e6.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@bmarcott
Contributor Author

bmarcott commented Jun 5, 2020

@cloud-fan @viirya could you help review or add a suggested reviewer here?

@bmarcott
Contributor Author

@cloud-fan @viirya friendly ping

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!
