[SPARK-5653][YARN] In ApplicationMaster rename isDriver to isClusterMode
In ApplicationMaster, rename isDriver to isClusterMode: Client already uses isClusterMode, so ApplicationMaster should stay consistent and use the same name. isClusterMode is also easier to understand.
andrewor14 sryza

Author: lianhuiwang <lianhuiwang09@gmail.com>

Closes #4430 from lianhuiwang/am-isDriver-rename and squashes the following commits:

f9f3ed0 [lianhuiwang] rename isDriver to isClusterMode
lianhuiwang authored and Andrew Or committed Feb 6, 2015
1 parent 9ad56ad commit cc6e531
Showing 1 changed file with 12 additions and 12 deletions.
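
The renamed flag captures the AM's two roles: in cluster mode a user class is passed to the AM, which must run the driver itself, while in client mode the driver runs on the submitting machine and the AM only launches executors. A minimal sketch of the pattern being renamed, condensed from the hunks below:

    // A user class is only passed to the AM in cluster mode, where the AM
    // must run the driver itself.
    private val isClusterMode = args.userClass != null

    if (isClusterMode) {
      runDriver(securityMgr)            // run the user class (the driver) in the AM
    } else {
      runExecutorLauncher(securityMgr)  // driver runs elsewhere; only manage executors
    }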
@@ -54,7 +54,7 @@ private[spark] class ApplicationMaster(
   private val sparkConf = new SparkConf()
   private val yarnConf: YarnConfiguration = SparkHadoopUtil.get.newConfiguration(sparkConf)
     .asInstanceOf[YarnConfiguration]
-  private val isDriver = args.userClass != null
+  private val isClusterMode = args.userClass != null
 
   // Default to numExecutors * 2, with minimum of 3
   private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
@@ -81,7 +81,7 @@ private[spark] class ApplicationMaster(
     try {
       val appAttemptId = client.getAttemptId()
 
-      if (isDriver) {
+      if (isClusterMode) {
         // Set the web ui port to be ephemeral for yarn so we don't conflict with
         // other spark processes running on the same box
         System.setProperty("spark.ui.port", "0")
@@ -139,7 +139,7 @@ private[spark] class ApplicationMaster(
       // doAs in order for the credentials to be passed on to the executor containers.
       val securityMgr = new SecurityManager(sparkConf)
 
-      if (isDriver) {
+      if (isClusterMode) {
         runDriver(securityMgr)
       } else {
         runExecutorLauncher(securityMgr)
@@ -162,7 +162,7 @@ private[spark] class ApplicationMaster(
    * from the application code.
    */
   final def getDefaultFinalStatus() = {
-    if (isDriver) {
+    if (isClusterMode) {
       FinalApplicationStatus.SUCCEEDED
     } else {
       FinalApplicationStatus.UNDEFINED
@@ -243,15 +243,15 @@ private[spark] class ApplicationMaster(
   private def runAMActor(
       host: String,
       port: String,
-      isDriver: Boolean): Unit = {
+      isClusterMode: Boolean): Unit = {
 
     val driverUrl = AkkaUtils.address(
       AkkaUtils.protocol(actorSystem),
       SparkEnv.driverActorSystemName,
       host,
       port,
       YarnSchedulerBackend.ACTOR_NAME)
-    actor = actorSystem.actorOf(Props(new AMActor(driverUrl, isDriver)), name = "YarnAM")
+    actor = actorSystem.actorOf(Props(new AMActor(driverUrl, isClusterMode)), name = "YarnAM")
   }
 
   private def runDriver(securityMgr: SecurityManager): Unit = {
@@ -272,7 +272,7 @@ private[spark] class ApplicationMaster(
     runAMActor(
       sc.getConf.get("spark.driver.host"),
       sc.getConf.get("spark.driver.port"),
-      isDriver = true)
+      isClusterMode = true)
     registerAM(sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)
     userClassThread.join()
   }
@@ -427,15 +427,15 @@ private[spark] class ApplicationMaster(
     sparkConf.set("spark.driver.host", driverHost)
     sparkConf.set("spark.driver.port", driverPort.toString)
 
-    runAMActor(driverHost, driverPort.toString, isDriver = false)
+    runAMActor(driverHost, driverPort.toString, isClusterMode = false)
   }
 
   /** Add the Yarn IP filter that is required for properly securing the UI. */
   private def addAmIpFilter() = {
     val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
     val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
     val params = client.getAmIpFilterParams(yarnConf, proxyBase)
-    if (isDriver) {
+    if (isClusterMode) {
       System.setProperty("spark.ui.filters", amFilter)
       params.foreach { case (k, v) => System.setProperty(s"spark.$amFilter.param.$k", v) }
     } else {
@@ -491,7 +491,7 @@ private[spark] class ApplicationMaster(
   /**
    * An actor that communicates with the driver's scheduler backend.
    */
-  private class AMActor(driverUrl: String, isDriver: Boolean) extends Actor {
+  private class AMActor(driverUrl: String, isClusterMode: Boolean) extends Actor {
     var driver: ActorSelection = _
 
     override def preStart() = {
@@ -503,7 +503,7 @@ private[spark] class ApplicationMaster(
       driver ! RegisterClusterManager
       // In cluster mode, the AM can directly monitor the driver status instead
       // of trying to deduce it from the lifecycle of the driver's actor
-      if (!isDriver) {
+      if (!isClusterMode) {
         context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
       }
     }
@@ -513,7 +513,7 @@ private[spark] class ApplicationMaster(
         logInfo(s"Driver terminated or disconnected! Shutting down. $x")
         // In cluster mode, do not rely on the disassociated event to exit
         // This avoids potentially reporting incorrect exit codes if the driver fails
-        if (!isDriver) {
+        if (!isClusterMode) {
           finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
         }
 
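For context on the last two hunks: in client mode the AM cannot observe the driver directly, so it subscribes to Akka remoting lifecycle events and treats a disassociation as a normal driver exit. A minimal standalone sketch of that subscription pattern, using the Akka 2.3-era remoting API this version of Spark built on (DriverWatcher and the shutdown choice are illustrative, not Spark's actual wiring):

    import akka.actor.Actor
    import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}

    // Hypothetical watcher: like the client-mode AMActor, it listens for the
    // remote driver's actor system going away and shuts down cleanly.
    class DriverWatcher extends Actor {
      override def preStart(): Unit = {
        // RemotingLifecycleEvent covers associate/disassociate notifications.
        context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
      }

      def receive = {
        case d: DisassociatedEvent =>
          // Driver disconnected: in client mode this is treated as success.
          context.system.shutdown()
        case _ => // ignore other lifecycle events
      }
    }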
