[SPARK-26161][SQL] Ignore empty files in load #23130
Conversation
@cloud-fan @HyukjinKwon Please take a look at the PR.
Test build #99227 has finished for PR 23130 at commit
sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
Test build #99239 has finished for PR 23130 at commit
The code change LGTM. There is a mistake in the PR description: we updated …
@@ -388,7 +388,7 @@ case class FileSourceScanExec(
     logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
     val filesGroupedToBuckets =
       selectedPartitions.flatMap { p =>
-        p.files.map { f =>
+        p.files.filter(_.getLen > 0).map { f =>
do the filtering inside the map?
Do we have a test case for this line?
do you mean changing filter...map... to flatMap? I don't have a strong preference about it.
The updated test cases and the new test case are for this change.
I personally prefer filter + map as it's shorter and clearer. I don't know if one is faster; two transformations vs having to return Some/None. For a Dataset operation I'd favor one operation, but this is just local Scala code.
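For illustration only, here is a minimal local-Scala sketch of the two styles being compared; FileStatusLike is a made-up stand-in for Hadoop's FileStatus and is not part of the PR:

```scala
// Two equivalent ways to drop empty files, mirroring the review discussion.
case class FileStatusLike(path: String, len: Long)

object FilterVsFlatMap {
  val files = Seq(FileStatusLike("empty", 0L), FileStatusLike("notEmpty", 1L))

  // Style 1: filter + map, two traversals, arguably clearer.
  val nonEmpty1: Seq[String] = files.filter(_.len > 0).map(_.path)

  // Style 2: a single flatMap that returns Some/None per element.
  val nonEmpty2: Seq[String] =
    files.flatMap(f => if (f.len > 0) Some(f.path) else None)

  def main(args: Array[String]): Unit = {
    assert(nonEmpty1 == nonEmpty2) // both yield Seq("notEmpty")
  }
}
```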
It's a non-critical path in terms of performance. Should be okay.
This createBucketedReadRDD is for the bucketed table, right?
yes, and the same change is also in createNonBucketedReadRDD
withTempDir { dir =>
  val path = dir.getCanonicalPath
  Files.write(Paths.get(path, "empty"), Array.empty[Byte])
  Files.write(Paths.get(path, "notEmpty"), "a".getBytes)
Nit for consistency: .getBytes(StandardCharsets.UTF_8)
Files.write(Paths.get(path, "notEmpty"), "a".getBytes)
val readback = spark.read.option("wholetext", true).text(path)
assert(readback.rdd.getNumPartitions == 1)
Do we need 1 === ... to get the right assert message? It's tiny.
It seems the expected value should be on the right. I changed the order and got the following:
assert(123 === readback.rdd.getNumPartitions)
123 did not equal 1
ScalaTestFailureLocation: org.apache.spark.sql.sources.SaveLoadSuite at (SaveLoadSuite.scala:155)
Expected :1
Actual :123
The current assert triggers the correct message:
assert(readback.rdd.getNumPartitions == 123)
1 did not equal 123
ScalaTestFailureLocation: org.apache.spark.sql.sources.SaveLoadSuite at (SaveLoadSuite.scala:155)
Expected :123
Actual :1
I am just referring to === and the order of args. I'm sure the test was right as-is in what it asserts.
Test build #99335 has finished for PR 23130 at commit
Files.write(Paths.get(path, "notEmpty"), "a".getBytes(StandardCharsets.UTF_8))
val readback = spark.read.option("wholetext", true).text(path)
assert(readback.rdd.getNumPartitions === 1)
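Stitched together from the snippets above, this is a sketch of how the whole test might read inside the suite; the test name is an assumption, and withTempDir and spark come from the surrounding Spark test infrastructure:

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

// Hypothetical test body; assumes it lives in a suite like SaveLoadSuite.
test("skip empty files in load") {
  withTempDir { dir =>
    val path = dir.getCanonicalPath
    Files.write(Paths.get(path, "empty"), Array.empty[Byte])
    Files.write(Paths.get(path, "notEmpty"), "a".getBytes(StandardCharsets.UTF_8))
    val readback = spark.read.option("wholetext", true).text(path)

    // wholetext makes files non-splittable, so without the fix the empty file
    // would contribute a second partition.
    assert(readback.rdd.getNumPartitions === 1)
  }
}
```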
does this test fail without your change? IIUC one partition can read multiple files. Is JSON the only data source that may return a row for empty file?
does this test fail without your change?

Yes, it does, due to the wholetext option.

Is JSON the only data source that may return a row for empty file?

We depend on the underlying parser here. I will check CSV and Text.
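A rough way to check this from a spark-shell could look like the sketch below; the directory name and explicit schema are assumptions, and the schema is only there to avoid schema inference failing on empty input:

```scala
import java.nio.file.{Files, Paths}
import org.apache.spark.sql.types.{StringType, StructType}

// Assumes a running SparkSession named `spark` (e.g. spark-shell).
val dir = Files.createTempDirectory("empty-file-check").toString
Files.write(Paths.get(dir, "empty.txt"), Array.empty[Byte])

val schema = new StructType().add("value", StringType)

// If empty files were not skipped, these counts would show whether each
// parser emits rows for a zero-byte file.
println(spark.read.text(dir).count())                 // Text
println(spark.read.schema(schema).csv(dir).count())   // CSV
println(spark.read.schema(schema).json(dir).count())  // JSON
```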
do you mean wholetext mode will force creating one partition per file?
I think so; wholetext makes files not splittable:

spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
Line 57 in 46110a5

super.isSplitable(sparkSession, options, path) && !textOptions.wholeText

This can guarantee (in text datasources at least) one file -> one partition.
IIUC one partition can read multiple files.
Do you mean this code?
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
Lines 459 to 464 in 8c68718
if (currentSize + file.length > maxSplitBytes) {
  closePartition()
}
// Add the given file to the current partition.
currentSize += file.length + openCostInBytes
currentFiles += file
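To make the packing logic above concrete, here is a simplified, standalone sketch of how files are grouped into read partitions by size; the names mirror the roles of maxSplitBytes and openCostInBytes, but this is not the actual Spark code:

```scala
object PackFilesSketch {
  // Greedily pack file sizes into partitions, closing a partition once it
  // would exceed maxSplitBytes; each file also pays a fixed open cost.
  def pack(fileSizes: Seq[Long], maxSplitBytes: Long, openCostInBytes: Long): Seq[Seq[Long]] = {
    val partitions = Seq.newBuilder[Seq[Long]]
    var current = Vector.empty[Long]
    var currentSize = 0L

    def closePartition(): Unit = {
      if (current.nonEmpty) partitions += current
      current = Vector.empty
      currentSize = 0L
    }

    // Sort by decreasing size, as Spark does, to keep partitions balanced.
    fileSizes.sortBy(-_).foreach { size =>
      if (currentSize + size > maxSplitBytes) closePartition()
      currentSize += size + openCostInBytes
      current :+= size
    }
    closePartition()
    partitions.result()
  }

  def main(args: Array[String]): Unit = {
    // Two small files land in one partition: Vector(Vector(200), Vector(10, 10))
    println(pack(Seq(10L, 10L, 200L), maxSplitBytes = 128L, openCostInBytes = 4L))
  }
}
```

This is why a test asserting an exact partition count needs a non-splittable source such as wholetext, as discussed above.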
thanks for pointing it out, I think we are good here.
I think this change makes sense; at least it's good for performance. My only concern is: shall we ask all the parsers to return Nil for empty files? AFAIK JSON doesn't follow it.
Test build #99465 has finished for PR 23130 at commit
@cloud-fan are you OK with this?
We don't need to block it, but @MaxGekk, if you have time, it would be great to answer #23130 (comment). Thanks, merging to master!
## What changes were proposed in this pull request?

In the PR, I propose filtering out all empty files inside of `FileSourceScanExec` and excluding them from file splits. It should reduce the overhead of opening and reading files without any data, and as a consequence datasources will not produce empty partitions for such files.

## How was this patch tested?

Added a test which creates an empty and a non-empty file. If empty files are ignored in load, the Text datasource in the `wholetext` mode must create only one partition for the non-empty file.

Closes apache#23130 from MaxGekk/ignore-empty-files.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
… on listing files

In apache#23130, all empty files are excluded from target file splits in `FileSourceScanExec`. In File source V2, we should keep the same behavior. This PR filters out empty files on listing files in `PartitioningAwareFileIndex` so that the upper level doesn't need to handle them.

Unit test.

Closes apache#24227 from gengliangwang/ignoreEmptyFile.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
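As a rough sketch of the follow-up's idea (dropping empty files at listing time instead of in the scan), using Hadoop's FileStatus API; listNonEmptyFiles is a hypothetical helper and not the actual PartitioningAwareFileIndex code:

```scala
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

object ListNonEmptyFilesSketch {
  // Hypothetical helper: list the files under a directory and drop the
  // zero-byte ones, so downstream planning never sees empty files.
  def listNonEmptyFiles(fs: FileSystem, dir: Path): Seq[FileStatus] =
    fs.listStatus(dir).toSeq.filter(s => s.isFile && s.getLen > 0)
}
```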
What changes were proposed in this pull request?

In the PR, I propose filtering out all empty files inside of FileSourceScanExec and excluding them from file splits. It should reduce the overhead of opening and reading files without any data, and as a consequence datasources will not produce empty partitions for such files.

How was this patch tested?

Added a test which creates an empty and a non-empty file. If empty files are ignored in load, the Text datasource in wholetext mode must create only one partition for the non-empty file.