Skip to content

Commit f581122

Browse files
author
Marcelo Vanzin
committed
Review feedback.
1 parent 557fdeb commit f581122

File tree

6 files changed

+14
-22
lines changed

6 files changed

+14
-22
lines changed

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ private[yarn] class YarnAllocationHandler(
5555
resourceManager: AMRMProtocol,
5656
appAttemptId: ApplicationAttemptId,
5757
args: ApplicationMasterArguments,
58-
map: collection.Map[String, collection.Set[SplitInfo]])
58+
preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
5959
extends YarnAllocator with Logging {
6060

6161
// These three are locked on allocatedHostToContainersMap. Complementary data structures
@@ -90,7 +90,8 @@ private[yarn] class YarnAllocationHandler(
9090
private val maxExecutors = args.numExecutors
9191
private val executorMemory = args.executorMemory
9292
private val executorCores = args.executorCores
93-
private val (preferredHostToCount, preferredRackToCount) = generateNodeToWeight(conf, map)
93+
private val (preferredHostToCount, preferredRackToCount) =
94+
generateNodeToWeight(conf, preferredNodes)
9495

9596
def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
9697

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
5656
private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
5757
sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
5858

59-
private var finished = false
60-
private var registered = false
59+
@volatile private var finished = false
6160
private var reporterThread: Thread = _
6261
private var allocator: YarnAllocator = _
6362

@@ -123,7 +122,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
123122

124123
final def finish(status: FinalApplicationStatus, diagnostics: String = "") = synchronized {
125124
if (!finished) {
126-
logInfo(s"Finishing ApplicationMaster with $status")
125+
logInfo(s"Finishing ApplicationMaster with $status" +
126+
Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
127127
finished = true
128128
reporterThread.interrupt()
129129
reporterThread.join()
@@ -163,6 +163,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
163163
uiAddress,
164164
uiHistoryAddress)
165165

166+
allocator.allocateResources()
166167
reporterThread = launchReporterThread()
167168
}
168169

@@ -329,18 +330,6 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
329330
actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
330331
}
331332

332-
private def allocateExecutors() = {
333-
logInfo("Requesting" + args.numExecutors + " executors.")
334-
try {
335-
while (allocator.getNumExecutorsRunning < args.numExecutors && !finished) {
336-
checkNumExecutorsFailed()
337-
allocator.allocateResources()
338-
Thread.sleep(ALLOCATE_HEARTBEAT_INTERVAL)
339-
}
340-
}
341-
logInfo("All executors have launched.")
342-
}
343-
344333
private def checkNumExecutorsFailed() = {
345334
if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
346335
finish(FinalApplicationStatus.FAILED, "Max number of executor failures reached.")

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -403,7 +403,7 @@ trait ClientBase extends Logging {
403403
classOf[ApplicationMaster].getName().replace("ApplicationMaster", "ExecutorLauncher")
404404
}
405405
val amArgs =
406-
Seq(classOf[ApplicationMaster].getName()) ++ userClass ++
406+
Seq(amClass) ++ userClass ++
407407
(if (args.userJar != null) Seq("--jar", args.userJar) else Nil) ++
408408
Seq("--executor-memory", args.executorMemory.toString,
409409
"--executor-cores", args.executorCores.toString,

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ import org.apache.spark.{SparkConf, SparkContext}
2626
import org.apache.spark.scheduler.SplitInfo
2727

2828
/**
29-
* Interface that defines a Yarn RM client.
29+
* Interface that defines a Yarn RM client. Abstracts away Yarn version-specific functionality that
30+
* is used by Spark's AM.
3031
*/
3132
trait YarnRMClient {
3233

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ object YarnSparkHadoopUtil {
9494
val RM_REQUEST_PRIORITY = 1
9595

9696
// Host to rack map - saved from allocation requests. We are expecting this not to change.
97-
// Note that it is possible for this to change : and ResurceManager will indicate that to us via
97+
// Note that it is possible for this to change : and ResourceManager will indicate that to us via
9898
// update response to allocate. But we are punting on handling that for now.
9999
private val hostToRack = new ConcurrentHashMap[String, String]()
100100
private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ private[yarn] class YarnAllocationHandler(
5656
amClient: AMRMClient[ContainerRequest],
5757
appAttemptId: ApplicationAttemptId,
5858
args: ApplicationMasterArguments,
59-
map: collection.Map[String, collection.Set[SplitInfo]])
59+
preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
6060
extends YarnAllocator with Logging {
6161

6262
// These three are locked on allocatedHostToContainersMap. Complementary data structures
@@ -94,7 +94,8 @@ private[yarn] class YarnAllocationHandler(
9494
private val maxExecutors = args.numExecutors
9595
private val executorMemory = args.executorMemory
9696
private val executorCores = args.executorCores
97-
private val (preferredHostToCount, preferredRackToCount) = generateNodeToWeight(conf, map)
97+
private val (preferredHostToCount, preferredRackToCount) =
98+
generateNodeToWeight(conf, preferredNodes)
9899

99100
override def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
100101

0 commit comments

Comments
 (0)