Skip to content

Commit 3d6e754

Browse files
committed
Merge pull request apache#503 from pwendell/master
Fix a bug on the read side of external sort when using Snappy compression. This case was not handled correctly, and this patch fixes it.
2 parents ff44732 + 0213b40 commit 3d6e754

File tree

1 file changed

+9
-1
lines changed

1 file changed

+9
-1
lines changed

core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala

Lines changed: 9 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -358,7 +358,15 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
358358
private class DiskMapIterator(file: File, blockId: BlockId) extends Iterator[(K, C)] {
359359
val fileStream = new FileInputStream(file)
360360
val bufferedStream = new FastBufferedInputStream(fileStream, fileBufferSize)
361-
val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream)
361+
362+
val shouldCompress = blockManager.shouldCompress(blockId)
363+
val compressionCodec = new LZFCompressionCodec(sparkConf)
364+
val compressedStream =
365+
if (shouldCompress) {
366+
compressionCodec.compressedInputStream(bufferedStream)
367+
} else {
368+
bufferedStream
369+
}
362370
var deserializeStream = ser.deserializeStream(compressedStream)
363371
var objectsRead = 0
364372

0 commit comments

Comments
 (0)