Skip to content

Commit 2766055

Browse files
committed
Use url.hashCode + timestamp as cachedFileName
1 parent 76a7b66 commit 2766055

File tree

3 files changed

+15
-11
lines changed

3 files changed

+15
-11
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -809,7 +809,7 @@ class SparkContext(config: SparkConf) extends Logging {
809809
addedFiles(key) = timestamp
810810

811811
// Fetch the file locally in case a job is executed using DAGScheduler.runLocally().
812-
Utils.fetchCachedFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
812+
Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
813813
hadoopConfiguration, timestamp, useCache = false)
814814

815815
logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -322,13 +322,13 @@ private[spark] class Executor(
322322
// Fetch missing dependencies
323323
for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
324324
logInfo("Fetching " + name + " with timestamp " + timestamp)
325-
Utils.fetchCachedFile(name, new File(SparkFiles.getRootDirectory), conf,
325+
Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf,
326326
env.securityManager, hadoopConf, timestamp, true)
327327
currentFiles(name) = timestamp
328328
}
329329
for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
330330
logInfo("Fetching " + name + " with timestamp " + timestamp)
331-
Utils.fetchCachedFile(name, new File(SparkFiles.getRootDirectory), conf,
331+
Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf,
332332
env.securityManager, hadoopConf, timestamp, true)
333333
currentJars(name) = timestamp
334334
// Add it to our class loader

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 12 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -322,7 +322,7 @@ private[spark] object Utils extends Logging {
322322
* Throws SparkException if the target file already exists and has different contents than
323323
* the requested file.
324324
*/
325-
def fetchCachedFile(
325+
def fetchFile(
326326
url: String,
327327
targetDir: File,
328328
conf: SparkConf,
@@ -333,27 +333,27 @@ private[spark] object Utils extends Logging {
333333
val fileName = url.split("/").last
334334
val targetFile = new File(targetDir, fileName)
335335
if (useCache) {
336-
val cachedFileName = fileName + timestamp
337-
val lockFileName = fileName + timestamp + "_lock"
336+
val cachedFileName = url.hashCode + timestamp + "_cach"
337+
val lockFileName = url.hashCode + timestamp + "_lock"
338338
val localDir = new File(getLocalDir(conf))
339339
val lockFile = new File(localDir, lockFileName)
340340
val raf = new RandomAccessFile(lockFile, "rw")
341341
// Only one executor entry.
342342
// The FileLock is only used to control synchronization for executors download file,
343343
// it's always safe regardless of lock type(mandatory or advisory).
344344
val lock = raf.getChannel().lock()
345-
val cachedFile = new File(SparkFiles.getRootDirectory + "../", cachedFileName)
345+
val cachedFile = new File(localDir, cachedFileName)
346346
try {
347347
if (!cachedFile.exists()) {
348-
fetchFile(url, localDir, conf, securityMgr, hadoopConf)
348+
doFetchFile(url, localDir, conf, securityMgr, hadoopConf)
349349
Files.move(new File(localDir, fileName), cachedFile)
350350
}
351351
} finally {
352352
lock.release()
353353
}
354354
Files.copy(cachedFile, targetFile)
355355
} else {
356-
fetchFile(url, targetDir, conf, securityMgr, hadoopConf)
356+
doFetchFile(url, targetDir, conf, securityMgr, hadoopConf)
357357
}
358358

359359
// Decompress the file if it's a .tar or .tar.gz
@@ -375,8 +375,12 @@ private[spark] object Utils extends Logging {
375375
* Throws SparkException if the target file already exists and has different contents than
376376
* the requested file.
377377
*/
378-
private def fetchFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager,
379-
hadoopConf: Configuration) {
378+
private def doFetchFile(
379+
url: String,
380+
targetDir: File,
381+
conf: SparkConf,
382+
securityMgr: SecurityManager,
383+
hadoopConf: Configuration) {
380384
val filename = url.split("/").last
381385
val tempDir = getLocalDir(conf)
382386
val tempFile = File.createTempFile("fetchFileTemp", null, new File(tempDir))

0 commit comments

Comments (0)