Skip to content

Commit 074781d

Browse files
committed
Modified SuffleBlockFetcherIterator
1 parent 5f63f67 commit 074781d

File tree

1 file changed

+4
-13
lines changed

1 file changed

+4
-13
lines changed

core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.spark.storage
1919

20-
import java.io.InputStream
2120
import java.util.concurrent.LinkedBlockingQueue
2221

2322
import scala.collection.mutable.ArrayBuffer
@@ -112,18 +111,10 @@ final class ShuffleBlockFetcherIterator(
112111
blockTransferService.fetchBlocks(req.address.host, req.address.port, blockIds,
113112
new BlockFetchingListener {
114113
override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
115-
var is: InputStream = null
116-
try {
117-
is = data.inputStream()
118-
results.put(new FetchResult(BlockId(blockId), sizeMap(blockId),
119-
() => serializer.newInstance().deserializeStream(
120-
blockManager.wrapForCompression(BlockId(blockId), is)).asIterator
121-
))
122-
} finally {
123-
if (is != null) {
124-
is.close()
125-
}
126-
}
114+
results.put(new FetchResult(BlockId(blockId), sizeMap(blockId),
115+
() => serializer.newInstance().deserializeStream(
116+
blockManager.wrapForCompression(BlockId(blockId), data.inputStream())).asIterator
117+
))
127118
shuffleMetrics.remoteBytesRead += data.size
128119
shuffleMetrics.remoteBlocksFetched += 1
129120
logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))

0 commit comments

Comments
 (0)