-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-26193][SQL] Implement shuffle write metrics in SQL #23207
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
test this please. |
Test build #99616 has finished for PR 23207 at commit
|
Test build #99617 has finished for PR 23207 at commit
|
retest this please. |
Test build #99643 has finished for PR 23207 at commit
|
Can you share some ideas about it? IMO shuffle write metrics is hard, as an RDD can have shuffle dependencies with multiple upstream RDDs. That said, in general the shuffle write metrics should belong to the upstream RDDs. In Spark SQL, it's a little simpler, as the That said, we need to design a not-so-general shuffle write metrics API in Spark core, which will only be used in Spark SQL. |
Thanks for your reply Wenchen, there's a sketch doc assigned in JIRA:https://docs.google.com/document/d/1DX0gLkpk_NCE5MwI1_m4gnA2rLdjDkynZ02u2VWDR-8/edit
That's right and that's also what I try to do at first, logically upstream operator trigger shuffle write, and first attempt implementation is also changed SparkPlan base class to achieve this.
Yes, maybe this also the suggestion by Reynold, ShuffleExchangeExec has only one child, we can do a simplify on the implementation. But as the shuffle write metrics are updated by task inner, so the core module still need some changes. |
core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
Outdated
Show resolved
Hide resolved
Test build #99677 has finished for PR 23207 at commit
|
Test build #99676 has finished for PR 23207 at commit
|
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
Show resolved
Hide resolved
fcd62b3
to
cf35b9f
Compare
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
Show resolved
Hide resolved
Test build #99722 has finished for PR 23207 at commit
|
Test build #99736 has finished for PR 23207 at commit
|
@xuanyuanking can you separate the prs to rename read side metric and the write side change? |
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
Outdated
Show resolved
Hide resolved
Yea, the commit of a780b70 achieve this by adding |
Test build #99782 has finished for PR 23207 at commit
|
core/src/main/scala/org/apache/spark/shuffle/ShuffleWriterProcessor.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
Outdated
Show resolved
Hide resolved
the code looks much cleaner now! |
Sorry for the original rush and code, I should and will pay more attention on coding clean and more discussion on optional implementation. |
Test build #99805 has finished for PR 23207 at commit
|
retest this please. |
core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM except a few comments. cc @rxin to take another look
Test build #99817 has finished for PR 23207 at commit
|
Test build #99825 has finished for PR 23207 at commit
|
thanks, merging to master! |
core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
Show resolved
Hide resolved
Thanks all for your review. |
…anges ## What changes were proposed in this pull request? Follow up pr for #23207, include following changes: - Rename `SQLShuffleMetricsReporter` to `SQLShuffleReadMetricsReporter` to make it match with write side naming. - Display text changes for read side for naming consistent. - Rename function in `ShuffleWriteProcessor`. - Delete `private[spark]` in execution package. ## How was this patch tested? Existing tests. Closes #23286 from xuanyuanking/SPARK-26193-follow. Authored-by: Yuanjian Li <xyliyuanjian@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…anges ## What changes were proposed in this pull request? Follow up pr for apache#23207, include following changes: - Rename `SQLShuffleMetricsReporter` to `SQLShuffleReadMetricsReporter` to make it match with write side naming. - Display text changes for read side for naming consistent. - Rename function in `ShuffleWriteProcessor`. - Delete `private[spark]` in execution package. ## How was this patch tested? Existing tests. Closes apache#23286 from xuanyuanking/SPARK-26193-follow. Authored-by: Yuanjian Li <xyliyuanjian@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request? 1. Implement `SQLShuffleWriteMetricsReporter` on the SQL side as the customized `ShuffleWriteMetricsReporter`. 2. Add shuffle write metrics to `ShuffleExchangeExec`, and use these metrics to create corresponding `SQLShuffleWriteMetricsReporter` in shuffle dependency. 3. Rework on `ShuffleMapTask` to add new class named `ShuffleWriteProcessor` which control shuffle write process, we use sql shuffle write metrics by customizing a ShuffleWriteProcessor on SQL side. ## How was this patch tested? Add UT in SQLMetricsSuite. Manually test locally, update screen shot to document attached in JIRA. Closes apache#23207 from xuanyuanking/SPARK-26193. Authored-by: Yuanjian Li <xyliyuanjian@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…anges ## What changes were proposed in this pull request? Follow up pr for apache#23207, include following changes: - Rename `SQLShuffleMetricsReporter` to `SQLShuffleReadMetricsReporter` to make it match with write side naming. - Display text changes for read side for naming consistent. - Rename function in `ShuffleWriteProcessor`. - Delete `private[spark]` in execution package. ## How was this patch tested? Existing tests. Closes apache#23286 from xuanyuanking/SPARK-26193-follow. Authored-by: Yuanjian Li <xyliyuanjian@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
SQLShuffleWriteMetricsReporter
on the SQL side as the customizedShuffleWriteMetricsReporter
.ShuffleExchangeExec
, and use these metrics to create correspondingSQLShuffleWriteMetricsReporter
in shuffle dependency.ShuffleMapTask
to add new class namedShuffleWriteProcessor
which control shuffle write process, we use sql shuffle write metrics by customizing a ShuffleWriteProcessor on SQL side.How was this patch tested?
Add UT in SQLMetricsSuite.
Manually test locally, update screen shot to document attached in JIRA.