Skip to content

Commit

Permalink
[SPARK-12130] Replace shuffleManagerClass with shortShuffleMgrNames i…
Browse files Browse the repository at this point in the history
…n ExternalShuffleBlockResolver

Replace shuffleManagerClassName with shortShuffleMgrName to reduce the time spent on string comparisons, and put the sort comparison first. cc JoshRosen andrewor14

Author: Lianhui Wang <lianhuiwang09@gmail.com>

Closes #10131 from lianhuiwang/spark-12130.
  • Loading branch information
lianhuiwang authored and Andrew Or committed Dec 16, 2015
1 parent f725b2e commit 369127f
Show file tree
Hide file tree
Showing 8 changed files with 18 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ import org.apache.spark.{TaskContext, ShuffleDependency}
* boolean isDriver as parameters.
*/
private[spark] trait ShuffleManager {

/** Return short name for the ShuffleManager */
val shortName: String

/**
* Register a shuffle with the manager and obtain a handle for it to pass to tasks.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager

private val fileShuffleBlockResolver = new FileShuffleBlockResolver(conf)

override val shortName: String = "hash"

/* Register a shuffle with the manager and obtain a handle for it to pass to tasks. */
override def registerShuffle[K, V, C](
shuffleId: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
*/
private[this] val numMapsForShuffle = new ConcurrentHashMap[Int, Int]()

override val shortName: String = "sort"

override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ private[spark] class BlockManager(
val shuffleConfig = new ExecutorShuffleInfo(
diskBlockManager.localDirs.map(_.toString),
diskBlockManager.subDirsPerLocalDir,
shuffleManager.getClass.getName)
shuffleManager.shortName)

val MAX_ATTEMPTS = 3
val SLEEP_TIME_SECS = 5
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,10 @@ public ManagedBuffer getBlockData(String appId, String execId, String blockId) {
String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
}

if ("org.apache.spark.shuffle.hash.HashShuffleManager".equals(executor.shuffleManager)) {
return getHashBasedShuffleBlockData(executor, blockId);
} else if ("org.apache.spark.shuffle.sort.SortShuffleManager".equals(executor.shuffleManager)
|| "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager".equals(executor.shuffleManager)) {
if ("sort".equals(executor.shuffleManager) || "tungsten-sort".equals(executor.shuffleManager)) {
return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
} else if ("hash".equals(executor.shuffleManager)) {
return getHashBasedShuffleBlockData(executor, blockId);
} else {
throw new UnsupportedOperationException(
"Unsupported shuffle manager: " + executor.shuffleManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,7 @@ public synchronized void onBlockFetchFailure(String blockId, Throwable t) {

// Register an executor so that the next steps work.
ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(
new String[] { System.getProperty("java.io.tmpdir") }, 1,
"org.apache.spark.shuffle.sort.SortShuffleManager");
new String[] { System.getProperty("java.io.tmpdir") }, 1, "sort");
RegisterExecutor regmsg = new RegisterExecutor("app-1", "0", executorInfo);
client1.sendRpcSync(regmsg.toByteBuffer(), TIMEOUT_MS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void testBadRequests() throws IOException {

// Nonexistent shuffle block
resolver.registerExecutor("app0", "exec3",
dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager"));
dataContext.createExecutorInfo("sort"));
try {
resolver.getBlockData("app0", "exec3", "shuffle_1_1_0");
fail("Should have failed");
Expand All @@ -96,7 +96,7 @@ public void testBadRequests() throws IOException {
public void testSortShuffleBlocks() throws IOException {
ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
resolver.registerExecutor("app0", "exec0",
dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager"));
dataContext.createExecutorInfo("sort"));

InputStream block0Stream =
resolver.getBlockData("app0", "exec0", "shuffle_0_0_0").createInputStream();
Expand All @@ -115,7 +115,7 @@ public void testSortShuffleBlocks() throws IOException {
public void testHashShuffleBlocks() throws IOException {
ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
resolver.registerExecutor("app0", "exec0",
dataContext.createExecutorInfo("org.apache.spark.shuffle.hash.HashShuffleManager"));
dataContext.createExecutorInfo("hash"));

InputStream block0Stream =
resolver.getBlockData("app0", "exec0", "shuffle_1_0_0").createInputStream();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@
public class ExternalShuffleIntegrationSuite {

static String APP_ID = "app-id";
static String SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager";
static String HASH_MANAGER = "org.apache.spark.shuffle.hash.HashShuffleManager";
static String SORT_MANAGER = "sort";
static String HASH_MANAGER = "hash";

// Executor 0 is sort-based
static TestShuffleDataContext dataContext0;
Expand Down

0 comments on commit 369127f

Please sign in to comment.