Skip to content

Commit e17ada2

Browse files
committed
Fix kernel 2.6.32 bug led unexpected behavior of transferTo
1 parent 044583a commit e17ada2

File tree

2 files changed

+19
-2
lines changed

2 files changed

+19
-2
lines changed

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,12 +280,29 @@ private[spark] object Utils extends Logging {
280280
// When both streams are File stream, use transferTo to improve copy performance.
281281
val inChannel = in.asInstanceOf[FileInputStream].getChannel()
282282
val outChannel = out.asInstanceOf[FileOutputStream].getChannel()
283+
val initialPos = outChannel.position()
283284
val size = inChannel.size()
284285

285286
// In case transferTo method transferred less data than we have required.
286287
while (count < size) {
287288
count += inChannel.transferTo(count, size - count, outChannel)
288289
}
290+
291+
// Check the position after transferTo loop to see if it is in the right position and
292+
// give user information if not.
293+
// Position will not be increased to the expected length after calling transferTo in
294+
// kernel version 2.6.32, this issue can be seen in
295+
// scalastyle:off
296+
// https://bugs.openjdk.java.net/browse/JDK-7052359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel)
297+
// scalastyle:on
298+
// This will lead to stream corruption issue when using sort-based shuffle (SPARK-3948).
299+
val finalPos = outChannel.position()
300+
assert(finalPos == initialPos + size,
301+
s"""
302+
|Current position $finalPos do not equal to expected position ${initialPos + count}
303+
|after transferTo, please check your kernel version to see if it is 2.6.32,
304+
|this is a kernel bug which will lead to unexpected behavior when using transferTo.
305+
""".stripMargin)
289306
} else {
290307
val buf = new Array[Byte](8192)
291308
var n = 0
@@ -727,7 +744,7 @@ private[spark] object Utils extends Logging {
727744

728745
/**
729746
* Determines if a directory contains any files newer than cutoff seconds.
730-
*
747+
*
731748
* @param dir must be the path to a directory, or IllegalArgumentException is thrown
732749
* @param cutoff measured in seconds. Returns true if there are any files or directories in the
733750
* given directory whose last modified time is later than this many seconds ago

core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -705,7 +705,7 @@ private[spark] class ExternalSorter[K, V, C](
705705
var out: FileOutputStream = null
706706
var in: FileInputStream = null
707707
try {
708-
out = new FileOutputStream(outputFile)
708+
out = new FileOutputStream(outputFile, true)
709709
for (i <- 0 until numPartitions) {
710710
in = new FileInputStream(partitionWriters(i).fileSegment().file)
711711
val size = org.apache.spark.util.Utils.copyStream(in, out, false)

0 commit comments

Comments
 (0)