-
Notifications
You must be signed in to change notification settings - Fork 28.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-2044] Pluggable interface for shuffles #1009
Conversation
Merged build triggered. |
Merged build started. |
Merged build finished. |
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15536/ |
Jenkins, retest this please |
Merged build triggered. |
Merged build started. |
Merged build finished. |
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15537/ |
Hi Matei, since we already have |
context: TaskContext) | ||
extends ShuffleReader[K, C] | ||
{ | ||
require(endPartition == startPartition + 1, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Who enforces this? I thought the API allows users to create a ShuffleReader with an arbitrary partition range, and it's up to the ShuffleReader to implement it correctly (perhaps by chunking itself up).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, this is just not yet supported in the first pass.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(The reason is that for this to work we need to move aggregation into the ShuffleReader.)
Merged build triggered. |
Merged build started. |
Merged build finished. |
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15631/ |
shuffleMetrics.shuffleWriteTime = totalTime | ||
metrics.shuffleWriteMetrics = Some(shuffleMetrics) | ||
|
||
success = true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this is used anywhere?
Merged build triggered. |
Merged build started. |
Merged build finished. All automated tests passed. |
All automated tests passed. |
Ok merging this in master. Thanks! |
Thanks Reynold. I've now moved the other TODOs into sub-tasks on https://issues.apache.org/jira/browse/SPARK-2044. |
def setSerializer(serializer: Serializer): ShuffledRDD[K, V, P] = { | ||
this.serializer = serializer | ||
this.serializer = Option(serializer) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this be Some(serializer)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not if serializer is null
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for null it will be None, isn't that the expect result?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
scala> var serializer: Option[java.lang.Integer] = null
serializer: Option[Integer] = null
scala> serializer = Some(1)
serializer: Option[Integer] = Some(1)
scala> serializer.get
res0: Integer = 1
scala> serializer = Some(null)
serializer: Option[Integer] = Some(null)
scala> serializer.get
res1: Integer = null
scala> serializer.isDefined
res2: Boolean = true
scala> serializer = Option(null)
serializer: Option[Integer] = None
scala> serializer.isDefined
res3: Boolean = false
This is a first cut at moving shuffle logic behind a pluggable interface, as described at https://issues.apache.org/jira/browse/SPARK-2044, to let us more easily experiment with new shuffle implementations. It moves the existing shuffle code to a class HashShuffleManager behind a general ShuffleManager interface. Two things are still missing to make this complete: * MapOutputTracker needs to be hidden behind the ShuffleManager interface; this will also require adding methods to ShuffleManager that will let the DAGScheduler interact with it as it does with the MapOutputTracker today * The code to do map-sides and reduce-side combine in ShuffledRDD, PairRDDFunctions, etc needs to be moved into the ShuffleManager's readers and writers However, some of these may also be done later after we merge the current interface. Author: Matei Zaharia <matei@databricks.com> Closes apache#1009 from mateiz/pluggable-shuffle and squashes the following commits: 7a09862 [Matei Zaharia] review comments be33d3f [Matei Zaharia] review comments 1513d4e [Matei Zaharia] Add ASF header ac56831 [Matei Zaharia] Bug fix and better error message 4f681ba [Matei Zaharia] Move write part of ShuffleMapTask to ShuffleManager f6f011d [Matei Zaharia] Move hash shuffle reader behind ShuffleManager interface 55c7717 [Matei Zaharia] Changed RDD code to use ShuffleReader 75cc044 [Matei Zaharia] Partial work to move hash shuffle in
This is a first cut at moving shuffle logic behind a pluggable interface, as described at https://issues.apache.org/jira/browse/SPARK-2044, to let us more easily experiment with new shuffle implementations. It moves the existing shuffle code to a class HashShuffleManager behind a general ShuffleManager interface. Two things are still missing to make this complete: * MapOutputTracker needs to be hidden behind the ShuffleManager interface; this will also require adding methods to ShuffleManager that will let the DAGScheduler interact with it as it does with the MapOutputTracker today * The code to do map-sides and reduce-side combine in ShuffledRDD, PairRDDFunctions, etc needs to be moved into the ShuffleManager's readers and writers However, some of these may also be done later after we merge the current interface. Author: Matei Zaharia <matei@databricks.com> Closes apache#1009 from mateiz/pluggable-shuffle and squashes the following commits: 7a09862 [Matei Zaharia] review comments be33d3f [Matei Zaharia] review comments 1513d4e [Matei Zaharia] Add ASF header ac56831 [Matei Zaharia] Bug fix and better error message 4f681ba [Matei Zaharia] Move write part of ShuffleMapTask to ShuffleManager f6f011d [Matei Zaharia] Move hash shuffle reader behind ShuffleManager interface 55c7717 [Matei Zaharia] Changed RDD code to use ShuffleReader 75cc044 [Matei Zaharia] Partial work to move hash shuffle in
This is a first cut at moving shuffle logic behind a pluggable interface, as described at https://issues.apache.org/jira/browse/SPARK-2044, to let us more easily experiment with new shuffle implementations. It moves the existing shuffle code to a class HashShuffleManager behind a general ShuffleManager interface.
Two things are still missing to make this complete:
However, some of these may also be done later after we merge the current interface.