Skip to content

Commit

Permalink
SPARK-1713. Use a thread pool for launching executors.
Browse files Browse the repository at this point in the history
This patch copies the approach used in the MapReduce application master for launching containers.

Author: Sandy Ryza <sandy@cloudera.com>

Closes apache#663 from sryza/sandy-spark-1713 and squashes the following commits:

036550d [Sandy Ryza] SPARK-1713. [YARN] Use a threadpool for launching executor containers
  • Loading branch information
sryza authored and tgravescs committed Sep 10, 2014
1 parent 26503fd commit 1f4a648
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 2 deletions.
7 changes: 7 additions & 0 deletions docs/running-on-yarn.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,13 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
the environment of the executor launcher.
</td>
</tr>
<tr>
<td><code>spark.yarn.containerLauncherMaxThreads</code></td>
<td>25</td>
<td>
The maximum number of threads to use in the application master for launching executor containers.
</td>
</tr>
</table>

# Launching Spark on YARN
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.deploy.yarn

import java.util.{List => JList}
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.JavaConversions._
Expand All @@ -32,6 +32,8 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend

import com.google.common.util.concurrent.ThreadFactoryBuilder

object AllocationType extends Enumeration {
type AllocationType = Value
val HOST, RACK, ANY = Value
Expand Down Expand Up @@ -95,6 +97,14 @@ private[yarn] abstract class YarnAllocator(
protected val (preferredHostToCount, preferredRackToCount) =
generateNodeToWeight(conf, preferredNodes)

private val launcherPool = new ThreadPoolExecutor(
// max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue
sparkConf.getInt("spark.yarn.containerLauncherMaxThreads", 25), Integer.MAX_VALUE,
1, TimeUnit.MINUTES,
new LinkedBlockingQueue[Runnable](),
new ThreadFactoryBuilder().setNameFormat("ContainerLauncher #%d").setDaemon(true).build())
launcherPool.allowCoreThreadTimeOut(true)

def getNumExecutorsRunning: Int = numExecutorsRunning.intValue

def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
Expand Down Expand Up @@ -283,7 +293,7 @@ private[yarn] abstract class YarnAllocator(
executorMemory,
executorCores,
securityMgr)
new Thread(executorRunnable).start()
launcherPool.execute(executorRunnable)
}
}
logDebug("""
Expand Down

0 comments on commit 1f4a648

Please sign in to comment.