Skip to content

[SPARK-13063] [YARN] Make the SPARK YARN STAGING DIR as configurable #12082

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/running-on-yarn.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,13 @@ If you need a reference to the proper location to put log files in the YARN so t
HDFS replication level for the files uploaded into HDFS for the application. These include things like the Spark jar, the app jar, and any distributed cache files/archives.
</td>
</tr>
<tr>
<td><code>spark.yarn.stagingDir</code></td>
<td>Current user's home directory in the filesystem</td>
<td>
Staging directory used while submitting applications.
</td>
</tr>
<tr>
<td><code>spark.yarn.preserve.staging.files</code></td>
<td><code>false</code></td>
Expand Down
18 changes: 15 additions & 3 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,8 @@ private[spark] class Client(
val appStagingDir = getAppStagingDir(appId)
try {
val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
val stagingDirPath = new Path(appStagingDir)
val fs = FileSystem.get(hadoopConf)
val stagingDirPath = getAppStagingDirPath(sparkConf, fs, appStagingDir)
if (!preserveFiles && fs.exists(stagingDirPath)) {
logInfo("Deleting staging directory " + stagingDirPath)
fs.delete(stagingDirPath, true)
Expand Down Expand Up @@ -334,7 +334,7 @@ private[spark] class Client(
// Upload Spark and the application JAR to the remote file system if necessary,
// and add them as local resources to the application master.
val fs = FileSystem.get(hadoopConf)
val dst = new Path(fs.getHomeDirectory(), appStagingDir)
val dst = getAppStagingDirPath(sparkConf, fs, appStagingDir)
val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst
YarnSparkHadoopUtil.get.obtainTokensForNamenodes(nns, hadoopConf, credentials)
// Used to keep track of URIs added to the distributed cache. If the same URI is added
Expand Down Expand Up @@ -666,7 +666,7 @@ private[spark] class Client(
env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
if (loginFromKeytab) {
val remoteFs = FileSystem.get(hadoopConf)
val stagingDirPath = new Path(remoteFs.getHomeDirectory, stagingDir)
val stagingDirPath = getAppStagingDirPath(sparkConf, remoteFs, stagingDir)
val credentialsFile = "credentials-" + UUID.randomUUID().toString
sparkConf.set(CREDENTIALS_FILE_PATH, new Path(stagingDirPath, credentialsFile).toString)
logInfo(s"Credentials file set to: $credentialsFile")
Expand Down Expand Up @@ -1444,4 +1444,16 @@ object Client extends Logging {
uri.startsWith(s"$LOCAL_SCHEME:")
}

/**
 * Resolves the application's staging directory path. Uses the directory named by
 * the `spark.yarn.stagingDir` configuration (STAGING_DIR) as the base when it is
 * set; otherwise falls back to the current user's home directory on the given
 * filesystem. The per-application `appStagingDir` name is appended to that base.
 */
private def getAppStagingDirPath(
    conf: SparkConf,
    fs: FileSystem,
    appStagingDir: String): Path = {
  // fold: run the first argument when the Option is empty, the function otherwise.
  val baseDir = conf.get(STAGING_DIR).fold(fs.getHomeDirectory())(new Path(_))
  new Path(baseDir, appStagingDir)
}

}
5 changes: 5 additions & 0 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ package object config {
.intConf
.optional

// Optional base directory for the application staging dir. When unset, the
// Client resolves the staging path against the submitting user's home
// directory on the destination filesystem instead.
private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
  .doc("Staging directory used while submitting applications.")
  .stringConf
  .optional

/* Cluster-mode launcher configuration. */

private[spark] val WAIT_FOR_APP_COMPLETION = ConfigBuilder("spark.yarn.submit.waitAppCompletion")
Expand Down