[SPARK-26161][SQL] Ignore empty files in load #23130


Closed
wants to merge 8 commits into master from MaxGekk:ignore-empty-files

Conversation

@MaxGekk (Member) commented Nov 24, 2018

What changes were proposed in this pull request?

In this PR, I propose filtering out all empty files inside FileSourceScanExec and excluding them from file splits. This should reduce the overhead of opening and reading files without any data, and as a consequence, datasources will not produce empty partitions for such files.
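A minimal, self-contained sketch of the idea (FileStatusLike, Split, and makeSplits are hypothetical stand-ins for Hadoop's FileStatus and Spark's split creation, not the actual patch):

```scala
// Hypothetical model: drop zero-length files before building splits.
case class FileStatusLike(path: String, getLen: Long)
case class Split(path: String, length: Long)

def makeSplits(files: Seq[FileStatusLike]): Seq[Split] =
  files.filter(_.getLen > 0).map(f => Split(f.path, f.getLen))

// An empty file contributes no split, hence no empty partition downstream.
val files = Seq(FileStatusLike("empty", 0L), FileStatusLike("notEmpty", 1L))
assert(makeSplits(files) == Seq(Split("notEmpty", 1L)))
```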

How was this patch tested?

Added a test which creates an empty file and a non-empty file. If empty files are ignored on load, the Text datasource in wholetext mode must create only one partition for the non-empty file.

@MaxGekk (Member, Author) commented Nov 24, 2018

@cloud-fan @HyukjinKwon Please take a look at the PR.

@SparkQA commented Nov 24, 2018

Test build #99227 has finished for PR 23130 at commit b200a50.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Nov 25, 2018

Test build #99239 has finished for PR 23130 at commit e7871f3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor)

The code change LGTM. There is a mistake in the PR description: we updated FileSourceScanExec, not DataSourceScanExec. Let's also mention that this fixes a behavior change mistakenly introduced by #22938.

@@ -388,7 +388,7 @@ case class FileSourceScanExec(
     logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
     val filesGroupedToBuckets =
       selectedPartitions.flatMap { p =>
-        p.files.map { f =>
+        p.files.filter(_.getLen > 0).map { f =>
Member

do the filtering inside the map?

Member

Do we have a test case for this line?

Contributor

do you mean changing filter...map... to flatMap? I don't have a strong preference about it.

The updated test cases and the new test case are for this change.

Member

I personally prefer filter + map as it's shorter and clearer. I don't know if one is faster; two transformations vs having to return Some/None. For a Dataset operation I'd favor one operation, but this is just local Scala code.
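For concreteness, a tiny sketch of the two spellings being discussed (F is an illustrative stand-in for a file status, not Spark code):

```scala
// Both drop empty files and produce the same result.
case class F(getLen: Long)
val files = Seq(F(0L), F(3L))

val viaFilterMap = files.filter(_.getLen > 0).map(f => s"split:${f.getLen}")
val viaFlatMap   = files.flatMap(f => if (f.getLen > 0) Some(s"split:${f.getLen}") else None)
assert(viaFilterMap == viaFlatMap)
```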

Member

It's a non-critical path in terms of performance. Should be okay.

Member

This createBucketedReadRDD is for bucketed tables, right?

Contributor

yes, and the same change is also in createNonBucketedReadRDD

withTempDir { dir =>
  val path = dir.getCanonicalPath
  Files.write(Paths.get(path, "empty"), Array.empty[Byte])
  Files.write(Paths.get(path, "notEmpty"), "a".getBytes)
Member

Nit for consistency: .getBytes(StandardCharsets.UTF_8)

Files.write(Paths.get(path, "notEmpty"), "a".getBytes)
val readback = spark.read.option("wholetext", true).text(path)

assert(readback.rdd.getNumPartitions == 1)
Member

Do we need `1 === ...` to get the right assert message? It's tiny.

Member Author

It seems the expected value should be on the right. I changed the order and got the following:

assert(123 === readback.rdd.getNumPartitions)
123 did not equal 1
ScalaTestFailureLocation: org.apache.spark.sql.sources.SaveLoadSuite at (SaveLoadSuite.scala:155)
Expected :1
Actual   :123

The current assert triggers the correct message:

assert(readback.rdd.getNumPartitions == 123)
1 did not equal 123
ScalaTestFailureLocation: org.apache.spark.sql.sources.SaveLoadSuite at (SaveLoadSuite.scala:155)
Expected :123
Actual   :1

Member

I am just referring to === and the order of args. I'm sure the test was right as-is in what it asserts.

@SparkQA commented Nov 27, 2018

Test build #99335 has finished for PR 23130 at commit 7057f8b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Files.write(Paths.get(path, "notEmpty"), "a".getBytes(StandardCharsets.UTF_8))
val readback = spark.read.option("wholetext", true).text(path)

assert(readback.rdd.getNumPartitions === 1)
Contributor

does this test fail without your change? IIUC one partition can read multiple files. Is JSON the only data source that may return a row for an empty file?

Member Author

> does this test fail without your change?

Yes, it does due to the wholetext.

> Is JSON the only data source that may return a row for an empty file?

We depend on underlying parser here. I will check CSV and Text.
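One way to check, as a hypothetical probe (assumes a `spark` session in scope, as in a test or the shell; not part of the patch):

```scala
import java.nio.file.{Files, Paths}

// Write an empty file and count the rows a given source returns for it.
// With this PR, empty files never reach the parser, so the count should be 0.
def rowsFromEmptyFile(format: String, dir: String): Long = {
  Files.write(Paths.get(dir, s"empty.$format"), Array.empty[Byte])
  spark.read.format(format).load(dir).count()
}
```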

Contributor

do you mean wholetext mode forces one partition per file?

Member Author

I think so, wholetext makes files not splittable:

super.isSplitable(sparkSession, options, path) && !textOptions.wholeText

This guarantees (in text datasources at least) one file -> one partition.

> IIUC one partition can read multiple files.

Do you mean this code?

if (currentSize + file.length > maxSplitBytes) {
  closePartition()
}
// Add the given file to the current partition.
currentSize += file.length + openCostInBytes
currentFiles += file
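For context, a simplified, self-contained sketch of how that packing loop groups files (the names mirror the Spark code, but the function itself is illustrative):

```scala
// Files are appended to the current partition until adding one would exceed
// maxSplitBytes; openCostInBytes pads each file so that many tiny files do
// not all pile into a single partition.
def pack(sizes: Seq[Long], maxSplitBytes: Long, openCostInBytes: Long): Seq[Seq[Long]] = {
  val partitions = scala.collection.mutable.ArrayBuffer.empty[Seq[Long]]
  var current = Vector.empty[Long]
  var currentSize = 0L
  def closePartition(): Unit = {
    if (current.nonEmpty) partitions += current
    current = Vector.empty[Long]
    currentSize = 0L
  }
  sizes.foreach { len =>
    if (currentSize + len > maxSplitBytes) closePartition()
    currentSize += len + openCostInBytes
    current = current :+ len
  }
  closePartition()
  partitions.toSeq
}

// pack(Seq(10L, 10L, 100L), maxSplitBytes = 100L, openCostInBytes = 4L)
// => Seq(Vector(10, 10), Vector(100))
```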

Contributor

thanks for pointing it out, I think we are good here.

@cloud-fan (Contributor)

I think this change makes sense, at least it's good for performance. My only concern is, shall we ask all the parsers to return Nil for empty files? AFAIK JSON doesn't follow it.

@SparkQA commented Nov 29, 2018

Test build #99465 has finished for PR 23130 at commit 1f58cc1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen (Member) left a comment

@cloud-fan are you OK with this?

@cloud-fan (Contributor)

We don't need to block it, but @MaxGekk, if you have time, it would be great to answer #23130 (comment)

thanks, merging to master!

@asfgit closed this in 3e46e3c on Dec 2, 2018
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
## What changes were proposed in this pull request?

In this PR, I propose filtering out all empty files inside `FileSourceScanExec` and excluding them from file splits. This should reduce the overhead of opening and reading files without any data, and as a consequence, datasources will not produce empty partitions for such files.

## How was this patch tested?

Added a test which creates an empty file and a non-empty file. If empty files are ignored on load, the Text datasource in `wholetext` mode must create only one partition for the non-empty file.

Closes apache#23130 from MaxGekk/ignore-empty-files.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@MaxGekk deleted the ignore-empty-files branch on August 17, 2019
lwwmanning pushed a commit to palantir/spark that referenced this pull request Jan 9, 2020
… on listing files

In apache#23130, all empty files are excluded from target file splits in `FileSourceScanExec`.
In File source V2, we should keep the same behavior.

This PR suggests filtering out empty files when listing files in `PartitioningAwareFileIndex` so that the upper level doesn't need to handle them.

Unit test

Closes apache#24227 from gengliangwang/ignoreEmptyFile.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>