Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
b9d2ced
incomplete setup for external shuffle service tests
squito Jul 28, 2015
36127d3
wip
squito Jul 28, 2015
bb3ba49
minor cleanup
squito Jul 28, 2015
c69f46b
maybe working version, needs tests & cleanup ...
squito Jul 30, 2015
5e5a7c3
fix build
squito Jul 31, 2015
ad122ef
more fixes
squito Jul 31, 2015
0b588bd
more fixes ...
squito Jul 31, 2015
1136f44
test needs to have an actual shuffle
squito Jul 31, 2015
9eae119
cleanup lots of duplication
squito Jul 31, 2015
0e9d69b
better names
squito Jul 31, 2015
d596969
cleanup imports
squito Jul 31, 2015
dd93dc0
test for shuffle service w/ NM restarts
squito Aug 1, 2015
efb6195
proper unit test, and no longer leak if apps stop during NM restart
squito Aug 1, 2015
a36729c
cleanup
squito Aug 1, 2015
7504de8
style
squito Aug 1, 2015
23994ff
style
squito Aug 1, 2015
86e0cb9
for tests, shuffle service finds an open port
squito Aug 1, 2015
bdc4b32
rename
squito Aug 1, 2015
bb9d1e6
formatting
squito Aug 1, 2015
857331a
better tests & comments
squito Aug 1, 2015
bdbbf0d
comments, remove some unnecessary changes
squito Aug 1, 2015
62586a6
just serialize the whole executors map
squito Aug 3, 2015
245db19
style
squito Aug 3, 2015
55f49fc
make sure the service doesnt die if the registered executor file is c…
squito Aug 3, 2015
0a39b98
Merge branch 'master' into external_shuffle_service_NM_restart
squito Aug 3, 2015
4492835
lol, dont use a PrintWriter b/c of scalastyle checks
squito Aug 3, 2015
f729e2b
debugging
squito Aug 4, 2015
d7450f0
style
squito Aug 4, 2015
32fe5ae
Merge branch 'master' into external_shuffle_service_NM_restart
squito Aug 4, 2015
59800b7
Files.move in case renaming is unsupported
squito Aug 4, 2015
d596410
store executor data in leveldb
squito Aug 4, 2015
694934c
only open leveldb connection once per service
squito Aug 5, 2015
c878fbe
better explanation of shuffle service port handling
squito Aug 5, 2015
12b6a35
save registered executors when apps are removed; add tests
squito Aug 5, 2015
79922b7
rely on yarn to call stopApplication; assorted cleanup
squito Aug 6, 2015
acedb62
switch to writing out one record per executor
squito Aug 6, 2015
9378ba3
fail gracefully on corrupt leveldb files
squito Aug 6, 2015
e9f99e8
cleanup the handling of bad dbs a little
squito Aug 6, 2015
8267d2a
style
squito Aug 7, 2015
1a7980b
version
squito Aug 7, 2015
594d520
use json to serialize application executor info
squito Aug 7, 2015
81f80e2
Merge branch 'master' into leveldb_external_shuffle_service_NM_restart
squito Aug 12, 2015
795d28f
review feedback
squito Aug 12, 2015
2499c8c
explicit dependency on jackson-annotations
squito Aug 12, 2015
5c71c8c
save executor to db before registering; style
squito Aug 14, 2015
70951d6
Merge branch 'master' into leveldb_external_shuffle_service_NM_restart
squito Aug 17, 2015
0d285d3
review feedback
squito Aug 17, 2015
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana

/** Create a new shuffle block handler. Factored out for subclasses to override. */
protected def newShuffleBlockHandler(conf: TransportConf): ExternalShuffleBlockHandler = {
new ExternalShuffleBlockHandler(conf)
new ExternalShuffleBlockHandler(conf, null)
}

