[SPARK-3477] Clean up code in Yarn Client / ClientBase #2350
@@ -23,154 +23,127 @@ import org.apache.hadoop.conf.Configuration | |
import org.apache.hadoop.io.DataOutputBuffer | ||
import org.apache.hadoop.security.UserGroupInformation | ||
import org.apache.hadoop.yarn.api._ | ||
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment | ||
import org.apache.hadoop.yarn.api.protocolrecords._ | ||
import org.apache.hadoop.yarn.api.records._ | ||
import org.apache.hadoop.yarn.client.YarnClientImpl | ||
import org.apache.hadoop.yarn.conf.YarnConfiguration | ||
import org.apache.hadoop.yarn.ipc.YarnRPC | ||
import org.apache.hadoop.yarn.util.{Apps, Records} | ||
import org.apache.hadoop.yarn.util.Records | ||
|
||
import org.apache.spark.{Logging, SparkConf} | ||
import org.apache.spark.deploy.SparkHadoopUtil | ||
|
||
/** | ||
* Version of [[org.apache.spark.deploy.yarn.ClientBase]] tailored to YARN's alpha API. | ||
*/ | ||
class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: SparkConf) | ||
private[spark] class Client( | ||
val args: ClientArguments, | ||
val hadoopConf: Configuration, | ||
val sparkConf: SparkConf) | ||
extends YarnClientImpl with ClientBase with Logging { | ||
|
||
def this(clientArgs: ClientArguments, spConf: SparkConf) = | ||
this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf) | ||
|
||
def this(clientArgs: ClientArguments) = this(clientArgs, new SparkConf()) | ||
|
||
val args = clientArgs | ||
val conf = hadoopConf | ||
val sparkConf = spConf | ||
var rpc: YarnRPC = YarnRPC.create(conf) | ||
val yarnConf: YarnConfiguration = new YarnConfiguration(conf) | ||
val yarnConf: YarnConfiguration = new YarnConfiguration(hadoopConf) | ||
|
||
/* ------------------------------------------------------------------------------------- * | ||
| The following methods have much in common in the stable and alpha versions of Client, | | ||
| but cannot be implemented in the parent trait due to subtle API differences across | | ||
| hadoop versions. | | ||
* ------------------------------------------------------------------------------------- */ | ||
|
||
// for client user who want to monitor app status by itself. | ||
def runApp() = { | ||
validateArgs() | ||
|
||
/** Submit an application running our ApplicationMaster to the ResourceManager. */ | ||
override def submitApplication(): ApplicationId = { | ||
init(yarnConf) | ||
start() | ||
logClusterResourceDetails() | ||
|
||
val newApp = super.getNewApplication() | ||
val appId = newApp.getApplicationId() | ||
logInfo("Requesting a new application from cluster with %d NodeManagers" | ||
.format(getYarnClusterMetrics.getNumNodeManagers)) | ||
|
||
verifyClusterResources(newApp) | ||
val appContext = createApplicationSubmissionContext(appId) | ||
val appStagingDir = getAppStagingDir(appId) | ||
val localResources = prepareLocalResources(appStagingDir) | ||
val env = setupLaunchEnv(localResources, appStagingDir) | ||
val amContainer = createContainerLaunchContext(newApp, localResources, env) | ||
// Get a new application from our RM | ||
val newAppResponse = getNewApplication() | ||
val appId = newAppResponse.getApplicationId() | ||
|
||
val capability = Records.newRecord(classOf[Resource]).asInstanceOf[Resource] | ||
// Memory for the ApplicationMaster. | ||
capability.setMemory(args.amMemory + memoryOverhead) | ||
amContainer.setResource(capability) | ||
// Verify whether the cluster has enough resources for our AM | ||
verifyClusterResources(newAppResponse) | ||
|
||
appContext.setQueue(args.amQueue) | ||
appContext.setAMContainerSpec(amContainer) | ||
appContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName()) | ||
// Set up the appropriate contexts to launch our AM | ||
val containerContext = createContainerLaunchContext(newAppResponse) | ||
val appContext = createApplicationSubmissionContext(appId, containerContext) | ||
|
||
submitApp(appContext) | ||
// Finally, submit and monitor the application | ||
logInfo(s"Submitting application ${appId.getId} to ResourceManager") | ||
submitApplication(appContext) | ||
appId | ||
} | ||
|
||
def run() { | ||
val appId = runApp() | ||
monitorApplication(appId) | ||
} | ||
|
||
def logClusterResourceDetails() { | ||
val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics | ||
logInfo("Got cluster metric info from ASM, numNodeManagers = " + | ||
clusterMetrics.getNumNodeManagers) | ||
/** | ||
* Set up a context for launching our ApplicationMaster container. | ||
* In the Yarn alpha API, the memory requirements of this container must be set in | ||
* the ContainerLaunchContext instead of the ApplicationSubmissionContext. | ||
*/ | ||
override def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse) | ||
: ContainerLaunchContext = { | ||
val containerContext = super.createContainerLaunchContext(newAppResponse) | ||
val capability = Records.newRecord(classOf[Resource]) | ||
capability.setMemory(args.amMemory + amMemoryOverhead) | ||
containerContext.setResource(capability) | ||
containerContext | ||
} | ||
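For contrast with the override above: as the doc comment notes, the alpha API requires the AM memory to be set on the ContainerLaunchContext, whereas the stable API attaches it to the ApplicationSubmissionContext. A minimal sketch of the stable-side placement, assuming the stable records API exposes `ApplicationSubmissionContext#setResource`; the object, method name, and parameters are illustrative and not part of this diff:

```scala
import org.apache.hadoop.yarn.api.records.{ApplicationSubmissionContext, Resource}
import org.apache.hadoop.yarn.util.Records

object StableApiMemorySketch {
  // Hypothetical helper showing where the stable API puts the AM memory request:
  // the capability goes on the ApplicationSubmissionContext, not the container context.
  def setAMResource(
      appContext: ApplicationSubmissionContext,
      amMemoryMB: Int,
      memoryOverheadMB: Int): Unit = {
    val capability = Records.newRecord(classOf[Resource])
    capability.setMemory(amMemoryMB + memoryOverheadMB)
    // Stable API only; the alpha API sets this on the ContainerLaunchContext as shown above.
    appContext.setResource(capability)
  }
}
```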
|
||
|
||
def createApplicationSubmissionContext(appId: ApplicationId): ApplicationSubmissionContext = { | ||
logInfo("Setting up application submission context for ASM") | ||
/** Set up the context for submitting our ApplicationMaster. */ | ||
def createApplicationSubmissionContext( | ||
appId: ApplicationId, | ||
containerContext: ContainerLaunchContext): ApplicationSubmissionContext = { | ||
val appContext = Records.newRecord(classOf[ApplicationSubmissionContext]) | ||
appContext.setApplicationId(appId) | ||
appContext.setApplicationName(args.appName) | ||
appContext.setQueue(args.amQueue) | ||
appContext.setAMContainerSpec(containerContext) | ||
appContext.setUser(UserGroupInformation.getCurrentUser.getShortUserName) | ||
appContext | ||
} | ||
|
||
def setupSecurityToken(amContainer: ContainerLaunchContext) = { | ||
// Setup security tokens. | ||
/** | ||
* Set up security tokens for launching our ApplicationMaster container. | ||
* ContainerLaunchContext#setContainerTokens is renamed `setTokens` in the stable API. | ||
*/ | ||
override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = { | ||
val dob = new DataOutputBuffer() | ||
credentials.writeTokenStorageToStream(dob) | ||
amContainer.setContainerTokens(ByteBuffer.wrap(dob.getData())) | ||
} | ||
|
||
def submitApp(appContext: ApplicationSubmissionContext) = { | ||
// Submit the application to the applications manager. | ||
logInfo("Submitting application to ASM") | ||
super.submitApplication(appContext) | ||
} | ||
|
||
def monitorApplication(appId: ApplicationId): Boolean = { | ||
val interval = sparkConf.getLong("spark.yarn.report.interval", 1000) | ||
|
||
while (true) { | ||
Thread.sleep(interval) | ||
val report = super.getApplicationReport(appId) | ||
|
||
logInfo("Application report from ASM: \n" + | ||
"\t application identifier: " + appId.toString() + "\n" + | ||
"\t appId: " + appId.getId() + "\n" + | ||
"\t clientToken: " + report.getClientToken() + "\n" + | ||
"\t appDiagnostics: " + report.getDiagnostics() + "\n" + | ||
"\t appMasterHost: " + report.getHost() + "\n" + | ||
"\t appQueue: " + report.getQueue() + "\n" + | ||
"\t appMasterRpcPort: " + report.getRpcPort() + "\n" + | ||
"\t appStartTime: " + report.getStartTime() + "\n" + | ||
"\t yarnAppState: " + report.getYarnApplicationState() + "\n" + | ||
"\t distributedFinalState: " + report.getFinalApplicationStatus() + "\n" + | ||
"\t appTrackingUrl: " + report.getTrackingUrl() + "\n" + | ||
"\t appUser: " + report.getUser() | ||
) | ||
|
||
val state = report.getYarnApplicationState() | ||
if (state == YarnApplicationState.FINISHED || | ||
state == YarnApplicationState.FAILED || | ||
state == YarnApplicationState.KILLED) { | ||
return true | ||
} | ||
} | ||
true | ||
} | ||
/** | ||
* Return the security token used by this client to communicate with the ApplicationMaster. | ||
* If no security is enabled, the token returned by the report is null. | ||
Review comment: I verified that this statement is true for the stable API. @tgravescs, do you know if this is true for alpha? (Do we need a special null check here?)
Review comment: Yes, getClientToken can return null, and it does if security is off.
||
* ApplicationReport#getClientToken is renamed `getClientToAMToken` in the stable API. | ||
*/ | ||
override def getClientToken(report: ApplicationReport): String = | ||
Option(report.getClientToken).getOrElse("") | ||
} | ||
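As the review thread above confirms, getClientToken can return null when security is off, which is why the override wraps it in Option. A standalone restatement of that defensive pattern, assuming the alpha-API signature shown in this diff (getClientToken returning a String); the object and method names are illustrative:

```scala
import org.apache.hadoop.yarn.api.records.ApplicationReport

object ClientTokenSketch {
  // When security is disabled the report carries no client token (null),
  // so wrap it in Option and fall back to an empty string, as the override does.
  def clientTokenOrEmpty(report: ApplicationReport): String =
    Option(report.getClientToken).getOrElse("")
}
```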
|
||
object Client { | ||
|
||
def main(argStrings: Array[String]) { | ||
if (!sys.props.contains("SPARK_SUBMIT")) { | ||
println("WARNING: This client is deprecated and will be removed in a " + | ||
"future version of Spark. Use ./bin/spark-submit with \"--master yarn\"") | ||
} | ||
|
||
// Set an env variable indicating we are running in YARN mode. | ||
// Note that anything with SPARK prefix gets propagated to all (remote) processes | ||
// Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes | ||
System.setProperty("SPARK_YARN_MODE", "true") | ||
|
||
val sparkConf = new SparkConf | ||
|
||
try { | ||
val args = new ClientArguments(argStrings, sparkConf) | ||
new Client(args, sparkConf).run() | ||
} catch { | ||
case e: Exception => { | ||
case e: Exception => | ||
Console.err.println(e.getMessage) | ||
System.exit(1) | ||
} | ||
} | ||
|
||
System.exit(0) | ||
|
@@ -17,15 +17,14 @@ | |
|
||
package org.apache.spark.deploy.yarn | ||
|
||
import scala.collection.mutable.{ArrayBuffer, HashMap} | ||
import scala.collection.mutable.ArrayBuffer | ||
|
||
import org.apache.spark.SparkConf | ||
import org.apache.spark.scheduler.InputFormatInfo | ||
import org.apache.spark.util.{Utils, IntParam, MemoryParam} | ||
|
||
|
||
// TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware ! | ||
class ClientArguments(val args: Array[String], val sparkConf: SparkConf) { | ||
private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) { | ||
var addJars: String = null | ||
var files: String = null | ||
var archives: String = null | ||
|
@@ -35,28 +34,56 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) { | |
var executorMemory = 1024 // MB | ||
var executorCores = 1 | ||
var numExecutors = 2 | ||
var amQueue = sparkConf.get("QUEUE", "default") | ||
var amQueue = sparkConf.get("spark.yarn.queue", "default") | ||
var amMemory: Int = 512 // MB | ||
var appName: String = "Spark" | ||
var priority = 0 | ||
|
||
parseArgs(args.toList) | ||
// Additional memory to allocate to containers | ||
// For now, use driver's memory overhead as our AM container's memory overhead | ||
val amMemoryOverhead = sparkConf.getInt( | ||
"spark.yarn.driver.memoryOverhead", YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD) | ||
val executorMemoryOverhead = sparkConf.getInt( | ||
"spark.yarn.executor.memoryOverhead", YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD) | ||
|
||
// env variable SPARK_YARN_DIST_ARCHIVES/SPARK_YARN_DIST_FILES set in yarn-client then | ||
// it should default to hdfs:// | ||
files = Option(files).getOrElse(sys.env.get("SPARK_YARN_DIST_FILES").orNull) | ||
archives = Option(archives).getOrElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES").orNull) | ||
parseArgs(args.toList) | ||
loadEnvironmentArgs() | ||
validateArgs() | ||
|
||
/** Load any default arguments provided through environment variables and Spark properties. */ | ||
private def loadEnvironmentArgs(): Unit = { | ||
// For backward compatibility, SPARK_YARN_DIST_{ARCHIVES/FILES} should be resolved to hdfs://, | ||
// while spark.yarn.dist.{archives/files} should be resolved to file:// (SPARK-2051). | ||
files = Option(files) | ||
.orElse(sys.env.get("SPARK_YARN_DIST_FILES")) | ||
.orElse(sparkConf.getOption("spark.yarn.dist.files").map(p => Utils.resolveURIs(p))) | ||
.orNull | ||
archives = Option(archives) | ||
.orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES")) | ||
.orElse(sparkConf.getOption("spark.yarn.dist.archives").map(p => Utils.resolveURIs(p))) | ||
.orNull | ||
} | ||
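The method above resolves each setting by precedence: the explicit command-line value, then the legacy environment variable, then the Spark property (run through Utils.resolveURIs). A small sketch of that Option.orElse precedence chain with generic inputs; the names and values here are illustrative only:

```scala
object PrecedenceSketch {
  // Resolve a value by precedence: explicit argument, then environment variable,
  // then configuration property, else null (mirroring the orNull above).
  def resolve(
      explicit: Option[String],
      envVar: Option[String],
      confValue: Option[String]): String =
    explicit.orElse(envVar).orElse(confValue).orNull

  def main(args: Array[String]): Unit = {
    // Hypothetical inputs: no --files flag, legacy env var set, property also set.
    // The env var wins because it sits earlier in the chain.
    println(resolve(None, Some("hdfs:///data/a.txt"), Some("file:/local/b.txt")))
  }
}
```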
|
||
// spark.yarn.dist.archives/spark.yarn.dist.files defaults to use file:// if not specified, | ||
// for both yarn-client and yarn-cluster | ||
files = Option(files).getOrElse(sparkConf.getOption("spark.yarn.dist.files"). | ||
map(p => Utils.resolveURIs(p)).orNull) | ||
archives = Option(archives).getOrElse(sparkConf.getOption("spark.yarn.dist.archives"). | ||
map(p => Utils.resolveURIs(p)).orNull) | ||
/** | ||
* Fail fast if any arguments provided are invalid. | ||
* This is intended to be called only after the provided arguments have been parsed. | ||
*/ | ||
private def validateArgs(): Unit = { | ||
// TODO: memory checks are outdated (SPARK-3476) | ||
Map[Boolean, String]( | ||
(numExecutors <= 0) -> "You must specify at least 1 executor!", | ||
(amMemory <= amMemoryOverhead) -> s"AM memory must be > $amMemoryOverhead MB", | ||
(executorMemory <= executorMemoryOverhead) -> | ||
s"Executor memory must be > $executorMemoryOverhead MB" | ||
Review comment: @tgravescs You had comments about a few of these checks being outdated in a separate PR. Which ones are no longer relevant?
Review comment: The memory ones are. See https://issues.apache.org/jira/browse/SPARK-3476
Review comment: I would say just leave it as is for this PR and we can handle it under the JIRA I filed.
Review comment: Alright, I'll leave these as is for now.
Review comment: I removed them here: #2528
||
).foreach { case (errorCondition, errorMessage) => | ||
if (errorCondition) { | ||
throw new IllegalArgumentException(errorMessage + "\n" + getUsageMessage()) | ||
} | ||
} | ||
} | ||
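The validation above pairs each error condition with a message and fails fast on the first one that holds. A self-contained sketch of the same pattern; the checks and numbers are made up for illustration, not the real defaults:

```scala
object ValidationSketch {
  // Generic fail-fast validation: pair each boolean error condition with a message
  // and throw IllegalArgumentException for the first condition that is true.
  def validate(checks: Seq[(Boolean, String)], usage: String): Unit = {
    checks.foreach { case (errorCondition, errorMessage) =>
      if (errorCondition) {
        throw new IllegalArgumentException(errorMessage + "\n" + usage)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val numExecutors = 0       // hypothetical parsed value
    val amMemory = 512         // MB, hypothetical
    val amMemoryOverhead = 384 // MB, illustrative overhead value
    validate(Seq(
      (numExecutors <= 0) -> "You must specify at least 1 executor!",
      (amMemory <= amMemoryOverhead) -> s"AM memory must be > $amMemoryOverhead MB"
    ), usage = "Usage: org.apache.spark.deploy.yarn.Client [options]")
  }
}
```

One design note: using a Seq of pairs rather than a Map keyed by Boolean avoids the subtle issue that a Map collapses entries sharing the same key, so at most one check per truth value would survive.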
|
||
private def parseArgs(inputArgs: List[String]): Unit = { | ||
val userArgsBuffer: ArrayBuffer[String] = new ArrayBuffer[String]() | ||
|
||
val userArgsBuffer = new ArrayBuffer[String]() | ||
var args = inputArgs | ||
|
||
while (!args.isEmpty) { | ||
|
@@ -138,16 +165,14 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) { | |
userArgs = userArgsBuffer.readOnly | ||
} | ||
|
||
|
||
def getUsageMessage(unknownParam: Any = null): String = { | ||
private def getUsageMessage(unknownParam: List[String] = null): String = { | ||
val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else "" | ||
|
||
message + | ||
"Usage: org.apache.spark.deploy.yarn.Client [options] \n" + | ||
"Options:\n" + | ||
" --jar JAR_PATH Path to your application's JAR file (required in yarn-cluster mode)\n" + | ||
" --class CLASS_NAME Name of your application's main class (required)\n" + | ||
" --arg ARGS Argument to be passed to your application's main class.\n" + | ||
" --arg ARG Argument to be passed to your application's main class.\n" + | ||
" Multiple invocations are possible, each will be passed in order.\n" + | ||
" --num-executors NUM Number of executors to start (Default: 2)\n" + | ||
" --executor-cores NUM Number of cores for the executors (Default: 1).\n" + | ||
|
Review comment: In the past, people might have actually used this directly too. I've seen people ask how to run the client from another program without going out to the shell, so this might break backwards compatibility as well. We should probably file a JIRA to look at supporting that officially.
Review comment: This is addressed in the main discussion thread.
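The comment above concerns callers that construct and run the YARN client from their own code instead of going through the shell scripts. A rough sketch of what such a programmatic invocation looks like, based only on the constructors and options visible in this diff; all paths, class names, and values are placeholders, and after this change the entry points become private[spark], which is exactly the compatibility concern being raised:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.{Client, ClientArguments}

object ProgrammaticYarnSubmit {
  def main(args: Array[String]): Unit = {
    // Anything with the SPARK_ prefix is propagated to remote processes (see Client.main above).
    System.setProperty("SPARK_YARN_MODE", "true")

    val sparkConf = new SparkConf()
    // Placeholder application settings; real paths and class names will differ.
    val clientArgs = new ClientArguments(Array(
      "--jar", "/path/to/app.jar",
      "--class", "com.example.MyApp",
      "--num-executors", "2"
    ), sparkConf)

    // Before this PR, Client and ClientArguments were public; making them
    // private[spark] is what the reviewer flags as a potential compatibility break.
    new Client(clientArgs, sparkConf).run()
  }
}
```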