
[SPARK-24610] fix reading small files via wholeTextFiles #21601


Closed
wants to merge 6 commits

Conversation

dhruve
Contributor

@dhruve dhruve commented Jun 20, 2018

## What changes were proposed in this pull request?

The WholeTextFileInputFormat determines the maxSplitSize for the file(s) being read via the wholeTextFiles method. This works well for large files, but for smaller files, where the computed maxSplitSize is smaller than the minimum split sizes picked up from configs like hive-site.xml or passed explicitly as mapreduce.input.fileinputformat.split.minsize.per.node or mapreduce.input.fileinputformat.split.minsize.per.rack, it simply throws an exception:

java.io.IOException: Minimum split size pernode 123456 cannot be larger than maximum split size 9962
at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:200)
at org.apache.spark.rdd.WholeTextFileRDD.getPartitions(WholeTextFileRDD.scala:50)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2096)
at org.apache.spark.rdd.RDD.count(RDD.scala:1158)
... 48 elided

This change checks the maxSplitSize against the minSplitSizePerNode and minSplitSizePerRack values and sets them to maxSplitSize if `maxSplitSize < minSplitSizePerNode/Rack`.
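For reference, a minimal sketch of the clamping idea (it mirrors the diff discussed below, but the surrounding class and the key/value types here are illustrative placeholders, not Spark's actual WholeTextFileInputFormat):

```scala
import scala.collection.JavaConverters._
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat

// Illustrative subclass only; Spark's real input format has its own record reader.
class ClampingCombineInputFormat extends CombineFileInputFormat[Text, Text] {

  override def createRecordReader(
      split: InputSplit,
      context: TaskAttemptContext): RecordReader[Text, Text] =
    throw new UnsupportedOperationException("not needed for this sketch")

  // Shaped like WholeTextFileInputFormat.setMinPartitions: compute maxSplitSize
  // from the total input size, then clamp the per-node/per-rack minimums to it.
  def setMinPartitions(context: JobContext, minPartitions: Int): Unit = {
    val files = listStatus(context).asScala
    val totalLen = files.map(f => if (f.isDirectory) 0L else f.getLen).sum
    val maxSplitSize = math.ceil(totalLen * 1.0 / math.max(minPartitions, 1)).toLong

    val config = context.getConfiguration
    val minSplitSizePerNode = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERNODE, 0L)
    val minSplitSizePerRack = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERRACK, 0L)

    // If the computed max is smaller than the configured minimums, lower the
    // minimums so CombineFileInputFormat.getSplits does not throw an IOException.
    if (maxSplitSize < minSplitSizePerNode) {
      super.setMinSplitSizeNode(maxSplitSize)
    }
    if (maxSplitSize < minSplitSizePerRack) {
      super.setMinSplitSizeRack(maxSplitSize)
    }
    super.setMaxSplitSize(maxSplitSize)
  }
}
```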

## How was this patch tested?
Tested manually by setting the conf while launching the job, and added a unit test.

@SparkQA

SparkQA commented Jun 20, 2018

Test build #92143 has finished for PR 21601 at commit 2369e3a.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 20, 2018

Test build #92144 has finished for PR 21601 at commit e2d4e07.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dhruve
Contributor Author

dhruve commented Jun 21, 2018

@vanzin Can you review this PR?

Contributor

@attilapiros attilapiros left a comment


I read your changes and in the test I was looking for a check/assert but found none. I understand the test is about verifying that no exception is thrown while reading the directory contents, but I still miss some asserts and coverage for more cases, at least:

  • min split size per node < maxSplitSize && min split size per rack < maxSplitSize
  • min split size per node > maxSplitSize && min split size per rack < maxSplitSize
  • min split size per node < maxSplitSize && min split size per rack > maxSplitSize

As I see it, it is hard to add checks/asserts this way, but what about testing WholeTextFileInputFormat directly?
In your test you could inherit from WholeTextFileInputFormat, override the protected setters for maxSplitSize, minSplitSizeNode and minSplitSizeRack, and store the values in your new test class so asserts and checks can be added.
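A rough sketch of that suggestion, assuming the helper lives in Spark's own test tree so it can see the private[spark] class (the class and field names here are made up):

```scala
package org.apache.spark.input

// Hypothetical test helper: records the values passed to the protected
// CombineFileInputFormat setters so a test can assert on them afterwards.
class RecordingWholeTextFileInputFormat extends WholeTextFileInputFormat {
  var recordedMaxSplitSize: Long = -1L
  var recordedMinSplitSizeNode: Long = -1L
  var recordedMinSplitSizeRack: Long = -1L

  override def setMaxSplitSize(maxSplitSize: Long): Unit = {
    recordedMaxSplitSize = maxSplitSize
    super.setMaxSplitSize(maxSplitSize)
  }

  override def setMinSplitSizeNode(minSplitSizeNode: Long): Unit = {
    recordedMinSplitSizeNode = minSplitSizeNode
    super.setMinSplitSizeNode(minSplitSizeNode)
  }

  override def setMinSplitSizeRack(minSplitSizeRack: Long): Unit = {
    recordedMinSplitSizeRack = minSplitSizeRack
    super.setMinSplitSizeRack(minSplitSizeRack)
  }
}
```

A test could then call setMinPartitions with a configuration whose per-node/per-rack minimums exceed the computed maximum and assert, for example, that recordedMinSplitSizeNode <= recordedMaxSplitSize.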

val minSplitSizePerRack = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERRACK, 0L)

if (maxSplitSize < minSplitSizePerNode) {
  super.setMinSplitSizeNode(maxSplitSize)
Contributor


Is there a point in even checking the configuration? Why not just set these to 0L unconditionally?

Contributor Author


AFAIU, if we set these to 0L unconditionally, any leftover data that wasn't combined into a split would end up in its own split, because minSplitSizePerNode is 0L.
This shouldn't be an issue for a small number of files. But if we have a large number of small files in a similar situation, we will end up with more splits rather than combining them into a smaller number of splits.

Contributor Author


Also, if a user specifies them via configs, we are ensuring that they don't break the code. If we set them to 0L where a user has specified them, we would end up breaking the code anyway, because CombineFileInputFormat checks whether the setting is 0L; if it is 0, it picks up the value from the config instead. https://github.com/apache/hadoop/blob/release-2.8.2-RC0/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java#L182 So we would have to at least set the config to avoid hitting the error.
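For context, roughly what the linked Hadoop code does — a Scala paraphrase of the Java precedence check in CombineFileInputFormat.getSplits (2.8.x); the actual source may differ in detail, and the helper name here is made up:

```scala
import java.io.IOException
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat

object SplitSizePrecedence {
  // minSplitSizeNode/maxSplitSize are the values set via the protected setters;
  // 0 means "not set", in which case the config value is used instead.
  def resolveAndValidate(conf: Configuration, minSplitSizeNode: Long, maxSplitSize: Long): Long = {
    val minSizeNode =
      if (minSplitSizeNode != 0) minSplitSizeNode
      else conf.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERNODE, 0L)
    val maxSize =
      if (maxSplitSize != 0) maxSplitSize
      else conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0L)

    // This is the check that produces the exception quoted in the PR description.
    if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
      throw new IOException(s"Minimum split size pernode $minSizeNode " +
        s"cannot be larger than maximum split size $maxSize")
    }
    minSizeNode
  }
}
```

So passing 0L to the setter does not make the minimum 0; it just defers to the config, which is why the fix only lowers the minimums when they exceed the computed maximum.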

@dhruve
Contributor Author

dhruve commented Jul 2, 2018

@attilapiros I will modify the test to add a check/assert, which makes it easier to follow and validate what we are trying to achieve in the test. For the rest of the cases, since these are Hadoop-related configs and not directly related to Spark, I didn't add additional test cases, as they are more related to CombineFileInputFormat than to WholeTextFileInputFormat.

@SparkQA

SparkQA commented Jul 3, 2018

Test build #92545 has finished for PR 21601 at commit b351406.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 3, 2018

Test build #92566 has finished for PR 21601 at commit 15356df.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val conf = new SparkConf()
sc = new SparkContext("local", "test", conf)

sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.minsize.per.node", 123456)
Contributor


It would be nice to add a comment here about the 123456 value, i.e. that it is larger than maxSplitSize.
Also, can we move this down into the test itself?
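A possible shape after that suggestion (the test name, the `dir` of small test files, and the per-rack setting are illustrative, not the merged test):

```scala
test("SPARK-24610: wholeTextFiles works with large split minsize per node/rack") {
  // 123456 is deliberately larger than the maxSplitSize computed for these tiny
  // test files, so it exercises the clamping added by this PR.
  sc.hadoopConfiguration.setLong(
    "mapreduce.input.fileinputformat.split.minsize.per.node", 123456)
  sc.hadoopConfiguration.setLong(
    "mapreduce.input.fileinputformat.split.minsize.per.rack", 123456)

  // Previously this threw java.io.IOException from CombineFileInputFormat.getSplits.
  val contents = sc.wholeTextFiles(dir.toString).collect()
  assert(contents.nonEmpty)
}
```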

@SparkQA

SparkQA commented Jul 9, 2018

Test build #92772 has finished for PR 21601 at commit f1c4160.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 10, 2018

Test build #92773 has finished for PR 21601 at commit bcb2991.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs
Contributor

+1

@tgravescs
Contributor

merged to master

@asfgit asfgit closed this in 1055c94 Jul 12, 2018
asfgit pushed a commit that referenced this pull request Oct 22, 2018
## What changes were proposed in this pull request?

This is a follow-up of #21601; `StreamFileInputFormat` and `WholeTextFileInputFormat` have the same problem.

java.io.IOException: Minimum split size pernode 5123456 cannot be larger than maximum split size 4194304
        at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:201)
        at org.apache.spark.rdd.BinaryFileRDD.getPartitions(BinaryFileRDD.scala:52)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:254)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:252)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2138)

## How was this patch tested?
Added a unit test

Closes #22725 from 10110346/maxSplitSize_node_rack.

Authored-by: liuxian <liu.xian3@zte.com.cn>
Signed-off-by: Thomas Graves <tgraves@apache.org>
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
dhruve pushed a commit to dhruve/spark that referenced this pull request Oct 4, 2019
dongjoon-hyun pushed a commit that referenced this pull request Oct 4, 2019
### What changes were proposed in this pull request?
This is a clean cherry-pick of #22725 from master to 2.4.

This is a follow-up of #21601; `StreamFileInputFormat` and `WholeTextFileInputFormat` have the same problem.

java.io.IOException: Minimum split size pernode 5123456 cannot be larger than maximum split size 4194304
        at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:201)
        at org.apache.spark.rdd.BinaryFileRDD.getPartitions(BinaryFileRDD.scala:52)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:254)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:252)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2138)

### Why are the changes needed?
This is an existing bug which was fixed in master but not backported to 2.4.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
The original patch added a unit test.

Ran the unit test that was added in the original patch, and manually verified the changes by creating a multiline CSV and loading it in spark-shell.

Closes #26026 from dhruve/fix/SPARK-25753/2.4.

Authored-by: liuxian <liu.xian3@zte.com.cn>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>