18
18
package org .apache .spark .deploy .yarn
19
19
20
20
import java .util .{List => JList }
21
- import java .util .concurrent .ConcurrentHashMap
21
+ import java .util .concurrent ._
22
22
import java .util .concurrent .atomic .AtomicInteger
23
23
24
24
import scala .collection .JavaConversions ._
@@ -32,6 +32,8 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
32
32
import org .apache .spark .scheduler .{SplitInfo , TaskSchedulerImpl }
33
33
import org .apache .spark .scheduler .cluster .CoarseGrainedSchedulerBackend
34
34
35
+ import com .google .common .util .concurrent .ThreadFactoryBuilder
36
+
35
37
object AllocationType extends Enumeration {
36
38
type AllocationType = Value
37
39
val HOST, RACK, ANY = Value
@@ -95,6 +97,14 @@ private[yarn] abstract class YarnAllocator(
95
97
protected val (preferredHostToCount, preferredRackToCount) =
96
98
generateNodeToWeight(conf, preferredNodes)
97
99
100
+ private val launcherPool = new ThreadPoolExecutor (
101
+ // max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue
102
+ sparkConf.getInt(" spark.yarn.containerLauncherMaxThreads" , 25 ), Integer .MAX_VALUE ,
103
+ 1 , TimeUnit .MINUTES ,
104
+ new LinkedBlockingQueue [Runnable ](),
105
+ new ThreadFactoryBuilder ().setNameFormat(" ContainerLauncher #%d" ).setDaemon(true ).build())
106
+ launcherPool.allowCoreThreadTimeOut(true )
107
+
98
108
def getNumExecutorsRunning : Int = numExecutorsRunning.intValue
99
109
100
110
def getNumExecutorsFailed : Int = numExecutorsFailed.intValue
@@ -283,7 +293,7 @@ private[yarn] abstract class YarnAllocator(
283
293
executorMemory,
284
294
executorCores,
285
295
securityMgr)
286
- new Thread (executorRunnable).start( )
296
+ launcherPool.execute(executorRunnable )
287
297
}
288
298
}
289
299
logDebug("""
0 commit comments