
[SPARK-3477] Clean up code in Yarn Client / ClientBase #2350


Closed · wants to merge 21 commits

Commits
3f941dc
First cut at simplifying the Client (stable and alpha)
andrewor14 Sep 9, 2014
fabe4c4
Reuse more code in YarnClientSchedulerBackend
andrewor14 Sep 9, 2014
6de9072
Guard against potential NPE in debug logging mode
andrewor14 Sep 9, 2014
ef7069a
Clean up YarnClientSchedulerBackend more
andrewor14 Sep 9, 2014
6c94d79
Various cleanups in ClientBase and ClientArguments
andrewor14 Sep 9, 2014
8766d37
Heavily add documentation to Client* classes + various clean-ups
andrewor14 Sep 10, 2014
e4779b6
Clean up log messages + variable naming in ClientBase
andrewor14 Sep 10, 2014
6573c1d
Clean up, simplify and document code for setting classpaths
andrewor14 Sep 10, 2014
6d74888
Minor comment changes
andrewor14 Sep 10, 2014
c0587b4
Merge branch 'master' of github.com:apache/spark into yarn-cleanup
andrewor14 Sep 10, 2014
ed0b42d
Fix alpha compilation error
andrewor14 Sep 10, 2014
d8e33b6
Merge branch 'master' of github.com:apache/spark into yarn-cleanup
andrewor14 Sep 16, 2014
45ccdea
Remove usages of getAMMemory
andrewor14 Sep 16, 2014
1590141
Address review comments
andrewor14 Sep 17, 2014
a0ad1e9
Fix class not found error
andrewor14 Sep 17, 2014
547487c
Provide default values for null application report entries
andrewor14 Sep 17, 2014
7dd6298
Simplify ClientBase#monitorApplication
andrewor14 Sep 17, 2014
a3b9693
Minor changes
andrewor14 Sep 17, 2014
2ca6d64
Improve logging in application monitor
andrewor14 Sep 17, 2014
6619f9b
Merge branch 'master' of github.com:apache/spark into yarn-cleanup
andrewor14 Sep 22, 2014
39e8c7b
Address review comments
andrewor14 Sep 22, 2014
145 changes: 59 additions & 86 deletions yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -23,154 +23,127 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.YarnClientImpl
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{Apps, Records}
import org.apache.hadoop.yarn.util.Records

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil

/**
* Version of [[org.apache.spark.deploy.yarn.ClientBase]] tailored to YARN's alpha API.
*/
class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: SparkConf)
private[spark] class Client(
val args: ClientArguments,
val hadoopConf: Configuration,
Contributor

In the past, people might have actually used this directly too. I've seen people ask in the past how to run the client from another program without going out to shell, so this might break backwards compatibility as well. We should probably file a JIRA to look at supporting that officially.

Contributor Author

This is addressed in the main discussion thread.

val sparkConf: SparkConf)
extends YarnClientImpl with ClientBase with Logging {

def this(clientArgs: ClientArguments, spConf: SparkConf) =
this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf)

def this(clientArgs: ClientArguments) = this(clientArgs, new SparkConf())

val args = clientArgs
val conf = hadoopConf
val sparkConf = spConf
var rpc: YarnRPC = YarnRPC.create(conf)
val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
val yarnConf: YarnConfiguration = new YarnConfiguration(hadoopConf)

/* ------------------------------------------------------------------------------------- *
| The following methods have much in common in the stable and alpha versions of Client, |
| but cannot be implemented in the parent trait due to subtle API differences across |
| hadoop versions. |
* ------------------------------------------------------------------------------------- */

// for client user who want to monitor app status by itself.
def runApp() = {
validateArgs()

/** Submit an application running our ApplicationMaster to the ResourceManager. */
override def submitApplication(): ApplicationId = {
init(yarnConf)
start()
logClusterResourceDetails()

val newApp = super.getNewApplication()
val appId = newApp.getApplicationId()
logInfo("Requesting a new application from cluster with %d NodeManagers"
.format(getYarnClusterMetrics.getNumNodeManagers))

verifyClusterResources(newApp)
val appContext = createApplicationSubmissionContext(appId)
val appStagingDir = getAppStagingDir(appId)
val localResources = prepareLocalResources(appStagingDir)
val env = setupLaunchEnv(localResources, appStagingDir)
val amContainer = createContainerLaunchContext(newApp, localResources, env)
// Get a new application from our RM
val newAppResponse = getNewApplication()
val appId = newAppResponse.getApplicationId()

val capability = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
// Memory for the ApplicationMaster.
capability.setMemory(args.amMemory + memoryOverhead)
amContainer.setResource(capability)
// Verify whether the cluster has enough resources for our AM
verifyClusterResources(newAppResponse)

appContext.setQueue(args.amQueue)
appContext.setAMContainerSpec(amContainer)
appContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
// Set up the appropriate contexts to launch our AM
val containerContext = createContainerLaunchContext(newAppResponse)
val appContext = createApplicationSubmissionContext(appId, containerContext)

submitApp(appContext)
// Finally, submit and monitor the application
logInfo(s"Submitting application ${appId.getId} to ResourceManager")
submitApplication(appContext)
appId
}

def run() {
val appId = runApp()
monitorApplication(appId)
}

def logClusterResourceDetails() {
val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics
logInfo("Got cluster metric info from ASM, numNodeManagers = " +
clusterMetrics.getNumNodeManagers)
/**
* Set up a context for launching our ApplicationMaster container.
* In the Yarn alpha API, the memory requirements of this container must be set in
* the ContainerLaunchContext instead of the ApplicationSubmissionContext.
*/
override def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
: ContainerLaunchContext = {
val containerContext = super.createContainerLaunchContext(newAppResponse)
val capability = Records.newRecord(classOf[Resource])
capability.setMemory(args.amMemory + amMemoryOverhead)
containerContext.setResource(capability)
containerContext
}


def createApplicationSubmissionContext(appId: ApplicationId): ApplicationSubmissionContext = {
logInfo("Setting up application submission context for ASM")
/** Set up the context for submitting our ApplicationMaster. */
def createApplicationSubmissionContext(
appId: ApplicationId,
containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
appContext.setApplicationId(appId)
appContext.setApplicationName(args.appName)
appContext.setQueue(args.amQueue)
appContext.setAMContainerSpec(containerContext)
appContext.setUser(UserGroupInformation.getCurrentUser.getShortUserName)
appContext
}

def setupSecurityToken(amContainer: ContainerLaunchContext) = {
// Setup security tokens.
/**
* Set up security tokens for launching our ApplicationMaster container.
* ContainerLaunchContext#setContainerTokens is renamed `setTokens` in the stable API.
*/
override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = {
val dob = new DataOutputBuffer()
credentials.writeTokenStorageToStream(dob)
amContainer.setContainerTokens(ByteBuffer.wrap(dob.getData()))
}

def submitApp(appContext: ApplicationSubmissionContext) = {
// Submit the application to the applications manager.
logInfo("Submitting application to ASM")
super.submitApplication(appContext)
}

def monitorApplication(appId: ApplicationId): Boolean = {
val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)

while (true) {
Thread.sleep(interval)
val report = super.getApplicationReport(appId)

logInfo("Application report from ASM: \n" +
"\t application identifier: " + appId.toString() + "\n" +
"\t appId: " + appId.getId() + "\n" +
"\t clientToken: " + report.getClientToken() + "\n" +
"\t appDiagnostics: " + report.getDiagnostics() + "\n" +
"\t appMasterHost: " + report.getHost() + "\n" +
"\t appQueue: " + report.getQueue() + "\n" +
"\t appMasterRpcPort: " + report.getRpcPort() + "\n" +
"\t appStartTime: " + report.getStartTime() + "\n" +
"\t yarnAppState: " + report.getYarnApplicationState() + "\n" +
"\t distributedFinalState: " + report.getFinalApplicationStatus() + "\n" +
"\t appTrackingUrl: " + report.getTrackingUrl() + "\n" +
"\t appUser: " + report.getUser()
)

val state = report.getYarnApplicationState()
if (state == YarnApplicationState.FINISHED ||
state == YarnApplicationState.FAILED ||
state == YarnApplicationState.KILLED) {
return true
}
}
true
}
/**
* Return the security token used by this client to communicate with the ApplicationMaster.
* If no security is enabled, the token returned by the report is null.
Contributor Author

I verified that this statement is true for the stable API. @tgravescs do you know if this is true for alpha? (Do we need a special null check here?)

Contributor

Yes, getClientToken can return null, and it does if security is off.
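The fix in this diff wraps the possibly-null token in an `Option`. As a standalone sketch of that pattern (`fetchToken` here is an illustrative stand-in for `ApplicationReport#getClientToken`, not a real YARN call):

```scala
object TokenGuardSketch {
  // Stand-in for a Java API that may return null (e.g. when security is off).
  def fetchToken(securityEnabled: Boolean): String =
    if (securityEnabled) "token-abc" else null

  // Mirrors the diff: Option(...) turns null into None; getOrElse supplies a default.
  def clientToken(securityEnabled: Boolean): String =
    Option(fetchToken(securityEnabled)).getOrElse("")
}
```

This keeps null handling at the boundary with the Java API, so callers never see a null token.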

* ApplicationReport#getClientToken is renamed `getClientToAMToken` in the stable API.
*/
override def getClientToken(report: ApplicationReport): String =
Option(report.getClientToken).getOrElse("")
}

object Client {

def main(argStrings: Array[String]) {
if (!sys.props.contains("SPARK_SUBMIT")) {
println("WARNING: This client is deprecated and will be removed in a " +
"future version of Spark. Use ./bin/spark-submit with \"--master yarn\"")
}

// Set an env variable indicating we are running in YARN mode.
// Note that anything with SPARK prefix gets propagated to all (remote) processes
// Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes
System.setProperty("SPARK_YARN_MODE", "true")

val sparkConf = new SparkConf

try {
val args = new ClientArguments(argStrings, sparkConf)
new Client(args, sparkConf).run()
} catch {
case e: Exception => {
case e: Exception =>
Console.err.println(e.getMessage)
System.exit(1)
}
}

System.exit(0)
@@ -17,15 +17,14 @@

package org.apache.spark.deploy.yarn

import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.InputFormatInfo
import org.apache.spark.util.{Utils, IntParam, MemoryParam}


// TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware !
class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) {
var addJars: String = null
var files: String = null
var archives: String = null
@@ -35,28 +34,56 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
var executorMemory = 1024 // MB
var executorCores = 1
var numExecutors = 2
var amQueue = sparkConf.get("QUEUE", "default")
var amQueue = sparkConf.get("spark.yarn.queue", "default")
var amMemory: Int = 512 // MB
var appName: String = "Spark"
var priority = 0

parseArgs(args.toList)
// Additional memory to allocate to containers
// For now, use driver's memory overhead as our AM container's memory overhead
val amMemoryOverhead = sparkConf.getInt(
"spark.yarn.driver.memoryOverhead", YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
val executorMemoryOverhead = sparkConf.getInt(
"spark.yarn.executor.memoryOverhead", YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)

// env variable SPARK_YARN_DIST_ARCHIVES/SPARK_YARN_DIST_FILES set in yarn-client then
// it should default to hdfs://
files = Option(files).getOrElse(sys.env.get("SPARK_YARN_DIST_FILES").orNull)
archives = Option(archives).getOrElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES").orNull)
parseArgs(args.toList)
loadEnvironmentArgs()
validateArgs()

/** Load any default arguments provided through environment variables and Spark properties. */
private def loadEnvironmentArgs(): Unit = {
// For backward compatibility, SPARK_YARN_DIST_{ARCHIVES/FILES} should be resolved to hdfs://,
// while spark.yarn.dist.{archives/files} should be resolved to file:// (SPARK-2051).
files = Option(files)
.orElse(sys.env.get("SPARK_YARN_DIST_FILES"))
.orElse(sparkConf.getOption("spark.yarn.dist.files").map(p => Utils.resolveURIs(p)))
.orNull
archives = Option(archives)
.orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES"))
.orElse(sparkConf.getOption("spark.yarn.dist.archives").map(p => Utils.resolveURIs(p)))
.orNull
}
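The `orElse` chain above resolves a setting from the explicit argument, then the environment, then the Spark conf. A minimal standalone sketch of the same fallback pattern (the names here are illustrative, not Spark's):

```scala
object FallbackSketch {
  // Resolve a value from: explicit setting, then env var, then conf entry.
  // Option(null) is None, so a missing explicit value falls through the chain.
  def resolve(explicit: String, env: Option[String], conf: Option[String]): String =
    Option(explicit).orElse(env).orElse(conf).orNull
}
```

`orNull` at the end preserves the original contract that an unset value stays `null`.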

// spark.yarn.dist.archives/spark.yarn.dist.files defaults to use file:// if not specified,
// for both yarn-client and yarn-cluster
files = Option(files).getOrElse(sparkConf.getOption("spark.yarn.dist.files").
map(p => Utils.resolveURIs(p)).orNull)
archives = Option(archives).getOrElse(sparkConf.getOption("spark.yarn.dist.archives").
map(p => Utils.resolveURIs(p)).orNull)
/**
* Fail fast if any arguments provided are invalid.
* This is intended to be called only after the provided arguments have been parsed.
*/
private def validateArgs(): Unit = {
// TODO: memory checks are outdated (SPARK-3476)
Map[Boolean, String](
(numExecutors <= 0) -> "You must specify at least 1 executor!",
(amMemory <= amMemoryOverhead) -> s"AM memory must be > $amMemoryOverhead MB",
(executorMemory <= executorMemoryOverhead) ->
s"Executor memory must be > $executorMemoryOverhead MB"
Contributor Author

@tgravescs You had a few comments about some of these checks being outdated in a separate PR. Which ones are no longer relevant?

Contributor

I would say just leave it as is for this PR and we can handle it under the JIRA I filed.

Contributor Author

Alright, I'll leave these as is for now.

Contributor Author

I removed them here: #2528
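One note on the `Map[Boolean, String]` pattern in this diff: since the keys are Booleans, entries whose conditions evaluate to the same value overwrite each other, so at most one message per truth value survives. A standalone sketch (with hypothetical limits, not Spark's actual checks) using a `Seq` of pairs avoids that collision:

```scala
object ValidateSketch {
  // Fail-fast validation over (condition, message) pairs.
  // A Seq keeps every pair even when several conditions share a truth value,
  // unlike a Map keyed by Boolean, where duplicate keys overwrite earlier entries.
  def validate(numExecutors: Int, amMemory: Int, amOverhead: Int): Unit = {
    Seq(
      (numExecutors <= 0) -> "You must specify at least 1 executor!",
      (amMemory <= amOverhead) -> s"AM memory must be > $amOverhead MB"
    ).foreach { case (errorCondition, errorMessage) =>
      if (errorCondition) throw new IllegalArgumentException(errorMessage)
    }
  }
}
```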

).foreach { case (errorCondition, errorMessage) =>
if (errorCondition) {
throw new IllegalArgumentException(errorMessage + "\n" + getUsageMessage())
}
}
}

private def parseArgs(inputArgs: List[String]): Unit = {
val userArgsBuffer: ArrayBuffer[String] = new ArrayBuffer[String]()

val userArgsBuffer = new ArrayBuffer[String]()
var args = inputArgs

while (!args.isEmpty) {
@@ -138,16 +165,14 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
userArgs = userArgsBuffer.readOnly
}


def getUsageMessage(unknownParam: Any = null): String = {
private def getUsageMessage(unknownParam: List[String] = null): String = {
val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else ""

message +
"Usage: org.apache.spark.deploy.yarn.Client [options] \n" +
"Options:\n" +
" --jar JAR_PATH Path to your application's JAR file (required in yarn-cluster mode)\n" +
" --class CLASS_NAME Name of your application's main class (required)\n" +
" --arg ARGS Argument to be passed to your application's main class.\n" +
" --arg ARG Argument to be passed to your application's main class.\n" +
" Multiple invocations are possible, each will be passed in order.\n" +
" --num-executors NUM Number of executors to start (Default: 2)\n" +
" --executor-cores NUM Number of cores for the executors (Default: 1).\n" +