SPARK-2566. Update ShuffleWriteMetrics incrementally #1481
Conversation
QA tests have started for PR 1481. This patch merges cleanly.
QA results for PR 1481:
```diff
@@ -567,10 +567,12 @@ private[spark] class BlockManager(
       blockId: BlockId,
       file: File,
       serializer: Serializer,
-      bufferSize: Int): BlockObjectWriter = {
+      bufferSize: Int,
+      writeMetrics: Option[ShuffleWriteMetrics] = None): BlockObjectWriter = {
```
There is a bit of a mismatch in this function signature: the doc says "This is currently used for writing shuffle files out.", but the shuffle write metrics are optional. I think the doc is just stale, because we now use this in other cases where we might not be writing out shuffle data, so can you just remove that line from the doc?
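For context, the signature being discussed can be sketched roughly as follows. This is a heavily simplified, hypothetical stand-in (the real `BlockManager.getDiskWriter` takes Spark's own `BlockId`, `File`, and `Serializer` types and returns a real writer); it only illustrates why an optional `writeMetrics` parameter makes the "used for writing shuffle files" doc line read as stale:

```scala
// Simplified stand-in for org.apache.spark.executor.ShuffleWriteMetrics.
class ShuffleWriteMetrics {
  var shuffleBytesWritten: Long = 0L
}

object GetDiskWriterSketch {
  // Sketch of the signature under discussion: writeMetrics defaults to
  // None, so callers that are not writing shuffle data simply omit it.
  def getDiskWriter(
      blockId: String,
      bufferSize: Int,
      writeMetrics: Option[ShuffleWriteMetrics] = None): String = {
    s"DiskBlockObjectWriter($blockId, bufferSize=$bufferSize, " +
      s"tracksMetrics=${writeMetrics.isDefined})"
  }

  def main(args: Array[String]): Unit = {
    // A shuffle caller passes metrics; a non-shuffle caller does not.
    println(getDiskWriter("shuffle_0_0_0", 32 * 1024, Some(new ShuffleWriteMetrics)))
    println(getDiskWriter("temp_local_0", 32 * 1024))
  }
}
```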
QA results for PR 1481:
I'm not totally finished with this, but a couple of high-level comments:

I hadn't noticed this before, but DiskObjectWriter is used for tracking bytes spilled by ExternalSorter and ExternalAppendOnlyMap in addition to shuffle bytes written. So in some cases it needs to bill to TaskMetrics.diskBytesSpilled as well. Any thoughts on the best way to handle this? The options, as I see them, are:
@sryza Can't the … We could also change the name of …
I guess the current name isn't that bad because technically this external aggregation does occur during a shuffle, although it's not exactly the write of the shuffle data itself. In any case I think making it not an option is a good call... hopefully it will simplify things.
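The model the PR title describes can be sketched like this: instead of setting a byte total once when the writer closes, the writer receives a (now non-optional, per the discussion above) metrics object up front and bumps it on every write. All class and method names here are illustrative stand-ins, not the real Spark API:

```scala
// Illustrative stand-in, not the real Spark class.
class ShuffleWriteMetrics {
  var shuffleBytesWritten: Long = 0L
}

// The writer updates the shared metrics object incrementally as bytes
// go out, so readers of the metrics see progress mid-task.
class DiskWriterSketch(writeMetrics: ShuffleWriteMetrics) {
  def write(chunk: Array[Byte]): Unit = {
    // (actual disk I/O elided)
    writeMetrics.shuffleBytesWritten += chunk.length
  }
}

object IncrementalDemo {
  def main(args: Array[String]): Unit = {
    val metrics = new ShuffleWriteMetrics
    val writer = new DiskWriterSketch(metrics)
    writer.write(new Array[Byte](4096))
    // Already up to date mid-write, with no final reconciliation step:
    println(metrics.shuffleBytesWritten) // 4096
    writer.write(new Array[Byte](1024))
    println(metrics.shuffleBytesWritten) // 5120
  }
}
```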
QA tests have started for PR 1481. This patch merges cleanly.
Updated patch keeps it as ShuffleWriteMetrics for now.
QA results for PR 1481:
```diff
@@ -89,6 +90,8 @@ class ExternalAppendOnlyMap[K, V, C](
   // How much of the shared memory pool this collection has claimed
   private var myMemoryThreshold = 0L
+
+  private val writeMetrics = new ShuffleWriteMetrics
```
It's a bit strange that the same metrics object is re-used here. Can you just create a new ShuffleWriteMetrics every time you create a writer? That is more the model I was thinking of.
You can continue to track `_diskBytesSpilled` as its own var.
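A minimal sketch of the pattern suggested above, with all names hypothetical: each spill creates a fresh `ShuffleWriteMetrics` for its writer, and the collection folds the resulting byte count into its own private `_diskBytesSpilled` var, so spilled bytes are billed separately from shuffle writes:

```scala
// Illustrative stand-in, not the real Spark class.
class ShuffleWriteMetrics {
  var shuffleBytesWritten: Long = 0L
}

class SpillableCollectionSketch {
  // Tracked as its own var, separate from any shuffle-write metrics.
  private var _diskBytesSpilled: Long = 0L
  def diskBytesSpilled: Long = _diskBytesSpilled

  def spill(recordSizes: Seq[Int]): Unit = {
    // Fresh metrics object per writer, as suggested above.
    val writeMetrics = new ShuffleWriteMetrics
    recordSizes.foreach(size => writeMetrics.shuffleBytesWritten += size)
    // Bill the spilled bytes to the collection, not to shuffle writes.
    _diskBytesSpilled += writeMetrics.shuffleBytesWritten
  }
}
```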
I took a pass. I like the new approach; with this approach you actually net remove code from Spark, which is great. Left comments inline. This needs some unit tests as well. It shouldn't be too hard because you can create a
```diff
@@ -29,7 +29,7 @@ import akka.actor.{ActorSystem, Cancellable, Props}
 import sun.nio.ch.DirectBuffer

 import org.apache.spark._
-import org.apache.spark.executor.{DataReadMethod, InputMetrics}
+import org.apache.spark.executor.{ShuffleWriteMetrics, DataReadMethod, InputMetrics}
```
alphabetization (here and elsewhere as well)
Updated patch addresses @pwendell's and @kayousterhout's comments and adds tests.
QA tests have started for PR 1481. This patch merges cleanly.
QA tests have started for PR 1481. This patch DID NOT merge cleanly!
QA tests have started for PR 1481. This patch merges cleanly.
QA results for PR 1481:
Jenkins, test this please.
QA tests have started for PR 1481. This patch merges cleanly.
QA results for PR 1481:
woohoooo!
lol
Okay I'm merging this - thanks Sandy!
hahaha I am filled with jubilation
thanks Patrick
I haven't tested this out on a cluster yet, but wanted to make sure the approach (passing ShuffleWriteMetrics down to DiskBlockObjectWriter) was ok

Author: Sandy Ryza <sandy@cloudera.com>

Closes #1481 from sryza/sandy-spark-2566 and squashes the following commits:

8090d88 [Sandy Ryza] Fix ExternalSorter
b2a62ed [Sandy Ryza] Fix more test failures
8be6218 [Sandy Ryza] Fix test failures and mark a couple variables private
c5e68e5 [Sandy Ryza] SPARK-2566. Update ShuffleWriteMetrics incrementally

(cherry picked from commit 4e98236)
Signed-off-by: Patrick Wendell <pwendell@gmail.com>