[SPARK-3796] Create external service which can serve shuffle files #3001
Conversation
cc @rxin @andrewor14
Test build #22466 has started for PR 3001 at commit
Test build #22466 has finished for PR 3001 at commit
Test FAILed.
Test build #22467 has started for PR 3001 at commit
Test build #22468 has started for PR 3001 at commit
@@ -78,7 +78,7 @@ private[spark] class Executor(
     val executorSource = new ExecutorSource(this, executorId)

     // Initialize Spark environment (using system properties read above)
-    conf.set("spark.executor.id", "executor." + executorId)
+    conf.set("spark.executor.id", executorId)
what is this change about?
This was introduced recently, and I was planning on using it, but ended up not. Still, I was inclined to keep the seemingly more sensible semantics of "spark.executor.id" being the executorId rather than being prefixed. It is currently only used by the "MetricsSystem".
Yeah that makes sense. This was introduced in a patch that was merged not long ago (middle of 1.2 window) so it's OK to change it.
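The semantics being discussed can be illustrated with a small sketch (the class and method names here are hypothetical, not Spark's actual code): storing the bare executor ID in the config lets each consumer, such as a metrics system, apply its own prefix.

```java
// Hypothetical sketch: keep "spark.executor.id" as the raw ID so that
// consumers build their own namespaces from it.
import java.util.HashMap;
import java.util.Map;

public class ExecutorIdDemo {
    static final Map<String, String> conf = new HashMap<>();

    static void registerExecutor(String executorId) {
        // Store the raw ID rather than a pre-prefixed "executor.<id>" string.
        conf.put("spark.executor.id", executorId);
    }

    // A metrics consumer adds its own prefix at the point of use.
    static String metricsNamespace() {
        return "executor." + conf.get("spark.executor.id");
    }

    public static void main(String[] args) {
        registerExecutor("4");
        System.out.println(metricsNamespace()); // prints executor.4
    }
}
```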
Test build #22467 has finished for PR 3001 at commit
Test FAILed.
Test build #22468 has finished for PR 3001 at commit
Test FAILed.
Test build #22481 has started for PR 3001 at commit
Test build #22481 has finished for PR 3001 at commit
Test PASSed.
    port: Int,
    blockIds: Seq[String],
    execId: String,
execId should be part of the connection establishment / registration and not part of fetchBlocks
The tricky part here is that "execId" is actually part of the request. I am fetching Executor 6's blocks, while I am myself Executor 4. So there is no API that is exposed at a lower layer to transfer the execId.
I see - does each executor have its own path for shuffle files?
Yes, each executor registers its ExecutorShuffleInfo, which includes its own localDirs (created by the Executor on initialization).
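A rough sketch of the registration scheme described in this thread (all names are illustrative, not the actual `network/shuffle` API): the service keys each executor's local directories by (appId, execId), so a fetch request that names another executor's ID can still be resolved to that executor's shuffle files.

```java
// Illustrative sketch only: a registry of per-executor shuffle directories,
// keyed by appId and execId, as an external service might keep it.
import java.io.File;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ShuffleRegistrySketch {
    // "appId_execId" -> local dirs where that executor wrote shuffle files
    static final Map<String, String[]> executors = new ConcurrentHashMap<>();

    static String fullId(String appId, String execId) {
        return appId + "_" + execId;
    }

    // Called when an executor registers its ExecutorShuffleInfo-like state.
    static void registerExecutor(String appId, String execId, String[] localDirs) {
        executors.put(fullId(appId, execId), localDirs);
    }

    // Resolve a block to a file under one of the registered dirs. A real
    // implementation hashes the block ID across dirs; we pick dir 0 here.
    static File getBlockFile(String appId, String execId, String blockId) {
        String[] dirs = executors.get(fullId(appId, execId));
        if (dirs == null) {
            throw new IllegalStateException("Executor not registered: " + execId);
        }
        return new File(dirs[0], blockId);
    }

    public static void main(String[] args) {
        registerExecutor("app-1", "6", new String[] {"/tmp/shuffle-local"});
        // Executor 4 can now ask for executor 6's block by naming its execId.
        System.out.println(getBlockFile("app-1", "6", "shuffle_0_0_0"));
    }
}
```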
Hey @aarondav quick correction for your description. The shuffle service is intended to run in Yarn's NodeManager, not the ApplicationManager (doesn't exist). It's great that the API exposed is so narrow. I think at least for Yarn we'll need to find a way to communicate the port back to the application from the server. We might have to include this info through some metadata when an application is registered.
      String execId,
      ExecutorShuffleConfig executorConfig) {
    String fullId = getAppExecId(appId, execId);
    executors.put(fullId, executorConfig);
we should log something here
Test build #22682 has started for PR 3001 at commit
Test build #22683 has started for PR 3001 at commit
Test build #22682 has finished for PR 3001 at commit
Test PASSed.
Test build #22683 has finished for PR 3001 at commit
Test PASSed.
Test build #22687 has started for PR 3001 at commit
@rxin Please take a look at my last commit, fd3928b. This is some critical code which handles a couple of cases I describe below. I have completed performance and correctness testing on a real cluster.

Performance-wise, I saw no regression from the in-executor version. Additionally, I saw minimal memory usage from the Worker, where I put the server -- I ran several medium-sized shuffles using Workers with 512MB max heap sizes (the default) without noticeable garbage collection or heap growth.

During testing, I noticed that we were dropping map outputs if I killed an executor in the middle of a map or reduce phase (in local testing, my queries ran so quickly that I always killed the executor after the completion of the job). This caused us to unnecessarily recompute the map tasks. I have added code which only drops map outputs if (1) external shuffle is disabled or (2) we're responding to a fetch failure specifically.
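The condition described in the last paragraph can be sketched as a single predicate (the class and method names here are hypothetical, not the actual Spark code):

```java
// Sketch of the map-output retention decision described above.
public class MapOutputPolicy {
    // Only drop a lost executor's map outputs when the external shuffle
    // service is disabled, or when we are reacting to an actual fetch
    // failure. With the external service, shuffle files outlive the
    // executor, so a killed executor's outputs remain servable.
    static boolean shouldRemoveMapOutputs(boolean externalShuffleEnabled,
                                          boolean respondingToFetchFailure) {
        return !externalShuffleEnabled || respondingToFetchFailure;
    }

    public static void main(String[] args) {
        // Executor killed mid-stage, external service running, no fetch
        // failure observed: keep the map outputs.
        System.out.println(shouldRemoveMapOutputs(true, false)); // prints false
    }
}
```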
The very last commit looks fine.
Test build #22687 has finished for PR 3001 at commit
Test PASSed.
I have renamed Standalone* to External*, removed the example instantiation in Worker.scala, and have pushed the documentation back to a later PR (as that can certainly be done during the QA period). From my end, this PR is ready to merge.
Test build #22697 has started for PR 3001 at commit
Test build #22697 has finished for PR 3001 at commit
Test PASSed.
This looks good overall. I'm going to merge it. There are some nit comments that you can address in followup PRs.
@@ -49,11 +49,11 @@
   private ChannelFuture channelFuture;
   private int port = -1;

-  public TransportServer(TransportContext context) {
+  public TransportServer(TransportContext context, int portToBind) {
add javadoc defining the behavior for portToBind == 0
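For illustration, a hedged sketch of the requested javadoc and of the common "port 0 means pick an ephemeral port" convention, using a plain `ServerSocket` rather than the actual Netty-based `TransportServer`:

```java
// Sketch only: demonstrates the portToBind == 0 behavior the javadoc
// should document, with java.net.ServerSocket standing in for the server.
import java.io.IOException;
import java.net.ServerSocket;

public class PortBindSketch {
    /**
     * Binds to the given port. If portToBind is 0, the OS selects an
     * arbitrary free (ephemeral) port; callers should then query the
     * server for the actual port that was bound.
     */
    static int bindAndGetPort(int portToBind) {
        try (ServerSocket s = new ServerSocket(portToBind)) {
            // getLocalPort() reports the port actually bound, which for
            // portToBind == 0 is the ephemeral port the OS chose.
            return s.getLocalPort();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(bindAndGetPort(0) > 0); // prints true
    }
}
```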
Build changes LGTM - I realize this was already merged.
This creates a new module `network/yarn` that depends on `network/shuffle` recently created in #3001. This PR introduces a custom Yarn auxiliary service that runs the external shuffle service. As of the changes here this shuffle service is required for using dynamic allocation with Spark. This is still WIP mainly because it doesn't handle security yet. I have tested this on a stable Yarn cluster.

Author: Andrew Or <andrew@databricks.com>

Closes #3082 from andrewor14/yarn-shuffle-service and squashes the following commits:

ef3ddae [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-shuffle-service
0ee67a2 [Andrew Or] Minor wording suggestions
1c66046 [Andrew Or] Remove unused provided dependencies
0eb6233 [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-shuffle-service
6489db5 [Andrew Or] Try catch at the right places
7b71d8f [Andrew Or] Add detailed java docs + reword a few comments
d1124e4 [Andrew Or] Add security to shuffle service (INCOMPLETE)
5f8a96f [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-shuffle-service
9b6e058 [Andrew Or] Address various feedback
f48b20c [Andrew Or] Fix tests again
f39daa6 [Andrew Or] Do not make network-yarn an assembly module
761f58a [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-shuffle-service
15a5b37 [Andrew Or] Fix build for Hadoop 1.x
baff916 [Andrew Or] Fix tests
5bf9b7e [Andrew Or] Address a few minor comments
5b419b8 [Andrew Or] Add missing license header
804e7ff [Andrew Or] Include the Yarn shuffle service jar in the distribution
cd076a4 [Andrew Or] Require external shuffle service for dynamic allocation
ea764e0 [Andrew Or] Connect to Yarn shuffle service only if it's enabled
1bf5109 [Andrew Or] Use the shuffle service port specified through hadoop config
b4b1f0c [Andrew Or] 4 tabs -> 2 tabs
43dcb96 [Andrew Or] First cut integration of shuffle service with Yarn aux service
b54a0c4 [Andrew Or] Initial skeleton for Yarn shuffle service
(cherry picked from commit 61a5cce) Signed-off-by: Andrew Or <andrew@databricks.com>
This patch introduces the tooling necessary to construct an external shuffle service which is independent of Spark executors, and then use this service inside Spark. An example (just for the sake of this PR) of the service creation can be found in Worker, and the service itself is used by plugging in the StandaloneShuffleClient as Spark's ShuffleClient (set up in BlockManager).
This PR continues the work from #2753, which extracted out the transport layer of Spark's block transfer into an independent package within Spark. A new package was created which contains the Spark business logic necessary to retrieve the actual shuffle data, which is completely independent of the transport layer introduced in the previous patch. Similar to the transport layer, this package must not depend on Spark as we anticipate plugging this service as a lightweight process within, say, the YARN NodeManager, and do not wish to include Spark's dependencies (including Scala itself).
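The plug-in arrangement described above can be sketched as follows (interface and class names are illustrative, not Spark's actual API): the block manager depends only on a narrow shuffle-client interface, and a configuration flag selects between the in-executor implementation and one that contacts the external service.

```java
// Sketch only: a narrow client interface with two swappable implementations,
// standing in for Spark's ShuffleClient pluggability.
public class ShuffleClientSketch {
    interface ShuffleClient {
        // A stand-in for the real fetchBlocks-style API surface.
        String describe();
    }

    static class InExecutorShuffleClient implements ShuffleClient {
        public String describe() { return "fetch from peer executors"; }
    }

    static class ExternalShuffleClient implements ShuffleClient {
        public String describe() { return "fetch from external shuffle service"; }
    }

    // A config flag picks the implementation; the caller only sees the
    // ShuffleClient interface.
    static ShuffleClient create(boolean externalShuffleEnabled) {
        return externalShuffleEnabled
            ? new ExternalShuffleClient()
            : new InExecutorShuffleClient();
    }

    public static void main(String[] args) {
        System.out.println(create(true).describe());
    }
}
```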
There are several outstanding tasks which must be completed before this PR can be merged:
There are even more shortcomings of this PR which should be addressed in followup patches: