Skip to content

Commit 8ebff77

Browse files
committed
fix compress memory issue during reduce
1 parent b2bdd0e commit 8ebff77

File tree

1 file changed

+15
-2
lines changed

1 file changed

+15
-2
lines changed

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -329,8 +329,21 @@ private[spark] class BlockManager(
329329
* never deletes (recent) items.
330330
*/
331331
def getLocalFromDisk(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = {
332-
diskStore.getValues(blockId, serializer).orElse(
333-
sys.error("Block " + blockId + " not found on disk, though it should be"))
332+
class LazyProxyIterator(f: => Iterator[Any]) extends Iterator[Any] {
333+
334+
lazy val proxy = f
335+
336+
def hasNext: Boolean = proxy.hasNext
337+
338+
def next(): Any = proxy.next()
339+
}
340+
341+
if (diskStore.contains(blockId)) {
342+
Some(new LazyProxyIterator(diskStore.getValues(blockId, serializer).get))
343+
} else {
344+
sys.error("Block " + blockId + " not found on disk, though it should be")
345+
None
346+
}
334347
}
335348

336349
/**

0 commit comments

Comments
 (0)