/** Starts the external shuffle service if the user has configured us to. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.spark.network.util.TransportConf
* It detects driver termination and calls the cleanup callback to [[ExternalShuffleService]].
*/
private[mesos] class MesosExternalShuffleBlockHandler(transportConf: TransportConf)
extends ExternalShuffleBlockHandler(transportConf) with Logging {
extends ExternalShuffleBlockHandler(transportConf, null) with Logging {

// Stores a map of driver socket addresses to app ids
private val connectedApps = new mutable.HashMap[SocketAddress, String]
Expand Down
14 changes: 12 additions & 2 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,17 @@ private[spark] class BlockManager(

// Port used by the external shuffle service. In Yarn mode, this may already be
// set through the Hadoop configuration as the server is launched in the Yarn NM.
private val externalShuffleServicePort =
Utils.getSparkOrYarnConfig(conf, "spark.shuffle.service.port", "7337").toInt
private val externalShuffleServicePort = {
  val configuredPort =
    Utils.getSparkOrYarnConfig(conf, "spark.shuffle.service.port", "7337").toInt
  configuredPort match {
    // A value of 0 shows up in tests: the yarn config sets "spark.shuffle.service.port" to 0
    // so that yarn picks an open port. Spark apps still have to be told the real port, so in
    // that one case (and only then) the value from the spark config is preferred.
    case 0 => conf.get("spark.shuffle.service.port").toInt
    case port => port
  }
}

// Check that we're not using external shuffle service with consolidated shuffle files.
if (externalShuffleServiceEnabled
Expand Down Expand Up @@ -191,6 +200,7 @@ private[spark] class BlockManager(
executorId, blockTransferService.hostName, blockTransferService.port)

shuffleServerId = if (externalShuffleServiceEnabled) {
logInfo(s"external shuffle service port = $externalShuffleServicePort")
BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
} else {
blockManagerId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {

override def beforeAll() {
val transportConf = SparkTransportConf.fromSparkConf(conf, numUsableCores = 2)
rpcHandler = new ExternalShuffleBlockHandler(transportConf)
rpcHandler = new ExternalShuffleBlockHandler(transportConf, null)
val transportContext = new TransportContext(transportConf, rpcHandler)
server = transportContext.createServer()

Expand Down
16 changes: 16 additions & 0 deletions network/shuffle/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,22 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.fusesource.leveldbjni</groupId>
<artifactId>leveldbjni-all</artifactId>
<version>1.8</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add the jackson-annotations module too, since you're using it? Maven can get confused if transitive dependencies have a different version of these modules.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

err ... actually it looks like we don't have jackson-annotations listed anywhere else. (It's excluded from a couple of other dependencies, but that's it.) Do you think it's worth including just here? Should it be changed in the root pom?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to add it, especially since you're explicitly using jackson annotations in the code. (It should follow the usual pattern - declare in root pom's dependencyManagement, use wherever needed).

</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>

<!-- Provided dependencies -->
<dependency>
<groupId>org.slf4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@

package org.apache.spark.network.shuffle;

import java.io.File;
import java.io.IOException;
import java.util.List;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.spark.network.util.TransportConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -31,10 +32,10 @@
import org.apache.spark.network.server.OneForOneStreamManager;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
import org.apache.spark.network.shuffle.protocol.OpenBlocks;
import org.apache.spark.network.shuffle.protocol.RegisterExecutor;
import org.apache.spark.network.shuffle.protocol.StreamHandle;
import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.AppExecId;
import org.apache.spark.network.shuffle.protocol.*;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use ._ or explicitly put them in with {}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Never mind, this is a Java file, not Scala.

import org.apache.spark.network.util.TransportConf;


/**
* RPC Handler for a server which can serve shuffle blocks from outside of an Executor process.
Expand All @@ -46,11 +47,13 @@
public class ExternalShuffleBlockHandler extends RpcHandler {
private final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockHandler.class);

private final ExternalShuffleBlockResolver blockManager;
@VisibleForTesting
final ExternalShuffleBlockResolver blockManager;
private final OneForOneStreamManager streamManager;

public ExternalShuffleBlockHandler(TransportConf conf) {
this(new OneForOneStreamManager(), new ExternalShuffleBlockResolver(conf));
public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFile) throws IOException {
this(new OneForOneStreamManager(),
new ExternalShuffleBlockResolver(conf, registeredExecutorFile));
}

/** Enables mocking out the StreamManager and BlockManager. */
Expand Down Expand Up @@ -105,4 +108,22 @@ public StreamManager getStreamManager() {
public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
blockManager.applicationRemoved(appId, cleanupLocalDirs);
}

/**
* Register an (application, executor) with the given shuffle info.
*
* The "re-" is meant to highlight the intended use of this method -- when this service is
* restarted, this is used to restore the state of executors from before the restart. Normal
* registration will happen via a message handled in receive().
*
* @param appExecId identifies the (application, executor) pair; its appId and execId are
*                  passed on to the block resolver
* @param executorInfo the shuffle info to register for that executor
*/
public void reregisterExecutor(AppExecId appExecId, ExecutorShuffleInfo executorInfo) {
// Delegates straight to the block resolver held in the blockManager field.
blockManager.registerExecutor(appExecId.appId, appExecId.execId, executorInfo);
}

/** Closes this handler by delegating to close() on its {@link ExternalShuffleBlockResolver}. */
public void close() {
blockManager.close();
}
}
Loading