@@ -329,23 +329,8 @@ private[spark] class BlockManager(
329
329
* never deletes (recent) items.
330
330
*/
331
331
def getLocalFromDisk (blockId : BlockId , serializer : Serializer ): Option [Iterator [Any ]] = {
332
-
333
- // Reducer may need to read many local shuffle blocks and will wrap them into Iterators
334
- // at the beginning. The wrapping will cost some memory(compression instance
335
- // initialization, etc.). Reducer read shuffle blocks one by one so we could do the
336
- // wrapping lazily to save memory.
337
- class LazyProxyIterator (f : => Iterator [Any ]) extends Iterator [Any ] {
338
- lazy val proxy = f
339
- override def hasNext : Boolean = proxy.hasNext
340
- override def next (): Any = proxy.next()
341
- }
342
-
343
- if (diskStore.contains(blockId)) {
344
- Some (new LazyProxyIterator (diskStore.getValues(blockId, serializer).get))
345
- } else {
346
- sys.error(" Block " + blockId + " not found on disk, though it should be" )
347
- None
348
- }
332
+ diskStore.getValues(blockId, serializer).orElse(
333
+ sys.error(" Block " + blockId + " not found on disk, though it should be" ))
349
334
}
350
335
351
336
/**
@@ -1030,8 +1015,26 @@ private[spark] class BlockManager(
1030
1015
bytes : ByteBuffer ,
1031
1016
serializer : Serializer = defaultSerializer): Iterator [Any ] = {
1032
1017
bytes.rewind()
1033
- val stream = wrapForCompression(blockId, new ByteBufferInputStream (bytes, true ))
1034
- serializer.newInstance().deserializeStream(stream).asIterator
1018
+
1019
+ def doWork () = {
1020
+ val stream = wrapForCompression(blockId, new ByteBufferInputStream (bytes, true ))
1021
+ serializer.newInstance().deserializeStream(stream).asIterator
1022
+ }
1023
+
1024
+ if (blockId.isShuffle) {
1025
+ // Reducer may need to read many local shuffle blocks and will wrap them into Iterators
1026
+ // at the beginning. The wrapping will cost some memory(compression instance
1027
+ // initialization, etc.). Reducer read shuffle blocks one by one so we could do the
1028
+ // wrapping lazily to save memory.
1029
+ class LazyProxyIterator (f : => Iterator [Any ]) extends Iterator [Any ] {
1030
+ lazy val proxy = f
1031
+ override def hasNext : Boolean = proxy.hasNext
1032
+ override def next (): Any = proxy.next()
1033
+ }
1034
+ new LazyProxyIterator (doWork())
1035
+ } else {
1036
+ doWork()
1037
+ }
1035
1038
}
1036
1039
1037
1040
def stop () {
0 commit comments