Skip to content

SPARK-2566. Update ShuffleWriteMetrics incrementally #1481

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from

Conversation

sryza
Copy link
Contributor

@sryza sryza commented Jul 18, 2014

I haven't tested this out on a cluster yet, but wanted to make sure the approach (passing ShuffleWriteMetrics down to DiskBlockObjectWriter) was ok

@SparkQA
Copy link

SparkQA commented Jul 18, 2014

QA tests have started for PR 1481. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16822/consoleFull

@SparkQA
Copy link

SparkQA commented Jul 18, 2014

QA results for PR 1481:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16822/consoleFull

@SparkQA
Copy link

SparkQA commented Jul 18, 2014

QA tests have started for PR 1481. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16839/consoleFull

@SparkQA
Copy link

SparkQA commented Jul 19, 2014

QA results for PR 1481:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16839/consoleFull

@SparkQA
Copy link

SparkQA commented Aug 1, 2014

QA tests have started for PR 1481. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17696/consoleFull

@@ -567,10 +567,12 @@ private[spark] class BlockManager(
blockId: BlockId,
file: File,
serializer: Serializer,
bufferSize: Int): BlockObjectWriter = {
bufferSize: Int,
writeMetrics: Option[ShuffleWriteMetrics] = None): BlockObjectWriter = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a bit of a mismatch in this function signature, the doc says "This is currently used for writing shuffle files out.", but the shuffle write metrics are optional. I think the doc is just stale, because we now use this in other cases where we might not be writing out shuffle data, so can you just remove that line from the doc?

@SparkQA
Copy link

SparkQA commented Aug 1, 2014

QA results for PR 1481:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17696/consoleFull

@pwendell
Copy link
Contributor

pwendell commented Aug 1, 2014

I'm not totally finished with this but a couple high level comments:

  1. I think this entirely subsumes some existing code that tracks timing only in order to bubble it up to the write metrics, so if that code could be removed it would be great (it will simplify things and maybe this patch can even net remove code).
  2. The fields in ShuffleWriteMetrics should be made volatile in order to ensure that they are visible when heartbeats happen. I think this should be fine from a performance perspective because at run-time the writer threads are really active and the reader threads just come by infrequently to hearbeat.

@sryza
Copy link
Contributor Author

sryza commented Aug 2, 2014

I hadn't noticed this before, but DiskObjectWriter is used for tracking bytes spilled by ExternalSorter and ExternalAppendOnlyMap in addition to shuffle bytes written. So in some cases it needs to bill to TaskMetrics.diskBytesSpilled as well. Any thoughts on the best way to handle this? The options as I see it are:

  • Pass in TaskMetrics and some flag about which metric to update to DiskObjectWriter
  • Pass in a callback that DiskObjectWriter can notify when it writes bytes
  • Pass in a MutableLong

@pwendell
Copy link
Contributor

pwendell commented Aug 3, 2014

@sryza Can't the ExeternalSorter and ExternalAppendOnlyMap just pass their own ShuffleWriteMetrics when they create a disk writer and then read back the bytes written? I think that would be nicer actually because you wouldn't have this Option thing everywhere, bascially when you create a disk writer you need to supply metrics.

We could also change the name of ShuffleWriteMetrics to just be WriteMetrics - or we could leave it for now and just put a TODO.

@pwendell
Copy link
Contributor

pwendell commented Aug 3, 2014

I guess the current name isn't that bad because technically this external aggregation does occur during a shuffle, although it's not exactly the write of the shuffle data itself. In any case I think making it not an option is a good call... hopefully it will simplify things.

@SparkQA
Copy link

SparkQA commented Aug 3, 2014

QA tests have started for PR 1481. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17827/consoleFull

@sryza
Copy link
Contributor Author

sryza commented Aug 3, 2014

Updated patch keeps it as ShuffleWriteMetrics for now.

@SparkQA
Copy link

SparkQA commented Aug 3, 2014

QA results for PR 1481:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17827/consoleFull

@SparkQA
Copy link

SparkQA commented Aug 3, 2014

QA tests have started for PR 1481. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17828/consoleFull

