[SPARK-8302] Support heterogeneous cluster install paths on YARN.
Some users have Hadoop installations on different paths across
their cluster. Currently, that makes some Spark configuration
hard to set up, since it requires hardcoding paths to jar files
or native libraries, which wouldn't work on such a cluster.

This change introduces a couple of YARN-specific configurations
that instruct the backend to replace certain paths when launching
remote processes. That way, if the configuration says the Spark
jar is in "/spark/spark.jar", and also says that "/spark" should be
replaced with "{{SPARK_INSTALL_DIR}}", YARN will start containers
in the NMs with "{{SPARK_INSTALL_DIR}}/spark.jar" as the location
of the jar.

Coupled with YARN's environment whitelist (which allows certain
env variables to be exposed to containers), this allows users to
support such heterogeneous environments, as long as a single
replacement is enough. (Otherwise, this feature would need to be
extended to support multiple path replacements.)
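For illustration, with the example values above the replacement
amounts to a plain string substitution (a minimal sketch; the
getClusterPath helper added to Client.scala below implements it):

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.yarn.Client

    // Sketch only, using the example values from this message.
    val sparkConf = new SparkConf()
      .set("spark.yarn.config.gatewayPath", "/spark")
      .set("spark.yarn.config.replacementPath", "{{SPARK_INSTALL_DIR}}")
    Client.getClusterPath(sparkConf, "/spark/spark.jar")
    // => "{{SPARK_INSTALL_DIR}}/spark.jar"; YARN expands the {{...}}
    //    reference when the NM launches the container.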

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes apache#6752 from vanzin/SPARK-8302 and squashes the following commits:

4bff8d4 [Marcelo Vanzin] Add docs, rename configs.
0aa2a02 [Marcelo Vanzin] Only do replacement for paths that need it.
2e9cc9d [Marcelo Vanzin] Style.
a5e1f68 [Marcelo Vanzin] [SPARK-8302] Support heterogeneous cluster install paths on YARN.
Marcelo Vanzin authored and squito committed Jun 26, 2015
1 parent c9e05a3 commit 37bf76a
Showing 4 changed files with 84 additions and 12 deletions.
26 changes: 26 additions & 0 deletions docs/running-on-yarn.md
@@ -258,6 +258,32 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
Principal to be used to log in to the KDC, while running on secure HDFS.
</td>
</tr>
<tr>
<td><code>spark.yarn.config.gatewayPath</code></td>
<td>(none)</td>
<td>
A path that is valid on the gateway host (the host where a Spark application is started) but
may differ for the same resource on other nodes in the cluster. Coupled with
<code>spark.yarn.config.replacementPath</code>, this is used to support clusters with
heterogeneous configurations, so that Spark can correctly launch remote processes.
<p/>
The replacement path normally will contain a reference to some environment variable exported by
YARN (and, thus, visible to Spark containers).
<p/>
For example, if the gateway node has Hadoop libraries installed on <code>/disk1/hadoop</code>, and
the location of the Hadoop install is exported by YARN as the <code>HADOOP_HOME</code>
environment variable, setting this value to <code>/disk1/hadoop</code> and the replacement path to
<code>$HADOOP_HOME</code> will make sure that paths used to launch remote processes properly
reference the local YARN configuration.
</td>
</tr>
<tr>
<td><code>spark.yarn.config.replacementPath</code></td>
<td>(none)</td>
<td>
See <code>spark.yarn.config.gatewayPath</code>.
</td>
</tr>
</table>
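
For example, a gateway-side setup for the <code>HADOOP_HOME</code> scenario above might look like
this (a sketch with illustrative values only):

    import org.apache.spark.SparkConf

    // /disk1/hadoop is only valid on the gateway; $HADOOP_HOME is
    // expanded on each NM when the container is launched.
    val conf = new SparkConf()
      .set("spark.yarn.config.gatewayPath", "/disk1/hadoop")
      .set("spark.yarn.config.replacementPath", "$HADOOP_HOME")
      .set("spark.executor.extraLibraryPath", "/disk1/hadoop/lib/native")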

# Launching Spark on YARN
47 changes: 37 additions & 10 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -676,7 +676,7 @@ private[spark] class Client(
val libraryPaths = Seq(sys.props.get("spark.driver.extraLibraryPath"),
sys.props.get("spark.driver.libraryPath")).flatten
if (libraryPaths.nonEmpty) {
prefixEnv = Some(Utils.libraryPathEnvPrefix(libraryPaths))
prefixEnv = Some(getClusterPath(sparkConf, Utils.libraryPathEnvPrefix(libraryPaths)))
}
if (sparkConf.getOption("spark.yarn.am.extraJavaOptions").isDefined) {
logWarning("spark.yarn.am.extraJavaOptions will not take effect in cluster mode")
@@ -698,7 +698,7 @@ }
}

sparkConf.getOption("spark.yarn.am.extraLibraryPath").foreach { paths =>
prefixEnv = Some(Utils.libraryPathEnvPrefix(Seq(paths)))
prefixEnv = Some(getClusterPath(sparkConf, Utils.libraryPathEnvPrefix(Seq(paths))))
}
}

@@ -1106,10 +1106,10 @@ object Client extends Logging {
env: HashMap[String, String],
isAM: Boolean,
extraClassPath: Option[String] = None): Unit = {
extraClassPath.foreach(addClasspathEntry(_, env))
addClasspathEntry(
YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), env
)
extraClassPath.foreach { cp =>
addClasspathEntry(getClusterPath(sparkConf, cp), env)
}
addClasspathEntry(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), env)

if (isAM) {
addClasspathEntry(
@@ -1125,12 +1125,14 @@
getUserClasspath(sparkConf)
}
userClassPath.foreach { x =>
addFileToClasspath(x, null, env)
addFileToClasspath(sparkConf, x, null, env)
}
}
addFileToClasspath(new URI(sparkJar(sparkConf)), SPARK_JAR, env)
addFileToClasspath(sparkConf, new URI(sparkJar(sparkConf)), SPARK_JAR, env)
populateHadoopClasspath(conf, env)
sys.env.get(ENV_DIST_CLASSPATH).foreach(addClasspathEntry(_, env))
sys.env.get(ENV_DIST_CLASSPATH).foreach { cp =>
addClasspathEntry(getClusterPath(sparkConf, cp), env)
}
}

/**
@@ -1159,16 +1161,18 @@
*
* If not a "local:" file and no alternate name, the environment is not modified.
*
* @param conf Spark configuration.
* @param uri URI to add to classpath (optional).
* @param fileName Alternate name for the file (optional).
* @param env Map holding the environment variables.
*/
private def addFileToClasspath(
conf: SparkConf,
uri: URI,
fileName: String,
env: HashMap[String, String]): Unit = {
if (uri != null && uri.getScheme == LOCAL_SCHEME) {
addClasspathEntry(uri.getPath, env)
addClasspathEntry(getClusterPath(conf, uri.getPath), env)
} else if (fileName != null) {
addClasspathEntry(buildPath(
YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), fileName), env)
@@ -1182,6 +1186,29 @@
private def addClasspathEntry(path: String, env: HashMap[String, String]): Unit =
YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, path)

/**
* Returns the path to be sent to the NM for a path that is valid on the gateway.
*
* This method uses two configuration values:
*
* - spark.yarn.config.gatewayPath: a string that identifies a portion of the input path that may
* only be valid on the gateway node.
* - spark.yarn.config.replacementPath: a string with which to replace the gateway path. This may
* contain, for example, env variable references, which will be expanded by the NMs when
* starting containers.
*
* If either config is not available, the input path is returned.
*/
def getClusterPath(conf: SparkConf, path: String): String = {
val localPath = conf.get("spark.yarn.config.gatewayPath", null)
val clusterPath = conf.get("spark.yarn.config.replacementPath", null)
if (localPath != null && clusterPath != null) {
path.replace(localPath, clusterPath)
} else {
path
}
}
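  // Illustration (not part of this change), using the docs example:
  //   spark.yarn.config.gatewayPath     = "/disk1/hadoop"
  //   spark.yarn.config.replacementPath = "$HADOOP_HOME"
  // getClusterPath(conf, "/disk1/hadoop/lib/native") returns
  // "$HADOOP_HOME/lib/native", which is expanded when the NM
  // launches the container.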

/**
* Obtains token for the Hive metastore and adds them to the credentials.
*/
4 changes: 2 additions & 2 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -146,7 +146,7 @@ class ExecutorRunnable(
javaOpts ++= Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell)
}
sys.props.get("spark.executor.extraLibraryPath").foreach { p =>
prefixEnv = Some(Utils.libraryPathEnvPrefix(Seq(p)))
prefixEnv = Some(Client.getClusterPath(sparkConf, Utils.libraryPathEnvPrefix(Seq(p))))
}

javaOpts += "-Djava.io.tmpdir=" +
@@ -195,7 +195,7 @@ class ExecutorRunnable(
val userClassPath = Client.getUserClasspath(sparkConf).flatMap { uri =>
val absPath =
if (new File(uri.getPath()).isAbsolute()) {
uri.getPath()
Client.getClusterPath(sparkConf, uri.getPath())
} else {
Client.buildPath(Environment.PWD.$(), uri.getPath())
}
19 changes: 19 additions & 0 deletions yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -151,6 +151,25 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll {
}
}

test("Cluster path translation") {
val conf = new Configuration()
val sparkConf = new SparkConf()
.set(Client.CONF_SPARK_JAR, "local:/localPath/spark.jar")
.set("spark.yarn.config.gatewayPath", "/localPath")
.set("spark.yarn.config.replacementPath", "/remotePath")

Client.getClusterPath(sparkConf, "/localPath") should be ("/remotePath")
Client.getClusterPath(sparkConf, "/localPath/1:/localPath/2") should be (
"/remotePath/1:/remotePath/2")

val env = new MutableHashMap[String, String]()
Client.populateClasspath(null, conf, sparkConf, env, false,
extraClassPath = Some("/localPath/my1.jar"))
val cp = classpath(env)
cp should contain ("/remotePath/spark.jar")
cp should contain ("/remotePath/my1.jar")
}

object Fixtures {

val knownDefYarnAppCP: Seq[String] =