[SPARK-3796] Create external service which can serve shuffle files #3001


Closed
wants to merge 12 commits into from

Conversation

aarondav
Contributor

This patch introduces the tooling necessary to construct an external shuffle service which is independent of Spark executors, and then use this service inside Spark. An example (just for the sake of this PR) of the service creation can be found in Worker, and the service itself is used by plugging in the StandaloneShuffleClient as Spark's ShuffleClient (setup in BlockManager).

This PR continues the work from #2753, which extracted out the transport layer of Spark's block transfer into an independent package within Spark. A new package was created which contains the Spark business logic necessary to retrieve the actual shuffle data, which is completely independent of the transport layer introduced in the previous patch. Similar to the transport layer, this package must not depend on Spark as we anticipate plugging this service as a lightweight process within, say, the YARN NodeManager, and do not wish to include Spark's dependencies (including Scala itself).
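The separation described above can be sketched as a narrow, transport-agnostic client interface that the block manager programs against, with the external implementation free of Spark (and Scala) dependencies. The names below are illustrative only, not the actual classes in this patch:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: a minimal client interface the block manager can
// depend on, with the external implementation living outside the Spark
// process. Class and method names here are hypothetical.
interface BlockFetcher {
    void fetchBlocks(String host, int port, String execId, String[] blockIds);
}

class ExternalShuffleClientSketch implements BlockFetcher {
    private final List<String> requests = new ArrayList<>();

    @Override
    public void fetchBlocks(String host, int port, String execId, String[] blockIds) {
        // A real client would open a transport-layer connection here; this
        // stub records each request to show how small the API surface is.
        for (String blockId : blockIds) {
            requests.add(host + ":" + port + "/" + execId + "/" + blockId);
        }
    }

    List<String> requests() {
        return requests;
    }
}
```

Because the interface carries only host, port, executor id, and block ids, the same calls can be served either by a peer executor or by a standalone service process.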

There are several outstanding tasks which must be complete before this PR can be merged:

  • Complete unit testing of network/shuffle package.
  • Performance and correctness testing on a real cluster.
  • Remove example service instantiation from Worker.scala.

There are even more shortcomings of this PR which should be addressed in followup patches:

  • Don't use Java serializer for RPC layer! It is not cross-version compatible.
  • Handle shuffle file cleanup for dead executors once the application terminates or the ContextCleaner triggers.
  • Documentation of the feature in the Spark docs.
  • Improve behavior if the shuffle service itself goes down (right now we don't blacklist it, and new executors cannot spawn on that machine).
  • SSL and SASL integration
  • Nice to have: Handle shuffle file consolidation (this would require changes to Spark's implementation).

@aarondav
Contributor Author

cc @rxin @andrewor14

@SparkQA

SparkQA commented Oct 29, 2014

Test build #22466 has started for PR 3001 at commit 52cca65.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Oct 29, 2014

Test build #22466 has finished for PR 3001 at commit 52cca65.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22466/
Test FAILed.

@SparkQA

SparkQA commented Oct 29, 2014

Test build #22467 has started for PR 3001 at commit bf6c2dd.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Oct 29, 2014

Test build #22468 has started for PR 3001 at commit 6055df6.

  • This patch merges cleanly.

@@ -78,7 +78,7 @@ private[spark] class Executor(
   val executorSource = new ExecutorSource(this, executorId)

   // Initialize Spark environment (using system properties read above)
-  conf.set("spark.executor.id", "executor." + executorId)
+  conf.set("spark.executor.id", executorId)
Contributor

what is this change about?

Contributor Author

This was introduced recently, and I was planning on using it, but ended up not. Still, I was inclined to keep the seemingly more sensible semantics of "spark.executor.id" being the executorId rather than being prefixed. It is currently only used by the "MetricsSystem".
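The semantics in question can be shown with a plain map standing in for SparkConf (purely illustrative, not Spark's API):

```java
import java.util.HashMap;
import java.util.Map;

// Plain map standing in for SparkConf, purely to illustrate the change:
// "spark.executor.id" now holds the raw executorId, not a prefixed form.
class ExecutorIdConfSketch {
    static Map<String, String> configure(String executorId) {
        Map<String, String> conf = new HashMap<>();
        conf.put("spark.executor.id", executorId); // was: "executor." + executorId
        return conf;
    }
}
```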

Contributor

Yeah that makes sense. This was introduced in a patch that was merged not long ago (middle of 1.2 window) so it's OK to change it.

@SparkQA

SparkQA commented Oct 29, 2014

Test build #22467 has finished for PR 3001 at commit bf6c2dd.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22467/
Test FAILed.

@SparkQA

SparkQA commented Oct 29, 2014

Test build #22468 has finished for PR 3001 at commit 6055df6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22468/
Test FAILed.

@SparkQA

SparkQA commented Oct 29, 2014

Test build #22481 has started for PR 3001 at commit 54af871.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Oct 29, 2014

Test build #22481 has finished for PR 3001 at commit 54af871.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22481/
Test PASSed.

port: Int,
blockIds: Seq[String],
execId: String,
Contributor

execId should be part of the connection establishment / registration and not part of fetchBlocks

Contributor Author

The tricky part here is that "execId" is actually part of the request. I am fetching Executor 6's blocks, while I am myself Executor 4. So there is no API that is exposed at a lower layer to transfer the execId.

Contributor

I see - does each executor have its own path for shuffle files?

Contributor Author

Yes, each executor registers its ExecutorShuffleInfo, which includes its own localDirs (created by the Executor on initialization).
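The registration flow described in this thread can be sketched as follows; the class and method names are hypothetical stand-ins, not the actual network/shuffle code. Each executor registers its shuffle directories under a composite app+executor key, so a request from executor 4 for executor 6's blocks resolves against 6's localDirs:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of executor registration in an external shuffle
// service: shuffle directories are keyed by "appId_execId" so fetches can
// name whichever executor wrote the blocks.
class ShuffleRegistrySketch {
    static final class ExecutorInfo {
        final String[] localDirs;
        ExecutorInfo(String[] localDirs) { this.localDirs = localDirs; }
    }

    private final Map<String, ExecutorInfo> executors = new ConcurrentHashMap<>();

    private static String key(String appId, String execId) {
        return appId + "_" + execId;
    }

    void registerExecutor(String appId, String execId, ExecutorInfo info) {
        executors.put(key(appId, execId), info);
    }

    ExecutorInfo lookup(String appId, String execId) {
        return executors.get(key(appId, execId));
    }
}
```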

@andrewor14
Contributor

Hey @aarondav quick correction for your description. The shuffle service is intended to run in Yarn's NodeManager, not the ApplicationManager (doesn't exist). It's great that the API exposed is so narrow. I think at least for Yarn we'll need to find a way to communicate the port back to the application from the server. We might have to include this info through some metadata when an application is registered.

String execId,
ExecutorShuffleConfig executorConfig) {
String fullId = getAppExecId(appId, execId);
executors.put(fullId, executorConfig);
Contributor

we should log something here

@SparkQA

SparkQA commented Nov 1, 2014

Test build #22682 has started for PR 3001 at commit 3d62679.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Nov 1, 2014

Test build #22683 has started for PR 3001 at commit 9883918.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Nov 1, 2014

Test build #22682 has finished for PR 3001 at commit 3d62679.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22682/
Test PASSed.

@SparkQA

SparkQA commented Nov 1, 2014

Test build #22683 has finished for PR 3001 at commit 9883918.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22683/
Test PASSed.

@SparkQA

SparkQA commented Nov 1, 2014

Test build #22687 has started for PR 3001 at commit fd3928b.

  • This patch merges cleanly.

@aarondav
Contributor Author

aarondav commented Nov 1, 2014

@rxin Please take a look at my last commit, fd3928b. This is some critical code which handles a couple cases I describe below.

I have completed performance and correctness testing on a real cluster. Performance-wise, I saw no regression from the in-executor version. Additionally, I saw minimal memory usage from the Worker, where I put the server -- I ran several medium-sized shuffles using Workers with 512MB max heap sizes (the default) without noticeable garbage collection or heap growth.

During testing, I noticed that we were dropping map outputs if I killed an executor in the middle of a map or reduce phase (in local testing, my queries ran so quickly that I always killed the executor after the completion of the job). This caused us to unnecessarily recompute the map tasks. I have added code which only drops map outputs if (1) external shuffle is disabled or (2) we're responding to a fetch failure specifically.
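The retention rule just described boils down to a two-flag condition. The sketch below is a hedged restatement of that logic, not Spark's actual scheduler code; names are illustrative:

```java
// Hedged sketch of the retention rule described above: when an executor is
// lost, drop its map outputs only if no external shuffle service can still
// serve them, or a fetch failure shows they are already unreachable.
class MapOutputRetentionSketch {
    static boolean shouldDropMapOutputs(boolean externalShuffleEnabled,
                                        boolean respondingToFetchFailure) {
        return !externalShuffleEnabled || respondingToFetchFailure;
    }
}
```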

@rxin
Contributor

rxin commented Nov 1, 2014

The very last commit looks fine.

@SparkQA

SparkQA commented Nov 1, 2014

Test build #22687 has finished for PR 3001 at commit fd3928b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22687/
Test PASSed.

@aarondav
Contributor Author

aarondav commented Nov 1, 2014

I have renamed Standalone* to External*, removed the example instantiation in Worker.scala, and have pushed the documentation back to a later PR (as that can certainly be done during the QA period).

From my end, this PR is ready to merge.

@SparkQA

SparkQA commented Nov 1, 2014

Test build #22697 has started for PR 3001 at commit 4d1f8c1.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Nov 1, 2014

Test build #22697 has finished for PR 3001 at commit 4d1f8c1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22697/
Test PASSed.

@rxin
Contributor

rxin commented Nov 1, 2014

This looks good overall. I'm going to merge it. There are some nit comments that you can address in followup PRs.

@@ -49,11 +49,11 @@
   private ChannelFuture channelFuture;
   private int port = -1;

-  public TransportServer(TransportContext context) {
+  public TransportServer(TransportContext context, int portToBind) {
Contributor

add javadoc defining the behavior for portToBind == 0
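A javadoc along the lines the reviewer asks for might read as below. The method here is a stand-alone illustration using the conventional socket semantics for a zero port (the OS assigns an ephemeral one); the exact wording and behavior in TransportServer itself are the author's to confirm:

```java
import java.io.IOException;
import java.net.ServerSocket;

// Illustrative stand-in for the bind behavior, not the TransportServer code.
class PortBindSketch {
    /**
     * Binds to {@code portToBind}. If {@code portToBind == 0}, the operating
     * system assigns an ephemeral port; the chosen port is returned so that
     * callers can advertise it to clients.
     */
    static int bind(int portToBind) {
        try (ServerSocket socket = new ServerSocket(portToBind)) {
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```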

@asfgit asfgit closed this in f55218a Nov 1, 2014
@pwendell
Contributor

pwendell commented Nov 2, 2014

Build changes LGTM - I realize this was already merged.


asfgit pushed a commit that referenced this pull request Nov 5, 2014
This creates a new module `network/yarn` that depends on `network/shuffle` recently created in #3001. This PR introduces a custom Yarn auxiliary service that runs the external shuffle service. As of the changes here this shuffle service is required for using dynamic allocation with Spark.

This is still WIP mainly because it doesn't handle security yet. I have tested this on a stable Yarn cluster.

Author: Andrew Or <andrew@databricks.com>

Closes #3082 from andrewor14/yarn-shuffle-service and squashes the following commits:

ef3ddae [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-shuffle-service
0ee67a2 [Andrew Or] Minor wording suggestions
1c66046 [Andrew Or] Remove unused provided dependencies
0eb6233 [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-shuffle-service
6489db5 [Andrew Or] Try catch at the right places
7b71d8f [Andrew Or] Add detailed java docs + reword a few comments
d1124e4 [Andrew Or] Add security to shuffle service (INCOMPLETE)
5f8a96f [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-shuffle-service
9b6e058 [Andrew Or] Address various feedback
f48b20c [Andrew Or] Fix tests again
f39daa6 [Andrew Or] Do not make network-yarn an assembly module
761f58a [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-shuffle-service
15a5b37 [Andrew Or] Fix build for Hadoop 1.x
baff916 [Andrew Or] Fix tests
5bf9b7e [Andrew Or] Address a few minor comments
5b419b8 [Andrew Or] Add missing license header
804e7ff [Andrew Or] Include the Yarn shuffle service jar in the distribution
cd076a4 [Andrew Or] Require external shuffle service for dynamic allocation
ea764e0 [Andrew Or] Connect to Yarn shuffle service only if it's enabled
1bf5109 [Andrew Or] Use the shuffle service port specified through hadoop config
b4b1f0c [Andrew Or] 4 tabs -> 2 tabs
43dcb96 [Andrew Or] First cut integration of shuffle service with Yarn aux service
b54a0c4 [Andrew Or] Initial skeleton for Yarn shuffle service

(cherry picked from commit 61a5cce)
Signed-off-by: Andrew Or <andrew@databricks.com>