Skip to content

Commit 3510eb0

Browse files
committed
Keep fetchFile private
1 parent 2ffd742 commit 3510eb0

File tree

3 files changed

+42
-34
lines changed

3 files changed

+42
-34
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -805,11 +805,12 @@ class SparkContext(config: SparkConf) extends Logging {
805805
case "local" => "file:" + uri.getPath
806806
case _ => path
807807
}
808-
addedFiles(key) = System.currentTimeMillis
808+
val timestamp = System.currentTimeMillis
809+
addedFiles(key) = timestamp
809810

810811
// Fetch the file locally in case a job is executed using DAGScheduler.runLocally().
811-
Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
812-
hadoopConfiguration)
812+
Utils.fetchCachedFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
813+
timestamp, hadoopConfiguration, false)
813814

814815
logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
815816
postEnvironmentUpdate()

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -323,12 +323,13 @@ private[spark] class Executor(
323323
for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
324324
logInfo("Fetching " + name + " with timestamp " + timestamp)
325325
Utils.fetchCachedFile(name, new File(SparkFiles.getRootDirectory), conf,
326-
env.securityManager, hadoopConf, timestamp)
326+
env.securityManager, hadoopConf, timestamp, true)
327+
currentFiles(name) = timestamp
327328
}
328329
for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
329330
logInfo("Fetching " + name + " with timestamp " + timestamp)
330331
Utils.fetchCachedFile(name, new File(SparkFiles.getRootDirectory), conf,
331-
env.securityManager, hadoopConf, timestamp)
332+
env.securityManager, hadoopConf, timestamp, true)
332333
currentJars(name) = timestamp
333334
// Add it to our class loader
334335
val localName = name.split("/").last

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 35 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -314,30 +314,46 @@ private[spark] object Utils extends Logging {
314314

315315
/**
316316
* Copy the cached file to targetDir; if it does not exist, download it from the URL first.
317+
* If useCache == false, download the file directly to targetDir.
317318
*/
318319
def fetchCachedFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager,
319-
timestamp: Long) {
320+
timestamp: Long, useCache: Boolean) {
320321
val fileName = url.split("/").last
321-
val cachedFileName = fileName + timestamp
322322
val targetFile = new File(targetDir, fileName)
323-
val lockFileName = fileName + timestamp + "_lock"
324-
val localDir = new File(getLocalDir(conf))
325-
val lockFile = new File(localDir, lockFileName)
326-
val raf = new RandomAccessFile(lockFile, "rw")
327-
// Only one executor entry.
328-
// The FileLock is only used to synchronize executors downloading the file,
329-
// it's always safe regardless of lock type(mandatory or advisory).
330-
val lock = raf.getChannel().lock()
331-
val cachedFile = new File(localDir, cachedFileName)
332-
try {
333-
if (!cachedFile.exists()) {
334-
fetchFile(url, localDir, conf, securityMgr)
335-
Files.move(new File(localDir, fileName), cachedFile)
323+
if (useCache) {
324+
val cachedFileName = fileName + timestamp
325+
val lockFileName = fileName + timestamp + "_lock"
326+
val localDir = new File(getLocalDir(conf))
327+
val lockFile = new File(localDir, lockFileName)
328+
val raf = new RandomAccessFile(lockFile, "rw")
329+
// Only one executor entry.
330+
// The FileLock is only used to synchronize executors downloading the file,
331+
// it's always safe regardless of lock type(mandatory or advisory).
332+
val lock = raf.getChannel().lock()
333+
val cachedFile = new File(localDir, cachedFileName)
334+
try {
335+
if (!cachedFile.exists()) {
336+
fetchFile(url, localDir, conf, securityMgr)
337+
Files.move(new File(localDir, fileName), cachedFile)
338+
}
339+
} finally {
340+
lock.release()
336341
}
337-
} finally {
338-
lock.release()
342+
Files.copy(cachedFile, targetFile)
343+
} else {
344+
fetchFile(url, targetDir, conf, securityMgr)
345+
}
346+
347+
// Decompress the file if it's a .tar or .tar.gz
348+
if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz")) {
349+
logInfo("Untarring " + fileName)
350+
Utils.execute(Seq("tar", "-xzf", fileName), targetDir)
351+
} else if (fileName.endsWith(".tar")) {
352+
logInfo("Untarring " + fileName)
353+
Utils.execute(Seq("tar", "-xf", fileName), targetDir)
339354
}
340-
Files.copy(cachedFile, targetFile)
355+
// Make the file executable - That's necessary for scripts
356+
FileUtil.chmod(targetFile.getAbsolutePath, "a+x")
341357
}
342358

343359
/**
@@ -347,7 +363,7 @@ private[spark] object Utils extends Logging {
347363
* Throws SparkException if the target file already exists and has different contents than
348364
* the requested file.
349365
*/
350-
def fetchFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager,
366+
private def fetchFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager,
351367
hadoopConf: Configuration) {
352368
val filename = url.split("/").last
353369
val tempDir = getLocalDir(conf)
@@ -437,16 +453,6 @@ private[spark] object Utils extends Logging {
437453
}
438454
Files.move(tempFile, targetFile)
439455
}
440-
// Decompress the file if it's a .tar or .tar.gz
441-
if (filename.endsWith(".tar.gz") || filename.endsWith(".tgz")) {
442-
logInfo("Untarring " + filename)
443-
Utils.execute(Seq("tar", "-xzf", filename), targetDir)
444-
} else if (filename.endsWith(".tar")) {
445-
logInfo("Untarring " + filename)
446-
Utils.execute(Seq("tar", "-xf", filename), targetDir)
447-
}
448-
// Make the file executable - That's necessary for scripts
449-
FileUtil.chmod(targetFile.getAbsolutePath, "a+x")
450456
}
451457

452458
/**

0 commit comments

Comments
 (0)