
Commit 3e92d44

Move local dirs override logic into Utils; fix bugs:
The logic for determining the precedence of the different configuration options now lives in Utils.getOrCreateLocalRootDirs(). DiskBlockManager now accepts a SparkConf rather than a list of root directories, and I've updated the other tests to reflect this.
1 parent b2c4736 commit 3e92d44
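
For readers skimming the diff: the lookup order that Utils.getOrCreateLocalRootDirs() now centralizes is summarized below. This is a condensed sketch distilled from the new code in the Utils.scala hunk further down, not the commit's code verbatim.

    // Precedence of local-directory settings, highest first:
    val confValue =
      if (isYarn) {
        getYarnLocalDirs(conf)                      // 1. YARN-approved dirs (LOCAL_DIRS / YARN_LOCAL_DIRS)
      } else {
        Option(conf.getenv("SPARK_LOCAL_DIRS"))     // 2. SPARK_LOCAL_DIRS environment variable
          .getOrElse(conf.get("spark.local.dir",    // 3. spark.local.dir property
            System.getProperty("java.io.tmpdir")))  // 4. JVM temp dir as the last resort
      }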

File tree

6 files changed: +60 -39 lines changed

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 0 additions & 25 deletions
@@ -62,16 +62,6 @@ private[spark] class Executor(
   val conf = new SparkConf(true)
   conf.setAll(properties)
 
-  // If we are in yarn mode, systems can have different disk layouts so we must set it
-  // to what Yarn on this system said was available. This will be used later when SparkEnv
-  // created.
-  if (java.lang.Boolean.valueOf(
-      System.getProperty("SPARK_YARN_MODE", conf.getenv("SPARK_YARN_MODE")))) {
-    conf.set("spark.local.dir", getYarnLocalDirs(conf))
-  } else if (conf.getenv("SPARK_LOCAL_DIRS") != null) {
-    conf.set("spark.local.dir", conf.getenv("SPARK_LOCAL_DIRS"))
-  }
-
   if (!isLocal) {
     // Setup an uncaught exception handler for non-local mode.
     // Make any thread terminations due to uncaught exceptions kill the entire
@@ -134,21 +124,6 @@ private[spark] class Executor(
     threadPool.shutdown()
   }
 
-  /** Get the Yarn approved local directories. */
-  private def getYarnLocalDirs(conf: SparkConf): String = {
-    // Hadoop 0.23 and 2.x have different Environment variable names for the
-    // local dirs, so lets check both. We assume one of the 2 is set.
-    // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
-    val localDirs = Option(conf.getenv("YARN_LOCAL_DIRS"))
-      .getOrElse(Option(conf.getenv("LOCAL_DIRS"))
-      .getOrElse(""))
-
-    if (localDirs.isEmpty) {
-      throw new Exception("Yarn Local dirs can't be empty")
-    }
-    localDirs
-  }
-
   class TaskRunner(
       execBackend: ExecutorBackend, val taskId: Long, taskName: String, serializedTask: ByteBuffer)
     extends Runnable {

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 1 addition & 2 deletions
@@ -64,8 +64,7 @@ private[spark] class BlockManager(
 
   private val port = conf.getInt("spark.blockManager.port", 0)
   val shuffleBlockManager = new ShuffleBlockManager(this, shuffleManager)
-  val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
-    conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
+  val diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf)
   val connectionManager =
     new ConnectionManager(port, conf, securityManager, "Connection manager for block manager")

core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala

Lines changed: 7 additions & 7 deletions
@@ -21,7 +21,7 @@ import java.io.File
 import java.text.SimpleDateFormat
 import java.util.{Date, Random, UUID}
 
-import org.apache.spark.{SparkEnv, Logging}
+import org.apache.spark.{SparkConf, SparkEnv, Logging}
 import org.apache.spark.executor.ExecutorExitCode
 import org.apache.spark.network.netty.{PathResolver, ShuffleSender}
 import org.apache.spark.util.Utils
@@ -33,9 +33,10 @@ import org.apache.spark.shuffle.sort.SortShuffleManager
  * However, it is also possible to have a block map to only a segment of a file, by calling
  * mapBlockToFileSegment().
  *
- * @param rootDirs The directories to use for storing block files. Data will be hashed among these.
+ * Block files are hashed among the directories listed in spark.local.dir (or in
+ * SPARK_LOCAL_DIRS, if it's set).
  */
-private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager, rootDirs: String)
+private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager, conf: SparkConf)
   extends PathResolver with Logging {
 
   private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
@@ -46,7 +47,7 @@ private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager,
   /* Create one local directory for each path mentioned in spark.local.dir; then, inside this
    * directory, create multiple subdirectories that we will hash files into, in order to avoid
    * having really large inodes at the top level. */
-  val localDirs: Array[File] = createLocalDirs()
+  val localDirs: Array[File] = createLocalDirs(conf)
   if (localDirs.isEmpty) {
     logError("Failed to create any local dir.")
     System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
@@ -131,10 +132,9 @@ private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager,
     (blockId, getFile(blockId))
   }
 
-  private def createLocalDirs(): Array[File] = {
-    logDebug(s"Creating local directories at root dirs '$rootDirs'")
+  private def createLocalDirs(conf: SparkConf): Array[File] = {
     val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
-    rootDirs.split(",").flatMap { rootDir =>
+    Utils.getOrCreateLocalRootDirs(conf).flatMap { rootDir =>
       var foundLocalDir = false
       var localDir: File = null
       var localDirId: String = null
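
To make the new wiring concrete, here is a minimal sketch of constructing a DiskBlockManager after this change (illustrative only; it assumes a ShuffleBlockManager instance named shuffleBlockManager is already available):

    // The root directories now come from the conf, via Utils.getOrCreateLocalRootDirs:
    val conf = new SparkConf(false).set("spark.local.dir", "/tmp/spark-blocks")
    val diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf)
    // Block files are hashed into subdirectories created under /tmp/spark-blocks.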

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 48 additions & 2 deletions
@@ -451,10 +451,56 @@ private[spark] object Utils extends Logging {
   /**
    * Get a temporary directory using Spark's spark.local.dir property, if set. This will always
    * return a single directory, even though the spark.local.dir property might be a list of
-   * multiple paths.
+   * multiple paths. If the SPARK_LOCAL_DIRS environment variable is set, then this will return
+   * a directory from that variable.
    */
   def getLocalDir(conf: SparkConf): String = {
-    conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")).split(',')(0)
+    getOrCreateLocalRootDirs(conf)(0)
+  }
+
+  /**
+   * Gets or creates the directories listed in spark.local.dir or SPARK_LOCAL_DIRS,
+   * and returns only the directories that exist / could be created.
+   */
+  private[spark] def getOrCreateLocalRootDirs(conf: SparkConf): Array[String] = {
+    val isYarn = java.lang.Boolean.valueOf(
+      System.getProperty("SPARK_YARN_MODE", conf.getenv("SPARK_YARN_MODE")))
+    val confValue = if (isYarn) {
+      // If we are in yarn mode, systems can have different disk layouts so we must set it
+      // to what Yarn on this system said was available.
+      getYarnLocalDirs(conf)
+    } else {
+      Option(conf.getenv("SPARK_LOCAL_DIRS")).getOrElse(
+        conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
+    }
+    val rootDirs = confValue.split(',')
+    logDebug(s"Getting/creating local root dirs at '$rootDirs'")
+
+    rootDirs.flatMap { rootDir =>
+      val localDir: File = new File(rootDir)
+      val foundLocalDir = localDir.exists || localDir.mkdirs()
+      if (!foundLocalDir) {
+        logError(s"Failed to create local root dir in $rootDir. Ignoring this directory.")
+        None
+      } else {
+        Some(rootDir)
+      }
+    }
+  }
+
+  /** Get the Yarn approved local directories. */
+  private def getYarnLocalDirs(conf: SparkConf): String = {
+    // Hadoop 0.23 and 2.x have different Environment variable names for the
+    // local dirs, so lets check both. We assume one of the 2 is set.
+    // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
+    val localDirs = Option(conf.getenv("YARN_LOCAL_DIRS"))
+      .getOrElse(Option(conf.getenv("LOCAL_DIRS"))
+      .getOrElse(""))
+
+    if (localDirs.isEmpty) {
+      throw new Exception("Yarn Local dirs can't be empty")
+    }
+    localDirs
   }
 
   /**
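
To illustrate the new helper's contract (a hypothetical snippet, not part of this commit; the path names are made up): directories that do not exist and cannot be created are logged and dropped from the result instead of failing the caller.

    // Hypothetical usage; assumes /mnt/doesnotexist cannot be created by this user.
    val conf = new SparkConf(false).set("spark.local.dir", "/tmp/spark-local,/mnt/doesnotexist")
    val dirs = Utils.getOrCreateLocalRootDirs(conf)
    // dirs == Array("/tmp/spark-local"); the uncreatable path was logged and skipped.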

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

Lines changed: 1 addition & 2 deletions
@@ -825,8 +825,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     val blockManager = mock(classOf[BlockManager])
     val shuffleBlockManager = mock(classOf[ShuffleBlockManager])
     when(shuffleBlockManager.conf).thenReturn(conf)
-    val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
-      System.getProperty("java.io.tmpdir"))
+    val diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf)
 
     when(blockManager.conf).thenReturn(conf.clone.set(confKey, 0.toString))
     val diskStoreMapped = new DiskStore(blockManager, diskBlockManager)

core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala

Lines changed: 3 additions & 1 deletion
@@ -71,7 +71,9 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before
   }
 
   override def beforeEach() {
-    diskBlockManager = new DiskBlockManager(shuffleBlockManager, rootDirs)
+    val conf = testConf.clone
+    conf.set("spark.local.dir", rootDirs)
+    diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf)
     shuffleBlockManager.idToSegmentMap.clear()
   }
 
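A caveat for test authors that follows from the new lookup order (an observation, not part of the diff): since SPARK_LOCAL_DIRS takes precedence over spark.local.dir in Utils.getOrCreateLocalRootDirs(), a conf-based override like the one above can be shadowed if that environment variable happens to be set when the suite runs.

    // Hypothetical: with SPARK_LOCAL_DIRS=/some/other/dir exported in the environment,
    // getOrCreateLocalRootDirs returns /some/other/dir and the setting below is ignored.
    conf.set("spark.local.dir", rootDirs)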
