[SPARK-14705][YARN] support Multiple FileSystem for YARN STAGING DIR
## What changes were proposed in this pull request?
SPARK-13063 made the Spark YARN staging dir configurable, but it only supports the default FileSystem. When several clusters are involved, the staging dir may need to live on a different FileSystem for each cluster, so this change resolves the FileSystem from the configured staging path itself.
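
The core of the change, visible in the diff below, is to build the staging path from `spark.yarn.stagingDir` (falling back to the default FileSystem's home directory) and to resolve the FileSystem from that path rather than always calling `FileSystem.get(hadoopConf)`. A minimal, self-contained sketch of that resolution logic (the object, helper name, and URI are illustrative, not part of the patch):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Illustrative sketch: resolve the staging base dir from spark.yarn.stagingDir
// if it is set, otherwise fall back to the default FileSystem's home directory,
// then obtain the FileSystem from the resulting path. Mirrors the patch with
// simplified names.
object StagingDirSketch {
  def resolveStagingBase(stagingDirConf: Option[String], hadoopConf: Configuration): Path =
    stagingDirConf.map(new Path(_))
      .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory())

  def main(args: Array[String]): Unit = {
    val hadoopConf = new Configuration()
    // Placeholder: use Some("hdfs://namenode2/temp") to stage on a non-default
    // cluster; None falls back to the default FileSystem's home directory.
    val configured: Option[String] = None
    val base = resolveStagingBase(configured, hadoopConf)
    val appStagingDir = new Path(base, ".sparkStaging/application_1461100000000_0001")
    // Asking the *path* for its FileSystem is what allows a fully qualified URI
    // to point at a cluster other than the default one.
    val fs = appStagingDir.getFileSystem(hadoopConf)
    println(s"Staging dir $appStagingDir lives on filesystem ${fs.getUri}")
  }
}
```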

## How was this patch tested?
I have tested it successfully with the following commands:
MASTER=yarn-client ./bin/spark-shell --conf spark.yarn.stagingDir=hdfs:namenode2/temp
$SPARK_HOME/bin/spark-submit --conf spark.yarn.stagingDir=hdfs:namenode2/temp
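
The same setting can also be supplied programmatically through `SparkConf`; a small sketch (the application name and HDFS URI are placeholders):

```scala
import org.apache.spark.SparkConf

object StagingDirConfExample {
  def main(args: Array[String]): Unit = {
    // Equivalent to passing --conf spark.yarn.stagingDir=... on the command line.
    // The URI is a placeholder for whichever cluster should hold the staging files.
    val conf = new SparkConf()
      .setAppName("staging-dir-example")
      .set("spark.yarn.stagingDir", "hdfs://namenode2/temp")
    println(conf.get("spark.yarn.stagingDir"))
  }
}
```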

cc tgravescs vanzin andrewor14

Author: Lianhui Wang <lianhuiwang09@gmail.com>

Closes #12473 from lianhuiwang/SPARK-14705.
lianhuiwang authored and Marcelo Vanzin committed Apr 20, 2016
1 parent 3ae25f2 commit 4514aeb
Showing 2 changed files with 25 additions and 36 deletions.
49 changes: 19 additions & 30 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -117,6 +117,11 @@ private[spark] class Client(

   private var appId: ApplicationId = null

+  // The app staging dir based on the STAGING_DIR configuration if configured
+  // otherwise based on the users home directory.
+  private val appStagingBaseDir = sparkConf.get(STAGING_DIR).map { new Path(_) }
+    .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory())
+
   def reportLauncherState(state: SparkAppHandle.State): Unit = {
     launcherBackend.setState(state)
   }
@@ -179,18 +184,17 @@ private[spark] class Client(
    * Cleanup application staging directory.
    */
   private def cleanupStagingDir(appId: ApplicationId): Unit = {
-    val appStagingDir = getAppStagingDir(appId)
+    val stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
     try {
       val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
-      val fs = FileSystem.get(hadoopConf)
-      val stagingDirPath = getAppStagingDirPath(sparkConf, fs, appStagingDir)
+      val fs = stagingDirPath.getFileSystem(hadoopConf)
       if (!preserveFiles && fs.exists(stagingDirPath)) {
         logInfo("Deleting staging directory " + stagingDirPath)
         fs.delete(stagingDirPath, true)
       }
     } catch {
       case ioe: IOException =>
-        logWarning("Failed to cleanup staging dir " + appStagingDir, ioe)
+        logWarning("Failed to cleanup staging dir " + stagingDirPath, ioe)
     }
   }

@@ -351,14 +355,13 @@ private[spark] class Client(
    * Exposed for testing.
    */
   def prepareLocalResources(
-      appStagingDir: String,
+      destDir: Path,
       pySparkArchives: Seq[String]): HashMap[String, LocalResource] = {
     logInfo("Preparing resources for our AM container")
     // Upload Spark and the application JAR to the remote file system if necessary,
     // and add them as local resources to the application master.
-    val fs = FileSystem.get(hadoopConf)
-    val dst = getAppStagingDirPath(sparkConf, fs, appStagingDir)
-    val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst
+    val fs = destDir.getFileSystem(hadoopConf)
+    val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + destDir
     YarnSparkHadoopUtil.get.obtainTokensForNamenodes(nns, hadoopConf, credentials)
     // Used to keep track of URIs added to the distributed cache. If the same URI is added
     // multiple times, YARN will fail to launch containers for the app with an internal
@@ -372,9 +375,9 @@
       YarnSparkHadoopUtil.get.obtainTokenForHBase(sparkConf, hadoopConf, credentials)

     val replication = sparkConf.get(STAGING_FILE_REPLICATION).map(_.toShort)
-      .getOrElse(fs.getDefaultReplication(dst))
+      .getOrElse(fs.getDefaultReplication(destDir))
     val localResources = HashMap[String, LocalResource]()
-    FileSystem.mkdirs(fs, dst, new FsPermission(STAGING_DIR_PERMISSION))
+    FileSystem.mkdirs(fs, destDir, new FsPermission(STAGING_DIR_PERMISSION))

     val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()

@@ -422,7 +425,7 @@ private[spark] class Client(
       val localPath = getQualifiedLocalPath(localURI, hadoopConf)
       val linkname = targetDir.map(_ + "/").getOrElse("") +
         destName.orElse(Option(localURI.getFragment())).getOrElse(localPath.getName())
-      val destPath = copyFileToRemote(dst, localPath, replication)
+      val destPath = copyFileToRemote(destDir, localPath, replication)
       val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
       distCacheMgr.addResource(
         destFs, hadoopConf, destPath, localResources, resType, linkname, statCache,
@@ -666,17 +669,15 @@ private[spark] class Client(
    * Set up the environment for launching our ApplicationMaster container.
    */
   private def setupLaunchEnv(
-      stagingDir: String,
+      stagingDirPath: Path,
       pySparkArchives: Seq[String]): HashMap[String, String] = {
     logInfo("Setting up the launch environment for our AM container")
     val env = new HashMap[String, String]()
     populateClasspath(args, yarnConf, sparkConf, env, sparkConf.get(DRIVER_CLASS_PATH))
     env("SPARK_YARN_MODE") = "true"
-    env("SPARK_YARN_STAGING_DIR") = stagingDir
+    env("SPARK_YARN_STAGING_DIR") = stagingDirPath.toString
     env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
     if (loginFromKeytab) {
-      val remoteFs = FileSystem.get(hadoopConf)
-      val stagingDirPath = getAppStagingDirPath(sparkConf, remoteFs, stagingDir)
       val credentialsFile = "credentials-" + UUID.randomUUID().toString
       sparkConf.set(CREDENTIALS_FILE_PATH, new Path(stagingDirPath, credentialsFile).toString)
       logInfo(s"Credentials file set to: $credentialsFile")
@@ -776,15 +777,15 @@ private[spark] class Client(
       : ContainerLaunchContext = {
     logInfo("Setting up container launch context for our AM")
     val appId = newAppResponse.getApplicationId
-    val appStagingDir = getAppStagingDir(appId)
+    val appStagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
     val pySparkArchives =
       if (sparkConf.get(IS_PYTHON_APP)) {
         findPySparkArchives()
       } else {
         Nil
       }
-    val launchEnv = setupLaunchEnv(appStagingDir, pySparkArchives)
-    val localResources = prepareLocalResources(appStagingDir, pySparkArchives)
+    val launchEnv = setupLaunchEnv(appStagingDirPath, pySparkArchives)
+    val localResources = prepareLocalResources(appStagingDirPath, pySparkArchives)

     // Set the environment variables to be passed on to the executors.
     distCacheMgr.setDistFilesEnv(launchEnv)
@@ -1446,16 +1447,4 @@ private object Client extends Logging {
     uri.startsWith(s"$LOCAL_SCHEME:")
   }

-  /**
-   * Returns the app staging dir based on the STAGING_DIR configuration if configured
-   * otherwise based on the users home directory.
-   */
-  private def getAppStagingDirPath(
-      conf: SparkConf,
-      fs: FileSystem,
-      appStagingDir: String): Path = {
-    val baseDir = conf.get(STAGING_DIR).map { new Path(_) }.getOrElse(fs.getHomeDirectory())
-    new Path(baseDir, appStagingDir)
-  }
-
 }
12 changes: 6 additions & 6 deletions yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -147,7 +147,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll

     val tempDir = Utils.createTempDir()
     try {
-      client.prepareLocalResources(tempDir.getAbsolutePath(), Nil)
+      client.prepareLocalResources(new Path(tempDir.getAbsolutePath()), Nil)
       sparkConf.get(APP_JAR) should be (Some(USER))

       // The non-local path should be propagated by name only, since it will end up in the app's
@@ -238,7 +238,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
     val client = createClient(sparkConf)

     val tempDir = Utils.createTempDir()
-    client.prepareLocalResources(tempDir.getAbsolutePath(), Nil)
+    client.prepareLocalResources(new Path(tempDir.getAbsolutePath()), Nil)

     assert(sparkConf.get(SPARK_JARS) ===
       Some(Seq(s"local:${jar4.getPath()}", s"local:${single.getAbsolutePath()}/*")))
@@ -260,14 +260,14 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll

     val sparkConf = new SparkConf().set(SPARK_ARCHIVE, archive.getPath())
     val client = createClient(sparkConf)
-    client.prepareLocalResources(temp.getAbsolutePath(), Nil)
+    client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil)

     verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(archive.toURI())), anyShort())
     classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))

     sparkConf.set(SPARK_ARCHIVE, LOCAL_SCHEME + ":" + archive.getPath())
     intercept[IllegalArgumentException] {
-      client.prepareLocalResources(temp.getAbsolutePath(), Nil)
+      client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil)
     }
   }

@@ -280,7 +280,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll

     val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> temp.getAbsolutePath()))
     val client = createClient(sparkConf)
-    client.prepareLocalResources(temp.getAbsolutePath(), Nil)
+    client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil)
     verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar.toURI())), anyShort())
     classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
   }
@@ -308,7 +308,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll

     val client = createClient(sparkConf)
     val tempDir = Utils.createTempDir()
-    client.prepareLocalResources(tempDir.getAbsolutePath(), Nil)
+    client.prepareLocalResources(new Path(tempDir.getAbsolutePath()), Nil)

     // Only jar2 will be added to SECONDARY_JARS, jar3 which has the same name with jar1 will be
     // ignored.
