
Commit abb62f0

cloud-fan authored and pwendell committed
[SPARK-1912] fix compress memory issue during reduce
When we need to read a compressed block, we first create a compression stream instance (LZF or Snappy) and use it to wrap that block. Say a reducer task needs to read 1000 local shuffle blocks: it prepares all 1000 up front, which means creating 1000 compression stream instances to wrap them. Initializing a compression instance allocates some memory, so holding many instances alive at the same time is a problem. Since the reducer actually reads the shuffle blocks one by one, we can initialize each compression instance lazily.

Author: Wenchen Fan (Cloud) <cloud0fan@gmail.com>

Closes #860 from cloud-fan/fix-compress and squashes the following commits:

0924a6b [Wenchen Fan (Cloud)] rename 'doWork' into 'getIterator'
07f32c2 [Wenchen Fan (Cloud)] move the LazyProxyIterator to dataDeserialize
d80c426 [Wenchen Fan (Cloud)] remove empty lines in short class
2c8adb2 [Wenchen Fan (Cloud)] add inline comment
8ebff77 [Wenchen Fan (Cloud)] fix compress memory issue during reduce
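To see the fix in isolation, here is a minimal, self-contained Scala sketch of the lazy-wrapping pattern the patch introduces. It is not Spark code: `expensiveSetup` is a hypothetical stand-in for compression-stream creation, and the generic element type is an illustration. A by-name constructor parameter cached in a `lazy val` defers the expensive work until the iterator is first consumed:

```scala
// Sketch only: a proxy iterator whose backing iterator is built on first use.
class LazyProxyIterator[A](f: => Iterator[A]) extends Iterator[A] {
  // `f` is a by-name parameter, so nothing is evaluated at construction time;
  // the lazy val evaluates it exactly once, on the first hasNext/next call.
  lazy val proxy = f
  override def hasNext: Boolean = proxy.hasNext
  override def next(): A = proxy.next()
}

object LazyDemo extends App {
  // Hypothetical stand-in for wrapping a block in a compression stream.
  def expensiveSetup(id: Int): Iterator[Int] = {
    println(s"allocating buffers for block $id") // runs only when consumed
    Iterator(id, id + 1)
  }

  // Wrapping 1000 blocks allocates nothing yet...
  val blocks = (1 to 1000).map(id => new LazyProxyIterator(expensiveSetup(id)))
  // ...allocation happens one block at a time, as each iterator is drained.
  println(blocks.head.toList) // prints the allocation message, then List(1, 2)
}
```

Constructing all 1000 wrappers is now cheap; each block pays its setup cost only when the reducer actually drains it.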
1 parent b4b0a54 · commit abb62f0

File tree

1 file changed: +20 −2 lines


core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 20 additions & 2 deletions
```diff
@@ -1015,8 +1015,26 @@ private[spark] class BlockManager(
       bytes: ByteBuffer,
       serializer: Serializer = defaultSerializer): Iterator[Any] = {
     bytes.rewind()
-    val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
-    serializer.newInstance().deserializeStream(stream).asIterator
+
+    def getIterator = {
+      val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
+      serializer.newInstance().deserializeStream(stream).asIterator
+    }
+
+    if (blockId.isShuffle) {
+      // Reducer may need to read many local shuffle blocks and will wrap them into Iterators
+      // at the beginning. The wrapping will cost some memory (compression instance
+      // initialization, etc.). Reducer reads shuffle blocks one by one so we could do the
+      // wrapping lazily to save memory.
+      class LazyProxyIterator(f: => Iterator[Any]) extends Iterator[Any] {
+        lazy val proxy = f
+        override def hasNext: Boolean = proxy.hasNext
+        override def next(): Any = proxy.next()
+      }
+      new LazyProxyIterator(getIterator)
+    } else {
+      getIterator
+    }
   }
 
   def stop() {
```
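The key detail in the patch is that `LazyProxyIterator` takes `f` as a by-name parameter and caches it in a `lazy val`: `wrapForCompression`, and whatever buffers the LZF or Snappy stream allocates, does not run until the first `hasNext` or `next()` call, i.e. when the reducer actually starts consuming that block. Non-shuffle blocks keep the old eager path via `getIterator`.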
