[SPARK-16313][SQL] Spark should not silently drop exceptions in file listing #13987

Closed · wants to merge 10 commits


python/pyspark/sql/context.py (2 changes: 1 addition & 1 deletion)
@@ -440,7 +440,7 @@ def readStream(self):
 
         :return: :class:`DataStreamReader`
 
-        >>> text_sdf = sqlContext.readStream.text(os.path.join(tempfile.mkdtemp(), 'data'))
+        >>> text_sdf = sqlContext.readStream.text(tempfile.mkdtemp())
         >>> text_sdf.isStreaming
         True
         """

python/pyspark/sql/streaming.py (2 changes: 1 addition & 1 deletion)
@@ -437,7 +437,7 @@ def text(self, path):
 
         :param paths: string, or list of strings, for input path(s).
 
-        >>> text_sdf = spark.readStream.text(os.path.join(tempfile.mkdtemp(), 'data'))
+        >>> text_sdf = spark.readStream.text(tempfile.mkdtemp())
         >>> text_sdf.isStreaming
         True
         >>> "value" in str(text_sdf.schema)

@@ -364,7 +364,8 @@ case class DataSource(
         }
 
         val fileCatalog =
-          new ListingFileCatalog(sparkSession, globbedPaths, options, partitionSchema)
+          new ListingFileCatalog(
+            sparkSession, globbedPaths, options, partitionSchema, !checkPathExist)
 
         val dataSchema = userSpecifiedSchema.map { schema =>
           val equality =
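
For readers skimming the diff: the only call-site change is that DataSource now tells the catalog when a missing path may be tolerated. A minimal sketch of the intent encoded by `!checkPathExist` follows; `checkPathExist` itself lives in the surrounding DataSource code, which this diff does not show, so the remark about streaming sources below is an assumption, not something stated in the PR.

```scala
// Hedged sketch only, not code from this PR.
def ignoreFileNotFound(checkPathExist: Boolean): Boolean = {
  // If DataSource validates its input paths up front, a FileNotFoundException raised
  // later during listing is a real error and must propagate.
  // If it does not (presumably a source whose directory may appear later, such as a
  // streaming input), the listing may treat a missing path as empty.
  !checkPathExist
}
```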

@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import java.io.FileNotFoundException
+
 import scala.collection.mutable
 import scala.util.Try
 
@@ -35,12 +37,16 @@ import org.apache.spark.sql.types.StructType
  * @param paths a list of paths to scan
  * @param partitionSchema an optional partition schema that will be use to provide types for the
  *                        discovered partitions
+ * @param ignoreFileNotFound if true, return empty file list when encountering a
+ *                           [[FileNotFoundException]] in file listing. Note that this is a hack
+ *                           for SPARK-16313. We should get rid of this flag in the future.
  */
 class ListingFileCatalog(
     sparkSession: SparkSession,
     override val paths: Seq[Path],
     parameters: Map[String, String],
-    partitionSchema: Option[StructType])
+    partitionSchema: Option[StructType],
+    ignoreFileNotFound: Boolean = false)
   extends PartitioningAwareFileCatalog(sparkSession, parameters, partitionSchema) {
 
   @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
@@ -77,10 +83,12 @@ class ListingFileCatalog(
    * List leaf files of given paths. This method will submit a Spark job to do parallel
    * listing whenever there is a path having more files than the parallel partition discovery
    * discovery threshold.
+   *
+   * This is publicly visible for testing.
    */
-  protected[spark] def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
+  def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
     if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
-      HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession)
+      HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession, ignoreFileNotFound)
     } else {
       // Right now, the number of paths is less than the value of
       // parallelPartitionDiscoveryThreshold. So, we will list file statues at the driver.
@@ -96,8 +104,12 @@
         logTrace(s"Listing $path on driver")
 
         val childStatuses = {
-          // TODO: We need to avoid of using Try at here.
-          val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
+          val stats =
+            try {
+              fs.listStatus(path)
+            } catch {
+              case e: FileNotFoundException if ignoreFileNotFound => Array.empty[FileStatus]
+            }
           if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
         }
 
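
The core of the fix is visible in the hunk above: the blanket `Try(...).getOrElse(...)` is replaced by a catch that tolerates only `FileNotFoundException`, and only when `ignoreFileNotFound` is set. A self-contained illustration of the difference follows; it is not code from this PR, and the helper names (`oldStyle`, `newStyle`) are made up.

```scala
import java.io.{FileNotFoundException, IOException}

import scala.util.Try

object SwallowedExceptionsSketch {
  // Old pattern: every non-fatal exception, including permission or transient IO
  // failures, silently becomes "no files here".
  def oldStyle(list: () => Array[String]): Array[String] =
    Try(list()).getOrElse(Array.empty[String])

  // New pattern: only a missing path is tolerated, and only when the caller opts in.
  def newStyle(list: () => Array[String], ignoreFileNotFound: Boolean): Array[String] =
    try {
      list()
    } catch {
      case _: FileNotFoundException if ignoreFileNotFound => Array.empty[String]
    }

  def main(args: Array[String]): Unit = {
    val permissionError: () => Array[String] =
      () => throw new IOException("Permission denied")
    val missingPath: () => Array[String] =
      () => throw new FileNotFoundException("no such directory")

    println(oldStyle(permissionError).length)                        // 0: failure silently dropped
    println(newStyle(missingPath, ignoreFileNotFound = true).length) // 0: explicitly allowed

    // newStyle(permissionError, ignoreFileNotFound = true) rethrows the IOException,
    // and newStyle(missingPath, ignoreFileNotFound = false) rethrows as well.
  }
}
```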

@@ -440,7 +440,8 @@ private[sql] object HadoopFsRelation extends Logging {
   def listLeafFilesInParallel(
       paths: Seq[Path],
       hadoopConf: Configuration,
-      sparkSession: SparkSession): mutable.LinkedHashSet[FileStatus] = {
+      sparkSession: SparkSession,
+      ignoreFileNotFound: Boolean): mutable.LinkedHashSet[FileStatus] = {
     assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)
     logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
 
@@ -461,9 +462,11 @@
       val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
       paths.map(new Path(_)).flatMap { path =>
         val fs = path.getFileSystem(serializableConfiguration.value)
-        // TODO: We need to avoid of using Try at here.
-        Try(listLeafFiles(fs, fs.getFileStatus(path), pathFilter))
-          .getOrElse(Array.empty[FileStatus])
+        try {
+          listLeafFiles(fs, fs.getFileStatus(path), pathFilter)
+        } catch {
+          case e: java.io.FileNotFoundException if ignoreFileNotFound => Array.empty[FileStatus]
+        }
       }
     }.map { status =>
       val blockLocations = status match {
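
With this change, the driver-side listing and the parallel listing share the same contract. A runnable sketch of that contract against a local Hadoop FileSystem is below; it assumes hadoop-common on the classpath, and `listOrFail` is a made-up name, not part of this PR.

```scala
import java.io.FileNotFoundException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

object ListingContractSketch {
  def listOrFail(fs: FileSystem, path: Path, ignoreFileNotFound: Boolean): Array[FileStatus] =
    try {
      fs.listStatus(path)
    } catch {
      case _: FileNotFoundException if ignoreFileNotFound => Array.empty[FileStatus]
    }

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.getLocal(new Configuration())
    val missing = new Path(s"/tmp/does-not-exist-${System.nanoTime()}")

    // Tolerated on request: a missing path becomes an empty listing.
    println(listOrFail(fs, missing, ignoreFileNotFound = true).length)  // 0

    // Default behaviour after this change: the FileNotFoundException reaches the caller
    // instead of being silently dropped.
    try {
      listOrFail(fs, missing, ignoreFileNotFound = false)
    } catch {
      case e: FileNotFoundException => println(s"surfaced: ${e.getMessage}")
    }
  }
}
```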

@@ -225,7 +225,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
 
   // =============== Parquet file stream schema tests ================
 
-  test("FileStreamSource schema: parquet, no existing files, no schema") {
+  ignore("FileStreamSource schema: parquet, no existing files, no schema") {
     withTempDir { src =>
       withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
         val e = intercept[AnalysisException] {