Skip to content

Commit a82b184

Browse files
committed
add configuration to control the NIO way of copying stream
1 parent e17ada2 commit a82b184

File tree

2 files changed

+8
-6
lines changed

2 files changed

+8
-6
lines changed

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -272,11 +272,13 @@ private[spark] object Utils extends Logging {
272272
/** Copy all data from an InputStream to an OutputStream */
273273
def copyStream(in: InputStream,
274274
out: OutputStream,
275-
closeStreams: Boolean = false): Long =
275+
closeStreams: Boolean = false,
276+
transferToEnabled: Boolean = true): Long =
276277
{
277278
var count = 0L
278279
try {
279-
if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]) {
280+
if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]
281+
&& transferToEnabled) {
280282
// When both streams are File stream, use transferTo to improve copy performance.
281283
val inChannel = in.asInstanceOf[FileInputStream].getChannel()
282284
val outChannel = out.asInstanceOf[FileOutputStream].getChannel()
@@ -292,16 +294,15 @@ private[spark] object Utils extends Logging {
292294
// give user information if not.
293295
// Position will not be increased to the expected length after calling transferTo in
294296
// kernel version 2.6.32, this issue can be seen in
295-
// scalastyle:off
296-
// https://bugs.openjdk.java.net/browse/JDK-7052359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel)
297-
// scalastyle:on
297+
// https://bugs.openjdk.java.net/browse/JDK-7052359
298298
// This will lead to stream corruption issue when using sort-based shuffle (SPARK-3948).
299299
val finalPos = outChannel.position()
300300
assert(finalPos == initialPos + size,
301301
s"""
302302
|Current position $finalPos do not equal to expected position ${initialPos + count}
303303
|after transferTo, please check your kernel version to see if it is 2.6.32,
304304
|this is a kernel bug which will lead to unexpected behavior when using transferTo.
305+
|You can set spark.file.transferTo = false to disable this NIO feature.
305306
""".stripMargin)
306307
} else {
307308
val buf = new Array[Byte](8192)

core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ private[spark] class ExternalSorter[K, V, C](
9393
private val conf = SparkEnv.get.conf
9494
private val spillingEnabled = conf.getBoolean("spark.shuffle.spill", true)
9595
private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024
96+
private val transferToEnabled = conf.getBoolean("spark.file.transferTo", true)
9697

9798
// Size of object batches when reading/writing from serializers.
9899
//
@@ -708,7 +709,7 @@ private[spark] class ExternalSorter[K, V, C](
708709
out = new FileOutputStream(outputFile, true)
709710
for (i <- 0 until numPartitions) {
710711
in = new FileInputStream(partitionWriters(i).fileSegment().file)
711-
val size = org.apache.spark.util.Utils.copyStream(in, out, false)
712+
val size = org.apache.spark.util.Utils.copyStream(in, out, false, transferToEnabled)
712713
in.close()
713714
in = null
714715
lengths(i) = size

0 commit comments

Comments
 (0)