Skip to content

Commit 06b2b45

Browse files
committed
Use ThreadUtils instead of parallel collections
1 parent ba36de4 commit 06b2b45

File tree

1 file changed

+23
-13
lines changed
  • sql/core/src/main/scala/org/apache/spark/sql/execution/datasources

1 file changed

+23
-13
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 23 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,7 @@ import scala.util.{Failure, Success, Try}
2626
import org.apache.hadoop.conf.Configuration
2727
import org.apache.hadoop.fs.Path
2828

29+
import org.apache.spark.SparkException
2930
import org.apache.spark.deploy.SparkHadoopUtil
3031
import org.apache.spark.internal.Logging
3132
import org.apache.spark.sql._
@@ -50,7 +51,7 @@ import org.apache.spark.sql.sources._
5051
import org.apache.spark.sql.streaming.OutputMode
5152
import org.apache.spark.sql.types.{CalendarIntervalType, StructField, StructType}
5253
import org.apache.spark.sql.util.SchemaUtils
53-
import org.apache.spark.util.Utils
54+
import org.apache.spark.util.{ThreadUtils, Utils}
5455

5556
/**
5657
* The main class responsible for representing a pluggable Data Source in Spark SQL. In addition to
@@ -749,22 +750,31 @@ object DataSource extends Logging {
749750
// for globbed paths.
750751
val (globPaths, nonGlobPaths) = qualifiedPaths.partition(SparkHadoopUtil.get.isGlobPath)
751752

752-
val globbedPaths = globPaths.par.flatMap { globPath =>
753-
val fs = globPath.getFileSystem(hadoopConf)
754-
val globResult = SparkHadoopUtil.get.globPath(fs, globPath)
753+
val globbedPaths = Try({
754+
ThreadUtils.parmap(globPaths, "globPath", 8) { globPath =>
755+
val fs = globPath.getFileSystem(hadoopConf)
756+
val globResult = SparkHadoopUtil.get.globPath(fs, globPath)
755757

756-
if (checkEmptyGlobPath && globResult.isEmpty) {
757-
throw new AnalysisException(s"Path does not exist: $globPath")
758-
}
758+
if (checkEmptyGlobPath && globResult.isEmpty) {
759+
throw new AnalysisException(s"Path does not exist: $globPath")
760+
}
761+
762+
globResult
763+
}.flatten
764+
}).recoverWith({
765+
case e: SparkException => throw e.getCause
766+
}).get
759767

760-
globResult
761-
}
762768

763-
nonGlobPaths.par.foreach { path =>
764-
val fs = path.getFileSystem(hadoopConf)
765-
if (checkFilesExist && !fs.exists(path)) {
766-
throw new AnalysisException(s"Path does not exist: $path")
769+
try {
770+
ThreadUtils.parmap(nonGlobPaths, "checkPathsExist", 8) { path =>
771+
val fs = path.getFileSystem(hadoopConf)
772+
if (checkFilesExist && !fs.exists(path)) {
773+
throw new AnalysisException(s"Path does not exist: $path")
774+
}
767775
}
776+
} catch {
777+
case e: SparkException => throw e.getCause
768778
}
769779

770780
val allPaths = globbedPaths ++ nonGlobPaths

0 commit comments

Comments
 (0)