
Commit 54ffcdb
Commit message: fix
Parent: 3946de7
File tree: 2 files changed, +25 −0 lines changed

core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
Lines changed: 12 additions & 0 deletions
@@ -52,6 +52,18 @@ private[spark] abstract class StreamFileInputFormat[T]
     val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + openCostInBytes).sum
     val bytesPerCore = totalBytes / defaultParallelism
     val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+
+    // For small files we need to ensure the min split size per node & rack <= maxSplitSize
+    val jobConfig = context.getConfiguration
+    val minSplitSizePerNode = jobConfig.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERNODE, 0L)
+    val minSplitSizePerRack = jobConfig.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERRACK, 0L)
+
+    if (maxSplitSize < minSplitSizePerNode) {
+      super.setMinSplitSizeNode(maxSplitSize)
+    }
+
+    if (maxSplitSize < minSplitSizePerRack) {
+      super.setMinSplitSizeRack(maxSplitSize)
+    }
     super.setMaxSplitSize(maxSplitSize)
   }
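
This hunk clamps CombineFileInputFormat's per-node and per-rack minimum split sizes to the computed maxSplitSize. Hadoop's CombineFileInputFormat.getSplits rejects a job with an IOException whenever either configured minimum exceeds the maximum split size, so before this fix sc.binaryFiles could fail outright on small inputs when those minimums were set high. The following is a minimal repro sketch, not part of the patch: it assumes a build containing this commit, and /tmp/small-file.bin is a placeholder path.

// Hypothetical standalone repro of the scenario this commit fixes.
import org.apache.spark.{SparkConf, SparkContext}

object SmallFileRepro {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("repro"))
    // Configure minimum split sizes far above what a tiny file can yield.
    sc.hadoopConfiguration.setLong(
      "mapreduce.input.fileinputformat.split.minsize.per.node", 5123456L)
    sc.hadoopConfiguration.setLong(
      "mapreduce.input.fileinputformat.split.minsize.per.rack", 5123456L)
    // Before this fix, the read below failed inside CombineFileInputFormat's
    // split computation; with the clamp, the small file is read normally.
    val (path, stream) = sc.binaryFiles("/tmp/small-file.bin").collect().head
    println(s"$path: ${stream.toArray().length} bytes")
    sc.stop()
  }
}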

core/src/test/scala/org/apache/spark/FileSuite.scala
Lines changed: 13 additions & 0 deletions
@@ -320,6 +320,19 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
+  test("minimum split size per node and per rack should be less than or equal to maxSplitSize") {
+    sc = new SparkContext("local", "test")
+    val testOutput = Array[Byte](1, 2, 3, 4, 5)
+    val outFile = writeBinaryData(testOutput, 1)
+    sc.hadoopConfiguration.setLong(
+      "mapreduce.input.fileinputformat.split.minsize.per.node", 5123456)
+    sc.hadoopConfiguration.setLong(
+      "mapreduce.input.fileinputformat.split.minsize.per.rack", 5123456)
+
+    val (_, data) = sc.binaryFiles(outFile.getAbsolutePath).collect().head
+    assert(data.toArray === testOutput)
+  }
+
   test("fixed record length binary file as byte array") {
     sc = new SparkContext("local", "test")
     val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
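
The 5,123,456-byte minimums in this test are chosen to exceed the maxSplitSize that a single 5-byte file produces, so without the clamp the binaryFiles read would fail. A back-of-the-envelope check, assuming the default spark.files.openCostInBytes of 4 MB, the default spark.files.maxPartitionBytes of 128 MB, and a defaultParallelism of 1 in local mode:

// Rough arithmetic behind the test's threshold (assumed defaults).
val openCostInBytes = 4L * 1024 * 1024          // 4194304 (assumed default)
val defaultMaxSplitBytes = 128L * 1024 * 1024   // 134217728 (assumed default)
val totalBytes = 5L + openCostInBytes           // one 5-byte file
val bytesPerCore = totalBytes / 1               // assumed defaultParallelism = 1
val maxSplitSize = math.min(defaultMaxSplitBytes, math.max(openCostInBytes, bytesPerCore))
// maxSplitSize = 4194309 < 5123456, so both minimums get clamped and the assertion passes.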