@@ -89,6 +90,8 @@ class ExternalAppendOnlyMap[K, V, C](
// How much of the shared memory pool this collection has claimed
private var myMemoryThreshold = 0L

private val writeMetrics = new ShuffleWriteMetrics
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit strange that the same metrics object is re-used here. Can you just create a new ShuffleWriteMetrics every time you create a writer? That is more the model I was thinking of.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can continue to track _diskBytesSpilled as its own var

@pwendell
Copy link
Contributor

pwendell commented Aug 4, 2014

I tool a pass. I like the new approach, with this approach you actually net remove code from Spark, which is great. Left comments inline. This needs some unit tests as well. It shouldn't be too hard because you can create a BlockObjectWriter pretty easily (it is fairly self-contained thing). One thing that might be helpful for unit tests is adding a private flush method to BlockObjectWriter - since otherwise writes will get buffered inside of the inner streams and you might not ever see them in the resulting file if you are writing small amounts of data.

@@ -29,7 +29,7 @@ import akka.actor.{ActorSystem, Cancellable, Props}
import sun.nio.ch.DirectBuffer

import org.apache.spark._
import org.apache.spark.executor.{DataReadMethod, InputMetrics}
import org.apache.spark.executor.{ShuffleWriteMetrics, DataReadMethod, InputMetrics}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alphabetization (here and elsewhere as well)

@sryza
Copy link
Contributor Author

sryza commented Aug 4, 2014

Updated patch addresses @pwendell and @kayousterhout 's comments and adds tests.

@SparkQA
Copy link

SparkQA commented Aug 4, 2014

QA tests have started for PR 1481. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17848/consoleFull

@SparkQA
Copy link

SparkQA commented Aug 6, 2014

QA tests have started for PR 1481. This patch DID NOT merge cleanly!
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17964/consoleFull

@SparkQA
Copy link

SparkQA commented Aug 6, 2014

QA tests have started for PR 1481. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17965/consoleFull

@SparkQA
Copy link

SparkQA commented Aug 6, 2014

QA results for PR 1481:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17965/consoleFull

@SparkQA
Copy link

SparkQA commented Aug 6, 2014

QA results for PR 1481:
- This patch FAILED unit tests.

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17964/consoleFull

@SparkQA
Copy link

SparkQA commented Aug 6, 2014

QA tests have started for PR 1481. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18012/consoleFull

@SparkQA
Copy link

SparkQA commented Aug 6, 2014

QA results for PR 1481:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18012/consoleFull

@SparkQA
Copy link

SparkQA commented Aug 6, 2014

QA tests have started for PR 1481. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18027/consoleFull

@SparkQA
Copy link

SparkQA commented Aug 6, 2014

QA results for PR 1481:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18027/consoleFull

@SparkQA
Copy link

SparkQA commented Aug 6, 2014

QA tests have started for PR 1481. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18030/consoleFull

@SparkQA
Copy link

SparkQA commented Aug 6, 2014

QA results for PR 1481:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18030/consoleFull

@pwendell
Copy link
Contributor

pwendell commented Aug 6, 2014

Jenkins, test this please.

@SparkQA
Copy link

SparkQA commented Aug 6, 2014

QA tests have started for PR 1481. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18039/consoleFull

@SparkQA
Copy link

SparkQA commented Aug 6, 2014

QA results for PR 1481:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18039/consoleFull

@kayousterhout
Copy link
Contributor

woohoooo!

On Wed, Aug 6, 2014 at 1:04 PM, Apache Spark QA notifications@github.com
wrote:

QA results for PR 1481:

  • This patch PASSES unit tests.
  • This patch merges cleanly
  • This patch adds no public classes

For more information see test ouptut:

https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18039/consoleFull


Reply to this email directly or view it on GitHub
#1481 (comment).

@pwendell
Copy link
Contributor

pwendell commented Aug 6, 2014

lol

@pwendell
Copy link
Contributor

pwendell commented Aug 6, 2014

Okay I'm merging this - thanks Sandy!

@sryza
Copy link
Contributor Author

sryza commented Aug 6, 2014

hahaha I am filled with jubilation

@sryza
Copy link
Contributor Author

sryza commented Aug 6, 2014

thanks Patrick

asfgit pushed a commit that referenced this pull request Aug 6, 2014
I haven't tested this out on a cluster yet, but wanted to make sure the approach (passing ShuffleWriteMetrics down to DiskBlockObjectWriter) was ok

Author: Sandy Ryza <sandy@cloudera.com>

Closes #1481 from sryza/sandy-spark-2566 and squashes the following commits:

8090d88 [Sandy Ryza] Fix ExternalSorter
b2a62ed [Sandy Ryza] Fix more test failures
8be6218 [Sandy Ryza] Fix test failures and mark a couple variables private
c5e68e5 [Sandy Ryza] SPARK-2566. Update ShuffleWriteMetrics incrementally
(cherry picked from commit 4e98236)

Signed-off-by: Patrick Wendell <pwendell@gmail.com>
@asfgit asfgit closed this in 4e98236 Aug 6, 2014
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
I haven't tested this out on a cluster yet, but wanted to make sure the approach (passing ShuffleWriteMetrics down to DiskBlockObjectWriter) was ok

Author: Sandy Ryza <sandy@cloudera.com>

Closes apache#1481 from sryza/sandy-spark-2566 and squashes the following commits:

8090d88 [Sandy Ryza] Fix ExternalSorter
b2a62ed [Sandy Ryza] Fix more test failures
8be6218 [Sandy Ryza] Fix test failures and mark a couple variables private
c5e68e5 [Sandy Ryza] SPARK-2566. Update ShuffleWriteMetrics incrementally
kazuyukitanimura pushed a commit to kazuyukitanimura/spark that referenced this pull request Aug 10, 2022
Sets boson configuration variables to true if the `BOSON` env variable is set to true.
To run the unit tests, set 
```
export BOSON=true
export BOSON_CONF_DIR=$SPARK_SRC_DIR/boson/conf
```
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants