
[SPARK-2044] Pluggable interface for shuffles #1009

Closed

wants to merge 8 commits into master from mateiz/pluggable-shuffle

Conversation

@mateiz (Contributor) commented Jun 8, 2014

This is a first cut at moving shuffle logic behind a pluggable interface, as described at https://issues.apache.org/jira/browse/SPARK-2044, to let us more easily experiment with new shuffle implementations. It moves the existing shuffle code to a class HashShuffleManager behind a general ShuffleManager interface.

Two things are still missing to make this complete:

  • MapOutputTracker needs to be hidden behind the ShuffleManager interface; this will also require adding methods to ShuffleManager that will let the DAGScheduler interact with it as it does with the MapOutputTracker today
  • The code to do map-side and reduce-side combine in ShuffledRDD, PairRDDFunctions, etc. needs to be moved into the ShuffleManager's readers and writers

However, some of these may also be done later after we merge the current interface.
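
For orientation, here is a rough sketch of the shape such a pluggable interface takes: a ShuffleManager registers shuffles and hands out a ShuffleWriter per map task and a ShuffleReader per reducer partition range. The trait names follow the ones discussed in this PR, but the signatures below are simplified and illustrative, not copied from the Spark source.

trait ShuffleHandle { def shuffleId: Int }

trait ShuffleWriter[K, V] {
  // Write all records produced by one map task.
  def write(records: Iterator[(K, V)]): Unit
  // Close the writer; `success` says whether the task completed cleanly.
  def stop(success: Boolean): Unit
}

trait ShuffleReader[K, C] {
  // Read the (possibly combined) records for this reducer's partition range.
  def read(): Iterator[(K, C)]
}

trait ShuffleManager {
  // Called on the driver to register a shuffle and obtain a handle passed to tasks.
  def registerShuffle(shuffleId: Int, numMaps: Int): ShuffleHandle
  // Called by map tasks to get a writer for one map partition.
  def getWriter[K, V](handle: ShuffleHandle, mapId: Int): ShuffleWriter[K, V]
  // Called by reduce tasks to get a reader for [startPartition, endPartition).
  def getReader[K, C](handle: ShuffleHandle, startPartition: Int, endPartition: Int): ShuffleReader[K, C]
  // Drop state for a finished shuffle and shut the manager down.
  def unregisterShuffle(shuffleId: Int): Unit
  def stop(): Unit
}

A HashShuffleManager is then one concrete implementation behind this interface, and alternative shuffle strategies can be plugged in behind the same methods.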

@AmplabJenkins: Merged build triggered.

@AmplabJenkins: Merged build started.

@AmplabJenkins: Merged build finished.

@AmplabJenkins: Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15536/

@mateiz (Contributor, Author) commented Jun 8, 2014

Jenkins, retest this please

@AmplabJenkins: Merged build triggered.

@AmplabJenkins: Merged build started.

@AmplabJenkins: Merged build finished.

@AmplabJenkins: Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15537/

@jerryshao (Contributor) commented:

Hi Matei, since we already have ShuffleWriter and ShuffleManager, do we still need to keep ShuffleBlockManager? Its two responsibilities, handing out shuffle writers and tracking shuffle block locations, could be moved into HashShuffleWriter and HashShuffleManager (or similar classes) respectively. I think that would make the shuffle module clearer and more loosely coupled from BlockManager.
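
(As a purely hypothetical illustration of that split, not code from this PR and with all names below invented for the sketch: the writer side owns its per-bucket outputs while the manager side owns the shuffle-block bookkeeping, so neither needs a separate ShuffleBlockManager.)

import scala.collection.mutable

class HashShuffleWriterSketch(numReducers: Int) {
  // One buffer per reduce bucket, standing in for the per-bucket disk writers
  // that ShuffleBlockManager currently hands out.
  private val buckets = Array.fill(numReducers)(mutable.ArrayBuffer.empty[(Any, Any)])
  def write(reduceId: Int, record: (Any, Any)): Unit = buckets(reduceId) += record
}

class HashShuffleManagerSketch {
  // Shuffle-block location bookkeeping kept in the shuffle module itself,
  // decoupled from BlockManager.
  private val blockLocations = mutable.Map.empty[(Int, Int, Int), String]
  def registerBlock(shuffleId: Int, mapId: Int, reduceId: Int, location: String): Unit =
    blockLocations((shuffleId, mapId, reduceId)) = location
  def locationOf(shuffleId: Int, mapId: Int, reduceId: Int): Option[String] =
    blockLocations.get((shuffleId, mapId, reduceId))
}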

context: TaskContext)
extends ShuffleReader[K, C]
{
require(endPartition == startPartition + 1,
Contributor:

Who enforces this? I thought the API allows users to create a ShuffleReader with an arbitrary partition range, and it's up to the ShuffleReader to implement it correctly (perhaps by chunking itself up).

mateiz (Contributor, Author):

Yup, this is just not yet supported in the first pass.

mateiz (Contributor, Author):

(The reason is that for this to work we need to move aggregation into the ShuffleReader.)
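
(A hypothetical sketch, not code from this PR, of what supporting an arbitrary [startPartition, endPartition) range could look like once aggregation lives in the reader: the reader chunks itself into one single-partition read per reduce partition. fetchPartition stands in for the real per-partition fetch-and-combine logic.)

class RangeReaderSketch[K, C](
    startPartition: Int,
    endPartition: Int,
    fetchPartition: Int => Iterator[(K, C)]) {
  require(startPartition <= endPartition, "invalid partition range")
  // Concatenate the per-partition iterators lazily, one reduce partition at a time.
  def read(): Iterator[(K, C)] =
    (startPartition until endPartition).iterator.flatMap(fetchPartition)
}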

@AmplabJenkins: Merged build triggered.

@AmplabJenkins: Merged build started.

@AmplabJenkins: Merged build finished.

@AmplabJenkins: Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15631/

shuffleMetrics.shuffleWriteTime = totalTime
metrics.shuffleWriteMetrics = Some(shuffleMetrics)

success = true
Contributor:

I don't think this is used anywhere?
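
(For context, a minimal sketch, not the PR's code, of the pattern a success flag normally serves in a map task: record whether writing completed, then report it to the writer's cleanup call from a finally block so partial output can be discarded on failure. writeAll and stop below are placeholders for whatever the real ShuffleWriter exposes.)

def runMapTaskSketch[T](
    records: Iterator[T],
    writeAll: Iterator[T] => Unit,
    stop: Boolean => Unit): Unit = {
  var success = false
  try {
    writeAll(records)
    success = true        // only reached if writing did not throw
  } finally {
    stop(success)         // lets the writer commit or revert its output
  }
}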

@mateiz (Contributor, Author) commented Jun 12, 2014

Thanks Andrew for the comments. @pwendell @rxin I think this is good to merge now if it passes tests, because other people want to contribute some other things (e.g. moving aggregation into the shuffle implementation) and I'd like to let them do that in parallel.

@AmplabJenkins: Merged build triggered.

@AmplabJenkins: Merged build started.

@AmplabJenkins: Merged build finished. All automated tests passed.

@AmplabJenkins: All automated tests passed. Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15699/

@rxin (Contributor) commented Jun 12, 2014

Ok merging this in master. Thanks!

@asfgit closed this in 508fd37 on Jun 12, 2014
@mateiz (Contributor, Author) commented Jun 12, 2014

Thanks Reynold. I've now moved the other TODOs into sub-tasks on https://issues.apache.org/jira/browse/SPARK-2044.

  def setSerializer(serializer: Serializer): ShuffledRDD[K, V, P] = {
-   this.serializer = serializer
+   this.serializer = Option(serializer)
Contributor:

should this be Some(serializer)?

Contributor:

Not if serializer is null

Contributor:

For null it will be None; isn't that the expected result?

Contributor:

scala> var serializer: Option[java.lang.Integer] = null
serializer: Option[Integer] = null

scala> serializer = Some(1)
serializer: Option[Integer] = Some(1)

scala> serializer.get
res0: Integer = 1

scala> serializer = Some(null)
serializer: Option[Integer] = Some(null)

scala> serializer.get
res1: Integer = null

scala> serializer.isDefined
res2: Boolean = true

scala> serializer = Option(null)
serializer: Option[Integer] = None

scala> serializer.isDefined
res3: Boolean = false
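
For completeness, a minimal sketch of the setter under discussion, with placeholder class and field types rather than Spark's actual ShuffledRDD: Option(serializer) maps a null argument to None, whereas Some(serializer) would wrap it as Some(null), which is exactly the difference the REPL session above demonstrates.

class SerializerHolderSketch {
  private var serializer: Option[AnyRef] = None
  def setSerializer(s: AnyRef): this.type = {
    this.serializer = Option(s)   // None when s is null, Some(s) otherwise
    this
  }
}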

pdeyhim pushed a commit to pdeyhim/spark-1 that referenced this pull request Jun 25, 2014
This is a first cut at moving shuffle logic behind a pluggable interface, as described at https://issues.apache.org/jira/browse/SPARK-2044, to let us more easily experiment with new shuffle implementations. It moves the existing shuffle code to a class HashShuffleManager behind a general ShuffleManager interface.

Two things are still missing to make this complete:
* MapOutputTracker needs to be hidden behind the ShuffleManager interface; this will also require adding methods to ShuffleManager that will let the DAGScheduler interact with it as it does with the MapOutputTracker today
* The code to do map-side and reduce-side combine in ShuffledRDD, PairRDDFunctions, etc. needs to be moved into the ShuffleManager's readers and writers

However, some of these may also be done later after we merge the current interface.

Author: Matei Zaharia <matei@databricks.com>

Closes apache#1009 from mateiz/pluggable-shuffle and squashes the following commits:

7a09862 [Matei Zaharia] review comments
be33d3f [Matei Zaharia] review comments
1513d4e [Matei Zaharia] Add ASF header
ac56831 [Matei Zaharia] Bug fix and better error message
4f681ba [Matei Zaharia] Move write part of ShuffleMapTask to ShuffleManager
f6f011d [Matei Zaharia] Move hash shuffle reader behind ShuffleManager interface
55c7717 [Matei Zaharia] Changed RDD code to use ShuffleReader
75cc044 [Matei Zaharia] Partial work to move hash shuffle in
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014