
Commit 81a305d

10110346 authored and tgravescs committed
[SPARK-25753][CORE] fix reading small files via BinaryFileRDD
## What changes were proposed in this pull request?

This is a follow-up of #21601; `StreamFileInputFormat` and `WholeTextFileInputFormat` have the same problem.

    Minimum split size pernode 5123456 cannot be larger than maximum split size 4194304
    java.io.IOException: Minimum split size pernode 5123456 cannot be larger than maximum split size 4194304
        at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:201)
        at org.apache.spark.rdd.BinaryFileRDD.getPartitions(BinaryFileRDD.scala:52)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:254)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:252)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2138)

## How was this patch tested?

Added a unit test.

Closes #22725 from 10110346/maxSplitSize_node_rack.

Authored-by: liuxian <liu.xian3@zte.com.cn>
Signed-off-by: Thomas Graves <tgraves@apache.org>
1 parent fc64e83 commit 81a305d
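For context, here is a minimal standalone sketch of the arithmetic behind the failure. The variable names mirror `StreamFileInputFormat.setMinPartitions`, but the default values for `spark.files.openCostInBytes` and `spark.files.maxPartitionBytes` and the parallelism are assumptions for illustration: with only a few tiny files, `maxSplitSize` collapses to the open cost (4194304 bytes), so any configured per-node or per-rack minimum above that fails the `CombineFileInputFormat.getSplits` check unless it is clamped, which is what this patch does.

```scala
// Standalone sketch (not Spark code): shows why tiny inputs push maxSplitSize
// below a user-configured per-node minimum, and the clamp the patch applies.
object SplitSizeSketch {
  def main(args: Array[String]): Unit = {
    val openCostInBytes = 4194304L           // assumed default of spark.files.openCostInBytes
    val defaultMaxSplitBytes = 134217728L    // assumed default of spark.files.maxPartitionBytes
    val defaultParallelism = 8               // illustrative core count
    val fileLengths = Seq(5L, 5L, 5L)        // three tiny binary files

    val totalBytes = fileLengths.map(_ + openCostInBytes).sum
    val bytesPerCore = totalBytes / defaultParallelism
    val maxSplitSize = math.min(defaultMaxSplitBytes, math.max(openCostInBytes, bytesPerCore))

    val minSplitSizePerNode = 5123456L       // e.g. ...split.minsize.per.node from the job conf
    // Before the patch this pair reached CombineFileInputFormat.getSplits unchanged and threw
    // "Minimum split size pernode 5123456 cannot be larger than maximum split size 4194304".
    val effectiveMinPerNode = math.min(minSplitSizePerNode, maxSplitSize)

    println(s"maxSplitSize=$maxSplitSize effectiveMinPerNode=$effectiveMinPerNode")
  }
}
```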

File tree

2 files changed (+25 -0 lines)


core/src/main/scala/org/apache/spark/input/PortableDataStream.scala

Lines changed: 12 additions & 0 deletions
@@ -52,6 +52,18 @@ private[spark] abstract class StreamFileInputFormat[T]
     val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + openCostInBytes).sum
     val bytesPerCore = totalBytes / defaultParallelism
     val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+
+    // For small files we need to ensure the min split size per node & rack <= maxSplitSize
+    val jobConfig = context.getConfiguration
+    val minSplitSizePerNode = jobConfig.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERNODE, 0L)
+    val minSplitSizePerRack = jobConfig.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERRACK, 0L)
+
+    if (maxSplitSize < minSplitSizePerNode) {
+      super.setMinSplitSizeNode(maxSplitSize)
+    }
+    if (maxSplitSize < minSplitSizePerRack) {
+      super.setMinSplitSizeRack(maxSplitSize)
+    }
     super.setMaxSplitSize(maxSplitSize)
   }

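For reference, a hedged user-facing sketch of the scenario the hunk above handles; the Hadoop property names are the keys behind `CombineFileInputFormat.SPLIT_MINSIZE_PERNODE` / `SPLIT_MINSIZE_PERRACK`, while the input path and values are placeholders. Before the fix, reading a directory of small files with these settings failed in `BinaryFileRDD.getPartitions`; with the fix, the minimums are clamped and the read succeeds.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SmallBinaryFilesRead {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("small-binary-files"))

    // Per-node / per-rack minimums larger than the maxSplitSize computed for tiny files
    // (4194304 with default settings) used to make binaryFiles() throw at partition planning.
    sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.minsize.per.node", 5123456)
    sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.minsize.per.rack", 5123456)

    // "/tmp/small-binary-files" is a placeholder directory containing a few small files.
    val (path, stream) = sc.binaryFiles("/tmp/small-binary-files").collect().head
    println(s"read ${stream.toArray().length} bytes from $path")

    sc.stop()
  }
}
```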
core/src/test/scala/org/apache/spark/FileSuite.scala

Lines changed: 13 additions & 0 deletions
@@ -320,6 +320,19 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
+  test("minimum split size per node and per rack should be less than or equal to maxSplitSize") {
+    sc = new SparkContext("local", "test")
+    val testOutput = Array[Byte](1, 2, 3, 4, 5)
+    val outFile = writeBinaryData(testOutput, 1)
+    sc.hadoopConfiguration.setLong(
+      "mapreduce.input.fileinputformat.split.minsize.per.node", 5123456)
+    sc.hadoopConfiguration.setLong(
+      "mapreduce.input.fileinputformat.split.minsize.per.rack", 5123456)
+
+    val (_, data) = sc.binaryFiles(outFile.getAbsolutePath).collect().head
+    assert(data.toArray === testOutput)
+  }
+
   test("fixed record length binary file as byte array") {
     sc = new SparkContext("local", "test")
     val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
