[SPARK-24610] fix reading small files via wholeTextFiles #21601
Conversation
Test build #92143 has finished for PR 21601 at commit

Test build #92144 has finished for PR 21601 at commit
@vanzin Can you review this PR?
I read your changes, and in the test I was looking for a check/assert but found none. I understand the test is about verifying that no exception is thrown while reading the directory contents, but I still miss some asserts and coverage of more cases, at least:
- min split size per node < maxSplitSize && min split size per rack < maxSplitSize
- min split size per node > maxSplitSize && min split size per rack < maxSplitSize
- min split size per node < maxSplitSize && min split size per rack > maxSplitSize

As I see it, it is hard to add checks/asserts here, but what about testing `WholeTextFileInputFormat` directly?
In your test you could inherit from `WholeTextFileInputFormat`, override the protected setters for `maxSplitSize`, `minSplitSizeNode` and `minSplitSizeRack`, and store the values in your new test class so asserts and checks can be added.
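A minimal standalone sketch of that recording-subclass pattern (the base class here is only a stand-in for `CombineFileInputFormat`, and all names are illustrative — the real test would extend `WholeTextFileInputFormat` itself):

```scala
// Stand-in for CombineFileInputFormat's protected split-size setters.
class FakeCombineFormat {
  protected var maxSplitSize: Long = 0L
  protected var minSplitSizeNode: Long = 0L
  protected def setMaxSplitSize(v: Long): Unit = { maxSplitSize = v }
  protected def setMinSplitSizeNode(v: Long): Unit = { minSplitSizeNode = v }
}

// Test subclass that records the values passed to the setters,
// so the test can assert on them afterwards.
class RecordingFormat extends FakeCombineFormat {
  var recordedMax: Long = -1L
  var recordedMinNode: Long = -1L
  override protected def setMaxSplitSize(v: Long): Unit = {
    recordedMax = v
    super.setMaxSplitSize(v)
  }
  override protected def setMinSplitSizeNode(v: Long): Unit = {
    recordedMinNode = v
    super.setMinSplitSizeNode(v)
  }
  // Public driver standing in for the production code path that calls the setters.
  def configure(max: Long, minNode: Long): Unit = {
    setMaxSplitSize(max)
    setMinSplitSizeNode(minNode)
  }
}
```

With this in place, a test can call `configure` (or, in the real case, the production code path) and then assert directly on `recordedMax` and `recordedMinNode`.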
```scala
val minSplitSizePerRack = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERRACK, 0L)

if (maxSplitSize < minSplitSizePerNode) {
  super.setMinSplitSizeNode(maxSplitSize)
}
```
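A standalone sketch of the clamping being discussed (simplified: the `config` lookups are replaced by plain parameters, and both the per-node and per-rack cases are shown side by side):

```scala
// Clamp the per-node / per-rack minimum split sizes so they can never
// exceed maxSplitSize, which is the condition CombineFileInputFormat rejects.
def clampSplitSizes(maxSplitSize: Long,
                    minSplitSizePerNode: Long,
                    minSplitSizePerRack: Long): (Long, Long) = {
  val node = if (maxSplitSize < minSplitSizePerNode) maxSplitSize else minSplitSizePerNode
  val rack = if (maxSplitSize < minSplitSizePerRack) maxSplitSize else minSplitSizePerRack
  (node, rack)
}
```

For example, with the values from the reported error (`maxSplitSize = 4194304`, per-node minimum `5123456`), the per-node minimum is clamped down to `4194304` instead of triggering the `IOException`.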
Is there a point in even checking the configuration? Why not just set these to `0L` unconditionally?
AFAIU, if we set these to `0L` unconditionally, then every piece of leftover data that wasn't combined into a split would end up in its own split, because `minSplitSizePerNode` is `0L`.
This shouldn't be an issue for a small number of files. But if we have a large number of small files that end up in a similar situation, we will end up with more splits rather than combining them into a smaller number of splits.
Also, if a user specifies them via configs, we are ensuring that they don't break the code. If we set them to `0L` where a user specifies them, we would end up breaking the code anyway, because of the way `CombineFileInputFormat` works: it checks whether the setting is `0L`, and if it is, it picks up the value from the config instead. https://github.com/apache/hadoop/blob/release-2.8.2-RC0/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java#L182 So we would have to at least set the config to avoid hitting the error.
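A simplified sketch of that fallback order (this is not the actual Hadoop code; `configValue` stands in for the value read from the job configuration, e.g. `mapreduce.input.fileinputformat.split.minsize.per.node`):

```scala
// Mirrors CombineFileInputFormat's resolution order: a value set
// programmatically via the setter wins, but 0L means "unset", in which
// case the value from the job configuration is used instead.
def resolveMinSplitSizeNode(setterValue: Long, configValue: Long): Long =
  if (setterValue != 0L) setterValue else configValue
```

This is why unconditionally setting the setter value to `0L` is not enough: with the setter at `0L`, an oversized value from the user's config would still flow through and hit the same check.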
@attilapiros I will modify the test to add a check/assert which makes it easy to follow and validate what we are trying to achieve in the test. For the rest of the cases, since these are Hadoop-related configs and not directly related to Spark, I didn't add additional test cases, as these are more related to the
Test build #92545 has finished for PR 21601 at commit

Test build #92566 has finished for PR 21601 at commit
```scala
val conf = new SparkConf()
sc = new SparkContext("local", "test", conf)

sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.minsize.per.node", 123456)
```
It would be nice to add a comment here about the 123456 value, i.e. that it is larger than `maxSplitSize`. Also, can we move this down into the test itself?
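The suggestion might look like this inside the test body (a sketch; the exact `maxSplitSize` depends on the sizes of the small files the test generates):

```scala
// Inside the test case, so the setting does not leak into other tests.
// 123456 is deliberately larger than the maxSplitSize computed for the
// small test files, which is the situation that used to trigger the
// "Minimum split size pernode ... cannot be larger than maximum split size"
// IOException.
sc.hadoopConfiguration.setLong(
  "mapreduce.input.fileinputformat.split.minsize.per.node", 123456)
```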
Test build #92772 has finished for PR 21601 at commit

Test build #92773 has finished for PR 21601 at commit
+1
merged to master |
## What changes were proposed in this pull request?

This is a follow up of #21601; `StreamFileInputFormat` and `WholeTextFileInputFormat` have the same problem.

```
java.io.IOException: Minimum split size pernode 5123456 cannot be larger than maximum split size 4194304
	at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:201)
	at org.apache.spark.rdd.BinaryFileRDD.getPartitions(BinaryFileRDD.scala:52)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:254)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:252)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2138)
```

## How was this patch tested?

Added a unit test

Closes #22725 from 10110346/maxSplitSize_node_rack.

Authored-by: liuxian <liu.xian3@zte.com.cn>
Signed-off-by: Thomas Graves <tgraves@apache.org>
### What changes were proposed in this pull request?

This is a clean cherry pick of #22725 from master to 2.4.

This is a follow up of #21601; `StreamFileInputFormat` and `WholeTextFileInputFormat` have the same problem.

```
java.io.IOException: Minimum split size pernode 5123456 cannot be larger than maximum split size 4194304
	at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:201)
	at org.apache.spark.rdd.BinaryFileRDD.getPartitions(BinaryFileRDD.scala:52)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:254)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:252)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2138)
```

### Why are the changes needed?

This is an existing bug which was fixed in master, but not back ported to 2.4.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

The original patch added a unit test. Ran the unit test that was added in the original patch and manually verified the changes by creating a multiline csv and loading it in spark shell.

Closes #26026 from dhruve/fix/SPARK-25753/2.4.

Authored-by: liuxian <liu.xian3@zte.com.cn>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
What changes were proposed in this pull request?

The `WholeTextFileInputFormat` determines the `maxSplitSize` for the file(s) being read using the `wholeTextFiles` method. While this works well for large files, for smaller files the computed `maxSplitSize` can end up smaller than the minimum split sizes coming from defaults in configs like hive-site.xml, or explicitly passed via `mapreduce.input.fileinputformat.split.minsize.per.node` or `mapreduce.input.fileinputformat.split.minsize.per.rack`, in which case it just throws an exception.
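A standalone sketch of the check inside `CombineFileInputFormat.getSplits` that produces the reported error (simplified from the Hadoop code; only the per-node check is shown, and the message format mirrors the stack trace above):

```scala
import java.io.IOException

// Simplified version of the validation that throws once the effective
// per-node minimum split size exceeds the computed maximum split size.
// A value of 0 for either side means "unset" and skips the check.
def validateSplitSizes(minSizeNode: Long, maxSize: Long): Unit = {
  if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
    throw new IOException(
      s"Minimum split size pernode $minSizeNode cannot be larger than " +
      s"maximum split size $maxSize")
  }
}
```

With a small input, `maxSplitSize` can be tiny (e.g. 4194304 in the report) while a cluster-wide config supplies a larger per-node minimum (5123456), which is exactly the combination this check rejects — hence the fix of clamping the minimums before `getSplits` runs.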