Skip to content

Commit a9e9104

Browse files
sarutakrxin
authored andcommitted
[SPARK-3546] InputStream of ManagedBuffer is not closed and causes running out of file descriptor
Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp> Closes #2408 from sarutak/resolve-resource-leak-issue and squashes the following commits: 074781d [Kousuke Saruta] Modified SuffleBlockFetcherIterator 5f63f67 [Kousuke Saruta] Move metrics increment logic and debug logging outside try block b37231a [Kousuke Saruta] Modified FileSegmentManagedBuffer#nioByteBuffer to check null or not before invoking channel.close bf29d4a [Kousuke Saruta] Modified FileSegment to close channel
1 parent 84073eb commit a9e9104

File tree

2 files changed

+11
-3
lines changed

2 files changed

+11
-3
lines changed

core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.apache.spark.network
1919

2020
import java.io.{FileInputStream, RandomAccessFile, File, InputStream}
2121
import java.nio.ByteBuffer
22+
import java.nio.channels.FileChannel
2223
import java.nio.channels.FileChannel.MapMode
2324

2425
import com.google.common.io.ByteStreams
@@ -66,8 +67,15 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt
6667
override def size: Long = length
6768

6869
override def nioByteBuffer(): ByteBuffer = {
69-
val channel = new RandomAccessFile(file, "r").getChannel
70-
channel.map(MapMode.READ_ONLY, offset, length)
70+
var channel: FileChannel = null
71+
try {
72+
channel = new RandomAccessFile(file, "r").getChannel
73+
channel.map(MapMode.READ_ONLY, offset, length)
74+
} finally {
75+
if (channel != null) {
76+
channel.close()
77+
}
78+
}
7179
}
7280

7381
override def inputStream(): InputStream = {

core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer
2323
import scala.collection.mutable.HashSet
2424
import scala.collection.mutable.Queue
2525

26-
import org.apache.spark.{TaskContext, Logging, SparkException}
26+
import org.apache.spark.{TaskContext, Logging}
2727
import org.apache.spark.network.{ManagedBuffer, BlockFetchingListener, BlockTransferService}
2828
import org.apache.spark.serializer.Serializer
2929
import org.apache.spark.util.Utils

0 commit comments

Comments
 (0)