-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-9439] [yarn] External shuffle service robust to NM restarts using leveldb #7943
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
squito
wants to merge
47
commits into
apache:master
from
squito:leveldb_external_shuffle_service_NM_restart
Closed
Changes from all commits
Commits
Show all changes
47 commits
Select commit
Hold shift + click to select a range
b9d2ced
incomplete setup for external shuffle service tests
squito 36127d3
wip
squito bb3ba49
minor cleanup
squito c69f46b
maybe working version, needs tests & cleanup ...
squito 5e5a7c3
fix build
squito ad122ef
more fixes
squito 0b588bd
more fixes ...
squito 1136f44
test needs to have an actual shuffle
squito 9eae119
cleanup lots of duplication
squito 0e9d69b
better names
squito d596969
cleanup imports
squito dd93dc0
test for shuffle service w/ NM restarts
squito efb6195
proper unit test, and no longer leak if apps stop during NM restart
squito a36729c
cleanup
squito 7504de8
style
squito 23994ff
style
squito 86e0cb9
for tests, shuffle service finds an open port
squito bdc4b32
rename
squito bb9d1e6
formatting
squito 857331a
better tests & comments
squito bdbbf0d
comments, remove some unnecessary changes
squito 62586a6
just serialize the whole executors map
squito 245db19
style
squito 55f49fc
make sure the service doesnt die if the registered executor file is c…
squito 0a39b98
Merge branch 'master' into external_shuffle_service_NM_restart
squito 4492835
lol, dont use a PrintWriter b/c of scalastyle checks
squito f729e2b
debugging
squito d7450f0
style
squito 32fe5ae
Merge branch 'master' into external_shuffle_service_NM_restart
squito 59800b7
Files.move in case renaming is unsupported
squito d596410
store executor data in leveldb
squito 694934c
only open leveldb connection once per service
squito c878fbe
better explanation of shuffle service port handling
squito 12b6a35
save registered executors when apps are removed; add tests
squito 79922b7
rely on yarn to call stopApplication; assorted cleanup
squito acedb62
switch to writing out one record per executor
squito 9378ba3
fail gracefully on corrupt leveldb files
squito e9f99e8
cleanup the handling of bad dbs a little
squito 8267d2a
style
squito 1a7980b
version
squito 594d520
use json to serialize application executor info
squito 81f80e2
Merge branch 'master' into leveldb_external_shuffle_service_NM_restart
squito 795d28f
review feedback
squito 2499c8c
explicit dependency on jackson-annotations
squito 5c71c8c
save executor to db before registering; style
squito 70951d6
Merge branch 'master' into leveldb_external_shuffle_service_NM_restart
squito 0d285d3
review feedback
squito File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,11 +17,12 @@ | |
|
|
||
| package org.apache.spark.network.shuffle; | ||
|
|
||
| import java.io.File; | ||
| import java.io.IOException; | ||
| import java.util.List; | ||
|
|
||
| import com.google.common.annotations.VisibleForTesting; | ||
| import com.google.common.collect.Lists; | ||
| import org.apache.spark.network.util.TransportConf; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
|
|
@@ -31,10 +32,10 @@ | |
| import org.apache.spark.network.server.OneForOneStreamManager; | ||
| import org.apache.spark.network.server.RpcHandler; | ||
| import org.apache.spark.network.server.StreamManager; | ||
| import org.apache.spark.network.shuffle.protocol.BlockTransferMessage; | ||
| import org.apache.spark.network.shuffle.protocol.OpenBlocks; | ||
| import org.apache.spark.network.shuffle.protocol.RegisterExecutor; | ||
| import org.apache.spark.network.shuffle.protocol.StreamHandle; | ||
| import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.AppExecId; | ||
| import org.apache.spark.network.shuffle.protocol.*; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use ._ or explicitly put them in with {}
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nevermind java file not scala |
||
| import org.apache.spark.network.util.TransportConf; | ||
|
|
||
|
|
||
| /** | ||
| * RPC Handler for a server which can serve shuffle blocks from outside of an Executor process. | ||
|
|
@@ -46,11 +47,13 @@ | |
| public class ExternalShuffleBlockHandler extends RpcHandler { | ||
| private final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockHandler.class); | ||
|
|
||
| private final ExternalShuffleBlockResolver blockManager; | ||
| @VisibleForTesting | ||
| final ExternalShuffleBlockResolver blockManager; | ||
| private final OneForOneStreamManager streamManager; | ||
|
|
||
| public ExternalShuffleBlockHandler(TransportConf conf) { | ||
| this(new OneForOneStreamManager(), new ExternalShuffleBlockResolver(conf)); | ||
| public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFile) throws IOException { | ||
| this(new OneForOneStreamManager(), | ||
| new ExternalShuffleBlockResolver(conf, registeredExecutorFile)); | ||
| } | ||
|
|
||
| /** Enables mocking out the StreamManager and BlockManager. */ | ||
|
|
@@ -105,4 +108,22 @@ public StreamManager getStreamManager() { | |
| public void applicationRemoved(String appId, boolean cleanupLocalDirs) { | ||
| blockManager.applicationRemoved(appId, cleanupLocalDirs); | ||
| } | ||
|
|
||
| /** | ||
| * Register an (application, executor) with the given shuffle info. | ||
| * | ||
| * The "re-" is meant to highlight the intended use of this method -- when this service is | ||
| * restarted, this is used to restore the state of executors from before the restart. Normal | ||
| * registration will happen via a message handled in receive() | ||
| * | ||
| * @param appExecId | ||
| * @param executorInfo | ||
| */ | ||
| public void reregisterExecutor(AppExecId appExecId, ExecutorShuffleInfo executorInfo) { | ||
| blockManager.registerExecutor(appExecId.appId, appExecId.execId, executorInfo); | ||
| } | ||
|
|
||
| public void close() { | ||
| blockManager.close(); | ||
| } | ||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe add the
jackson-annotationmodule too, since you're using it? maven can get confused if transitive dependencies have a different version of these modules.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
err ... actually it looks like we don't have
jackson-annotationslisted anywhere else. (Its excluded from a couple of other depenencies, but thats it.) Do you think its worth including just here? Should it be changed in the root pom?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's better to add it, especially since you're explicitly using jackson annotations in the code. (It should follow the usual pattern - declare in root pom's dependencyManagement, use wherever needed).