[SPARK-1912] fix compress memory issue during reduce #860
Conversation
Can one of the admins verify this patch?
Jenkins, test this please.
Merged build triggered.
Merged build started.
Merged build finished. All automated tests passed.
All automated tests passed.
@@ -329,8 +329,21 @@ private[spark] class BlockManager(
    * never deletes (recent) items.
    */
   def getLocalFromDisk(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = {
-    diskStore.getValues(blockId, serializer).orElse(
-      sys.error("Block " + blockId + " not found on disk, though it should be"))
+    class LazyProxyIterator(f: => Iterator[Any]) extends Iterator[Any] {
Do you mind adding some inline comments on why we are doing this? (basically your pull request description)
@rxin Thanks for your advice! I have added the comment and removed the empty lines.
+    }
+
+    if (diskStore.contains(blockId)) {
+      Some(new LazyProxyIterator(diskStore.getValues(blockId, serializer).get))
Doesn't this introduce a race condition, because you're calling contains before getValues? If the block is removed in that time, you'll have a problem. It would be better to change BlockManager.dataDeserialize to use the lazy iterator.
@mateiz That's a good idea! I have moved the lazy iterator into dataDeserialize.
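For context, the diff above only shows the class header of LazyProxyIterator, so here is a minimal sketch of what such a lazy proxy iterator looks like; the body is a plausible reconstruction, not the exact committed code:

class LazyProxyIterator(f: => Iterator[Any]) extends Iterator[Any] {
  // The by-name parameter f is only evaluated on first access, so the
  // compression stream behind the real iterator is created on demand.
  lazy val proxy = f
  override def hasNext: Boolean = proxy.hasNext
  override def next(): Any = proxy.next()
}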
Jenkins, this is ok to test
Merged build triggered.
Merged build started.
Merged build finished. All automated tests passed.
All automated tests passed.
-    val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
-    serializer.newInstance().deserializeStream(stream).asIterator
+    def doWork() = {
Maybe call this getIterator
Actually it could also be clearer to write it like this:
lazy val iterator = {
val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
serializer.newInstance().deserializeStream(stream).asIterator
}
Thanks for the changes. Made a couple more comments but I think it's almost good to go.
@mateiz Does lazy val have any performance penalty compared with a plain function here?
lazy val shouldn't be worse than what you have now. Anyway maybe it's better to leave it as a function to make it clearer. But just call the function getIterator.
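Putting these review comments together, the reworked dataDeserialize should look roughly like the sketch below. This is assembled from the diff and the commit log (wrapForCompression, ByteBufferInputStream, and LazyProxyIterator all appear above), not a verbatim copy of the merged code:

def dataDeserialize(blockId: BlockId, bytes: ByteBuffer, serializer: Serializer): Iterator[Any] = {
  // Kept as a def rather than a lazy val, per the discussion above.
  def getIterator = {
    val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
    serializer.newInstance().deserializeStream(stream).asIterator
  }
  // The compression stream is now created only when the first element is read,
  // and every caller of dataDeserialize gets the laziness for free, with no
  // separate contains() check to race against.
  new LazyProxyIterator(getIterator)
}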
Merged build triggered.
Merged build started.
Merged build finished. All automated tests passed.
All automated tests passed.
I don't think there would be any perf issues with this, but it might be good to do a perf run to see if there is any perf penalty. There are possibly two perf downsides: 1. contention on the lazy val's lock, and 2. the extra branch. However, I think in this case the iterator is consumed by a single thread, so the JVM can get rid of the lock, and branch prediction should work well for the extra branch...
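For background on the "lazy val's lock" mentioned here: scalac compiles a lazy val to double-checked initialization guarded by the enclosing object's monitor. A simplified hand-written equivalent (the real encoding uses a volatile bitmap field rather than a boolean) looks like this:

class Holder(make: () => Iterator[Any]) {
  @volatile private var initialized = false
  private var cached: Iterator[Any] = _
  // Roughly what `lazy val iterator = make()` desugars to.
  def iterator: Iterator[Any] = {
    if (!initialized) {
      synchronized {
        if (!initialized) {
          cached = make()   // runs once; later calls skip the lock
          initialized = true
        }
      }
    }
    cached
  }
}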
When we need to read a compressed block, we first create a compression stream instance (LZF or Snappy) and use it to wrap that block. Say a reducer task needs to read 1000 local shuffle blocks: it first prepares to read all 1000 blocks, which means creating 1000 compression stream instances to wrap them. Initializing a compression instance allocates some memory, so having many instances alive at the same time is a problem. In practice the reducer reads the shuffle blocks one by one, so we can initialize the compression instances lazily.

Author: Wenchen Fan(Cloud) <cloud0fan@gmail.com>

Closes #860 from cloud-fan/fix-compress and squashes the following commits:

0924a6b [Wenchen Fan(Cloud)] rename 'doWork' into 'getIterator'
07f32c2 [Wenchen Fan(Cloud)] move the LazyProxyIterator to dataDeserialize
d80c426 [Wenchen Fan(Cloud)] remove empty lines in short class
2c8adb2 [Wenchen Fan(Cloud)] add inline comment
8ebff77 [Wenchen Fan(Cloud)] fix compress memory issue during reduce
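To make the memory issue concrete, here is a self-contained toy demo (hypothetical; GZIP stands in for LZF/Snappy, and the block count and sizes are made up) contrasting eager and lazy stream creation:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object LazyStreamsDemo {
  // Each decompression stream allocates internal buffers on construction;
  // that per-instance allocation is the memory cost at issue in this PR.
  def gzip(data: Array[Byte]): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val gz = new GZIPOutputStream(bos)
    gz.write(data)
    gz.close()
    bos.toByteArray
  }

  def main(args: Array[String]): Unit = {
    val blocks = Seq.fill(1000)(gzip(Array.fill(64)(1.toByte)))

    // Eager (the problem): all 1000 streams, and their buffers, alive at once.
    // val streams = blocks.map(b => new GZIPInputStream(new ByteArrayInputStream(b)))

    // Lazy (the idea behind the fix): Iterator.map is lazy, so each stream
    // exists only while its block is actually being read.
    val lazyStreams = blocks.iterator.map(b => new GZIPInputStream(new ByteArrayInputStream(b)))
    lazyStreams.foreach(_.close())
  }
}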