Skip to content

[SPARK-17569] Make StructuredStreaming FileStreamSource batch generation faster #15122

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,14 @@ case class DataSource(
/**
* Create a resolved [[BaseRelation]] that can be used to read data from or write data into this
* [[DataSource]]
*
* @param checkFilesExist Whether to confirm that the files exist when generating the
* non-streaming file based datasource. StructuredStreaming jobs already
* list file existence, and when generating incremental jobs, the batch
* is considered as a non-streaming file based data source. Since we know
* that files already exist, we don't need to check them again.
*/
def resolveRelation(): BaseRelation = {
def resolveRelation(checkFilesExist: Boolean = true): BaseRelation = {
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val relation = (providingClass.newInstance(), userSpecifiedSchema) match {
// TODO: Throw when too much is given.
Expand Down Expand Up @@ -368,7 +374,7 @@ case class DataSource(
throw new AnalysisException(s"Path does not exist: $qualified")
}
// Sufficient to check head of the globPath seq for non-glob scenario
if (!fs.exists(globPath.head)) {
if (checkFilesExist && !fs.exists(globPath.head)) {
throw new AnalysisException(s"Path does not exist: ${globPath.head}")
}
globPath
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ class FileStreamSource(
userSpecifiedSchema = Some(schema),
className = fileFormatClassName,
options = sourceOptions.optionMapWithoutPath)
Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation()))
Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation(
checkFilesExist = false)))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,19 @@

package org.apache.spark.sql.execution.streaming

import java.io.File
import java.net.URI

import scala.util.Random

import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.execution.streaming.ExistsThrowsExceptionFileSystem._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.StructType

class FileStreamSourceSuite extends SparkFunSuite {
class FileStreamSourceSuite extends SparkFunSuite with SharedSQLContext {

import FileStreamSource._

Expand Down Expand Up @@ -73,4 +83,45 @@ class FileStreamSourceSuite extends SparkFunSuite {
assert(map.isNewFile(FileEntry("b", 10)))
}

testWithUninterruptibleThread("do not recheck that files exist during getBatch") {
withTempDir { temp =>
spark.conf.set(
s"fs.$scheme.impl",
classOf[ExistsThrowsExceptionFileSystem].getName)
// add the metadata entries as a pre-req
val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir
val metadataLog =
new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, dir.getAbsolutePath)
assert(metadataLog.add(0, Array(FileEntry(s"$scheme:///file1", 100L))))

val newSource = new FileStreamSource(spark, s"$scheme:///", "parquet", StructType(Nil),
dir.getAbsolutePath, Map.empty)
// this method should throw an exception if `fs.exists` is called during resolveRelation
newSource.getBatch(None, LongOffset(1))
}
}
}

/** Fake FileSystem to test whether the method `fs.exists` is called during
* `DataSource.resolveRelation`.
*/
class ExistsThrowsExceptionFileSystem extends RawLocalFileSystem {
override def getUri: URI = {
URI.create(s"$scheme:///")
}

override def exists(f: Path): Boolean = {
throw new IllegalArgumentException("Exists shouldn't have been called!")
}

/** Simply return an empty file for now. */
override def listStatus(file: Path): Array[FileStatus] = {
val emptyFile = new FileStatus()
emptyFile.setPath(file)
Array(emptyFile)
}
}

object ExistsThrowsExceptionFileSystem {
val scheme = s"FileStreamSourceSuite${math.abs(Random.nextInt)}fs"
}