
[SPARK-1912] fix compress memory issue during reduce #860


Closed
cloud-fan wants to merge 5 commits from cloud-fan/fix-compress

Conversation

cloud-fan
Contributor

When we need to read a compressed block, we first create a compression stream instance (LZF or Snappy) and use it to wrap that block.
Say a reducer task needs to read 1000 local shuffle blocks: it first prepares to read all 1000 blocks, which means creating 1000 compression stream instances to wrap them. Initializing a compression instance allocates some memory, so having many compression instances alive at the same time is a problem.
In practice the reducer reads the shuffle blocks one by one, so we can initialize the compression instances lazily.
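
A minimal, self-contained sketch of the idea (the class name mirrors the patch, but the body here is illustrative rather than the merged code): the expensive stream/iterator construction is passed as a by-name parameter and only forced when the wrapper is first consumed.

class LazyProxyIterator[T](makeIterator: => Iterator[T]) extends Iterator[T] {
  // The underlying iterator (and the compression stream behind it) is only
  // created when the reducer actually starts consuming this block.
  private lazy val underlying: Iterator[T] = makeIterator

  override def hasNext: Boolean = underlying.hasNext
  override def next(): T = underlying.next()
}

With 1000 such wrappers prepared up front, only the block currently being read holds a live compression stream.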

@AmplabJenkins

Can one of the admins verify this patch?

@rxin
Contributor

rxin commented May 25, 2014

Jenkins, test this please.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished. All automated tests passed.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15181/

@@ -329,8 +329,21 @@ private[spark] class BlockManager(
* never deletes (recent) items.
*/
def getLocalFromDisk(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = {
  diskStore.getValues(blockId, serializer).orElse(
    sys.error("Block " + blockId + " not found on disk, though it should be"))
  class LazyProxyIterator(f: => Iterator[Any]) extends Iterator[Any] {
Contributor

Do you mind adding some inline comment on why we are doing this? (basically your pull request description)

@cloud-fan
Contributor Author

@rxin Thanks for your advice! I have added the comment and the override; please take a look to see if I missed anything. Thanks!

}

if (diskStore.contains(blockId)) {
Some(new LazyProxyIterator(diskStore.getValues(blockId, serializer).get))
Contributor

Doesn't this introduce a race condition because you're calling contains before getValues? If the block is removed in that time, you'll have a problem. It would be better to change BlockManager.dataDeserialize to use the lazy iterator.
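
To make the concern concrete, here are simplified fragments of the two call patterns (illustrative only, not the exact patch):

// Racy: the block may be removed between contains() and getValues(),
// so the .get below can fail even though contains() returned true.
if (diskStore.contains(blockId)) {
  Some(new LazyProxyIterator(diskStore.getValues(blockId, serializer).get))
} else {
  None
}

// Single lookup: getValues is called exactly once, and the deferred work
// lives inside the iterator that dataDeserialize builds.
diskStore.getValues(blockId, serializer).orElse(
  sys.error("Block " + blockId + " not found on disk, though it should be"))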

@cloud-fan
Contributor Author

@mateiz That's a good idea! I have moved the lazy iterator into BlockManager.dataDeserialize. Thanks for your comments!

@mateiz
Contributor

mateiz commented May 29, 2014

Jenkins, this is ok to test

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished. All automated tests passed.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15291/

val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
serializer.newInstance().deserializeStream(stream).asIterator

def doWork() = {
Contributor

Maybe call this getIterator

Contributor

Actually it could also be clearer to write it like this:

lazy val iterator = {
  val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
  serializer.newInstance().deserializeStream(stream).asIterator
}

@mateiz
Contributor

mateiz commented Jun 2, 2014

Thanks for the changes. Made a couple more comments but I think it's almost good to go.

@cloud-fan
Contributor Author

@mateiz Does lazy val have performance overhead? I agree lazy val can make the code clearer here, but dataDeserialize can be called many times if there are lots of shuffle blocks. I'm not a Scala expert, so please correct me if I'm wrong.

@mateiz
Contributor

mateiz commented Jun 3, 2014

lazy val shouldn't be worse than what you have now. Anyway, maybe it's better to leave it as a function to keep it clear; just call the function getIterator.
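
Putting the two suggestions together, the deserialization path presumably ends up shaped roughly like this (a sketch assembled from the snippets above, not the exact merged code):

def dataDeserialize(blockId: BlockId, bytes: ByteBuffer, serializer: Serializer): Iterator[Any] = {
  // Deferred: none of this runs until the returned iterator is first consumed,
  // so a compression stream only exists for the block currently being read.
  def getIterator = {
    val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
    serializer.newInstance().deserializeStream(stream).asIterator
  }
  new LazyProxyIterator(getIterator)
}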

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished. All automated tests passed.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15375/

@asfgit asfgit closed this in 45e9bc8 Jun 3, 2014
@mateiz
Contributor

mateiz commented Jun 3, 2014

Thanks Wenchen, I've merged this for 1.1. We may also want to merge it for 1.0.1, but I'll wait to see what others think since we might want to do more performance testing. @pwendell @rxin what are your thoughts?

@rxin
Contributor

rxin commented Jun 3, 2014

I don't think there would be any perf issues with this, but it might be good to do a perf run to see if there is any penalty.

There are two possible perf downsides: (1) contention on the lazy val's lock, and (2) the extra branch. However, I think in this case the iterator is consumed by a single thread, so the JVM can get rid of the lock, and branch prediction should work well for the extra branch.
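
For context on those two costs: in Scala 2 a lazy val compiles to roughly a double-checked initialization, so every access pays a guard branch and the first access takes a lock. A conceptual sketch (not the actual generated code) of what lazy val iterator = makeIterator expands to:

class LazyHolder(makeIterator: => Iterator[Any]) {
  @volatile private var initialized = false
  private var cached: Iterator[Any] = _

  def iterator: Iterator[Any] = {
    if (!initialized) {     // the extra branch on every access
      synchronized {        // the lock, only contended during initialization
        if (!initialized) {
          cached = makeIterator
          initialized = true
        }
      }
    }
    cached
  }
}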

pdeyhim pushed a commit to pdeyhim/spark-1 that referenced this pull request Jun 25, 2014
When we need to read a compressed block, we first create a compression stream instance (LZF or Snappy) and use it to wrap that block.
Say a reducer task needs to read 1000 local shuffle blocks: it first prepares to read all 1000 blocks, which means creating 1000 compression stream instances to wrap them. Initializing a compression instance allocates some memory, so having many compression instances alive at the same time is a problem.
In practice the reducer reads the shuffle blocks one by one, so we can initialize the compression instances lazily.

Author: Wenchen Fan(Cloud) <cloud0fan@gmail.com>

Closes apache#860 from cloud-fan/fix-compress and squashes the following commits:

0924a6b [Wenchen Fan(Cloud)] rename 'doWork' into 'getIterator'
07f32c2 [Wenchen Fan(Cloud)] move the LazyProxyIterator to dataDeserialize
d80c426 [Wenchen Fan(Cloud)] remove empty lines in short class
2c8adb2 [Wenchen Fan(Cloud)] add inline comment
8ebff77 [Wenchen Fan(Cloud)] fix compress memory issue during reduce
pwendell pushed a commit to pwendell/spark that referenced this pull request Jun 25, 2014
asfgit pushed a commit that referenced this pull request Jun 25, 2014
asfgit pushed a commit that referenced this pull request Jun 25, 2014
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014