
Commit 4c027c5

fenzhu authored and GitHub Enterprise committed
[CARMEL-7557][CARMEL-4814] Use proper file system (apache#217)
1 parent af4c2a8 commit 4c027c5

File tree: 1 file changed, 7 additions and 6 deletions

sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala

Lines changed: 7 additions & 6 deletions
@@ -21,7 +21,8 @@ import java.util.concurrent.TimeUnit._
 import java.util.regex.Pattern
 
 import org.apache.commons.lang3.StringUtils
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
 
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
@@ -327,9 +328,10 @@ trait FileSourceScanLike extends DataSourceScanExec {
     if (sourcePaths.forall(p => !pattern.matcher(p).find())) return partitions
 
     val localPaths = sourcePaths.map(_.replaceFirst(remoteHdfsRegex, localHdfsPrefix))
-    def listFilesIfExists(paths: Seq[String], fileSystem: FileSystem): Seq[FileStatus] = {
+    def listFilesIfExists(paths: Seq[String], conf: Configuration): Seq[FileStatus] = {
       paths.flatMap { path =>
-        val p = new Path(path).makeQualified(fileSystem.getUri, fileSystem.getWorkingDirectory)
+        val p = new Path(path)
+        val fileSystem = p.getFileSystem(conf)
         if (fileSystem.exists(p)) {
           fileSystem.listStatus(p)
         } else {
@@ -350,14 +352,13 @@ trait FileSourceScanLike extends DataSourceScanExec {
         sparkContext.setJobDescription("List local cache directories.")
         val numParallelism = Math.min(localPaths.size, parallelPartitionDiscoveryParallelism)
         sparkContext.parallelize(localPaths, numParallelism).mapPartitions { lps =>
-          val fs = new Path(lps.toSeq.head).getFileSystem(serializableConfiguration.value)
-          listFilesIfExists(lps.toSeq, fs).iterator
+          listFilesIfExists(lps.toSeq, serializableConfiguration.value).iterator
         }.collect().toSeq
       } finally {
         sparkContext.setJobDescription(previousJobDescription)
       }
     } else {
-      listFilesIfExists(localPaths, new Path(localPaths.head).getFileSystem(hadoopConf))
+      listFilesIfExists(localPaths, hadoopConf)
     }
 
     val finishList = System.currentTimeMillis()
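
The core of the change is that listFilesIfExists now takes a Hadoop Configuration and resolves the FileSystem from each path's own URI via Path.getFileSystem(conf), rather than reusing one FileSystem instance derived from the first path, presumably so that paths on a different file system (for example file:// versus hdfs://) are still listed correctly. Below is a minimal standalone sketch of that pattern; the ListFilesSketch object, main method, and example paths are illustrative assumptions, not part of the commit.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

// Sketch of the per-path FileSystem resolution applied in this commit:
// each Path picks the FileSystem implementation matching its own scheme,
// so mixed local and HDFS paths are each handled by the proper file system.
object ListFilesSketch {
  def listFilesIfExists(paths: Seq[String], conf: Configuration): Seq[FileStatus] = {
    paths.flatMap { path =>
      val p = new Path(path)
      val fs = p.getFileSystem(conf) // resolves the FS from the path's scheme/authority
      if (fs.exists(p)) fs.listStatus(p).toSeq else Seq.empty[FileStatus]
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Hypothetical mixed paths; each resolves to its own FileSystem instance.
    listFilesIfExists(Seq("file:///tmp", "hdfs://namenode:8020/tmp"), conf)
      .foreach(status => println(status.getPath))
  }
}

Passing the Configuration (or its serializable wrapper on executors) instead of a pre-built FileSystem also keeps the helper usable inside mapPartitions, as the diff above shows.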
