Skip to content

[SPARK-17503][Core] Fix memory leak in Memory store when unable to cache the whole RDD in memory #15056

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from

Conversation

clockfly
Copy link
Contributor

@clockfly clockfly commented Sep 12, 2016

What changes were proposed in this pull request?

MemoryStore may throw OutOfMemoryError when trying to cache a super big RDD that cannot fit in memory.

   scala> sc.parallelize(1 to 1000000000, 100).map(x => new Array[Long](1000)).cache().count()

   java.lang.OutOfMemoryError: Java heap space
    at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:24)
    at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:23)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$JoinIterator.next(Iterator.scala:232)
    at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:683)
    at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1684)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1915)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1915)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Spark MemoryStore uses SizeTrackingVector as a temporary unrolling buffer to store all input values that it has read so far before transferring the values to storage memory cache. The problem is that when the input RDD is too big for caching in memory, the temporary unrolling memory SizeTrackingVector is not garbage collected in time. As SizeTrackingVector can occupy all available storage memory, it may cause the executor JVM to run out of memory quickly.

More info can be found at https://issues.apache.org/jira/browse/SPARK-17503

How was this patch tested?

Unit test and manual test.

Before change

Heap memory consumption
screen shot 2016-09-12 at 4 16 15 pm

Heap dump
screen shot 2016-09-12 at 4 34 19 pm

After change

Heap memory consumption
screen shot 2016-09-12 at 4 29 10 pm

@clockfly clockfly changed the title [SPARK-17503][Core] Fix memory leak in Memory store when unable to cache the whole RDD [SPARK-17503][Core] Fix memory leak in Memory store when unable to cache the whole RDD in memory Sep 12, 2016
@clockfly
Copy link
Contributor Author

@yhuai

@SparkQA
Copy link

SparkQA commented Sep 12, 2016

Test build #65243 has finished for PR 15056 at commit a9a4a8b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen
Copy link
Member

srowen commented Sep 12, 2016

Oh I get it now. That looks good, to my understanding.

@cloud-fan
Copy link
Contributor

LGTM

unrolledIteratorIsConsumed = true
memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, unrollMemory)
})
completionIterator ++ rest
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's see if I understand the problem.

Because BlockManager may call close early, so we cannot rely on CompletionIterator to free the memory because we will never actually consume all of elements of unrolled, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the problem here is that the completion iterator is releasing the bookkeeping memory for the iterator as soon as the iterator is fully iterated, but the on-heap objects are being retained by the reference in the unrolled field.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, yea, you are right.

@JoshRosen
Copy link
Contributor

LGTM as well, so I'm going to merge this to master and branch-2.0 (2.0.1-SNAPSHOT). Thanks!

@asfgit asfgit closed this in 1742c3a Sep 12, 2016
asfgit pushed a commit that referenced this pull request Sep 12, 2016
…che the whole RDD in memory

## What changes were proposed in this pull request?

   MemoryStore may throw OutOfMemoryError when trying to cache a super big RDD that cannot fit in memory.
   ```
   scala> sc.parallelize(1 to 1000000000, 100).map(x => new Array[Long](1000)).cache().count()

   java.lang.OutOfMemoryError: Java heap space
	at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:24)
	at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:23)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$JoinIterator.next(Iterator.scala:232)
	at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:683)
	at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
	at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1684)
	at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
	at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1915)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1915)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
   ```

Spark MemoryStore uses SizeTrackingVector as a temporary unrolling buffer to store all input values that it has read so far before transferring the values to storage memory cache. The problem is that when the input RDD is too big for caching in memory, the temporary unrolling memory SizeTrackingVector is not garbage collected in time. As SizeTrackingVector can occupy all available storage memory, it may cause the executor JVM to run out of memory quickly.

More info can be found at https://issues.apache.org/jira/browse/SPARK-17503

## How was this patch tested?

Unit test and manual test.

### Before change

Heap memory consumption
<img width="702" alt="screen shot 2016-09-12 at 4 16 15 pm" src="https://cloud.githubusercontent.com/assets/2595532/18429524/60d73a26-7906-11e6-9768-6f286f5c58c8.png">

Heap dump
<img width="1402" alt="screen shot 2016-09-12 at 4 34 19 pm" src="https://cloud.githubusercontent.com/assets/2595532/18429577/cbc1ef20-7906-11e6-847b-b5903f450b3b.png">

### After change

Heap memory consumption
<img width="706" alt="screen shot 2016-09-12 at 4 29 10 pm" src="https://cloud.githubusercontent.com/assets/2595532/18429503/4abe9342-7906-11e6-844a-b2f815072624.png">

Author: Sean Zhong <seanzhong@databricks.com>

Closes #15056 from clockfly/memory_store_leak.

(cherry picked from commit 1742c3a)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
wgtmac pushed a commit to wgtmac/spark that referenced this pull request Sep 19, 2016
…che the whole RDD in memory

## What changes were proposed in this pull request?

   MemoryStore may throw OutOfMemoryError when trying to cache a super big RDD that cannot fit in memory.
   ```
   scala> sc.parallelize(1 to 1000000000, 100).map(x => new Array[Long](1000)).cache().count()

   java.lang.OutOfMemoryError: Java heap space
	at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:24)
	at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:23)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$JoinIterator.next(Iterator.scala:232)
	at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:683)
	at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
	at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1684)
	at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
	at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1915)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1915)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
   ```

Spark MemoryStore uses SizeTrackingVector as a temporary unrolling buffer to store all input values that it has read so far before transferring the values to storage memory cache. The problem is that when the input RDD is too big for caching in memory, the temporary unrolling memory SizeTrackingVector is not garbage collected in time. As SizeTrackingVector can occupy all available storage memory, it may cause the executor JVM to run out of memory quickly.

More info can be found at https://issues.apache.org/jira/browse/SPARK-17503

## How was this patch tested?

Unit test and manual test.

### Before change

Heap memory consumption
<img width="702" alt="screen shot 2016-09-12 at 4 16 15 pm" src="https://cloud.githubusercontent.com/assets/2595532/18429524/60d73a26-7906-11e6-9768-6f286f5c58c8.png">

Heap dump
<img width="1402" alt="screen shot 2016-09-12 at 4 34 19 pm" src="https://cloud.githubusercontent.com/assets/2595532/18429577/cbc1ef20-7906-11e6-847b-b5903f450b3b.png">

### After change

Heap memory consumption
<img width="706" alt="screen shot 2016-09-12 at 4 29 10 pm" src="https://cloud.githubusercontent.com/assets/2595532/18429503/4abe9342-7906-11e6-844a-b2f815072624.png">

Author: Sean Zhong <seanzhong@databricks.com>

Closes apache#15056 from clockfly/memory_store_leak.
asfgit pushed a commit that referenced this pull request Dec 18, 2016
## What changes were proposed in this pull request?
`NoSuchElementException` will throw since #15056 if a broadcast cannot cache in memory. The reason is that that change cannot cover `!unrolled.hasNext` in `next()` function.

This change is to cover the `!unrolled.hasNext` and check `hasNext` before calling `next` in `blockManager.getLocalValues` to make it  more robust.

We can cache and read broadcast even it cannot fit in memory from this pull request.

Exception log:
```
16/12/10 10:10:04 INFO UnifiedMemoryManager: Will not store broadcast_131 as the required space (1048576 bytes) exceeds our memory limit (122764 bytes)
16/12/10 10:10:04 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block broadcast_131 in memory.
16/12/10 10:10:04 WARN MemoryStore: Not enough space to cache broadcast_131 in memory! (computed 384.0 B so far)
16/12/10 10:10:04 INFO MemoryStore: Memory use = 95.6 KB (blocks) + 0.0 B (scratch space shared across 0 tasks(s)) = 95.6 KB. Storage limit = 119.9 KB.
16/12/10 10:10:04 ERROR Utils: Exception encountered
java.util.NoSuchElementException
	at org.apache.spark.util.collection.PrimitiveVector$$anon$1.next(PrimitiveVector.scala:58)
	at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:700)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at scala.Option.map(Option.scala:146)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
	at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
16/12/10 10:10:04 ERROR Executor: Exception in task 1.0 in stage 86.0 (TID 134423)
java.io.IOException: java.util.NoSuchElementException
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1276)
	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
	at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.NoSuchElementException
	at org.apache.spark.util.collection.PrimitiveVector$$anon$1.next(PrimitiveVector.scala:58)
	at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:700)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at scala.Option.map(Option.scala:146)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
	... 12 more
```

## How was this patch tested?

Add unit test

Author: Yuming Wang <wgyumg@gmail.com>

Closes #16252 from wangyum/SPARK-18827.

(cherry picked from commit 1e5c51f)
Signed-off-by: Sean Owen <sowen@cloudera.com>
asfgit pushed a commit that referenced this pull request Dec 18, 2016
## What changes were proposed in this pull request?
`NoSuchElementException` will throw since #15056 if a broadcast cannot cache in memory. The reason is that that change cannot cover `!unrolled.hasNext` in `next()` function.

This change is to cover the `!unrolled.hasNext` and check `hasNext` before calling `next` in `blockManager.getLocalValues` to make it  more robust.

We can cache and read broadcast even it cannot fit in memory from this pull request.

Exception log:
```
16/12/10 10:10:04 INFO UnifiedMemoryManager: Will not store broadcast_131 as the required space (1048576 bytes) exceeds our memory limit (122764 bytes)
16/12/10 10:10:04 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block broadcast_131 in memory.
16/12/10 10:10:04 WARN MemoryStore: Not enough space to cache broadcast_131 in memory! (computed 384.0 B so far)
16/12/10 10:10:04 INFO MemoryStore: Memory use = 95.6 KB (blocks) + 0.0 B (scratch space shared across 0 tasks(s)) = 95.6 KB. Storage limit = 119.9 KB.
16/12/10 10:10:04 ERROR Utils: Exception encountered
java.util.NoSuchElementException
	at org.apache.spark.util.collection.PrimitiveVector$$anon$1.next(PrimitiveVector.scala:58)
	at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:700)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at scala.Option.map(Option.scala:146)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
	at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
16/12/10 10:10:04 ERROR Executor: Exception in task 1.0 in stage 86.0 (TID 134423)
java.io.IOException: java.util.NoSuchElementException
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1276)
	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
	at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.NoSuchElementException
	at org.apache.spark.util.collection.PrimitiveVector$$anon$1.next(PrimitiveVector.scala:58)
	at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:700)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at scala.Option.map(Option.scala:146)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
	... 12 more
```

## How was this patch tested?

Add unit test

Author: Yuming Wang <wgyumg@gmail.com>

Closes #16252 from wangyum/SPARK-18827.

(cherry picked from commit 1e5c51f)
Signed-off-by: Sean Owen <sowen@cloudera.com>
asfgit pushed a commit that referenced this pull request Dec 18, 2016
## What changes were proposed in this pull request?
`NoSuchElementException` will throw since #15056 if a broadcast cannot cache in memory. The reason is that that change cannot cover `!unrolled.hasNext` in `next()` function.

This change is to cover the `!unrolled.hasNext` and check `hasNext` before calling `next` in `blockManager.getLocalValues` to make it  more robust.

We can cache and read broadcast even it cannot fit in memory from this pull request.

Exception log:
```
16/12/10 10:10:04 INFO UnifiedMemoryManager: Will not store broadcast_131 as the required space (1048576 bytes) exceeds our memory limit (122764 bytes)
16/12/10 10:10:04 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block broadcast_131 in memory.
16/12/10 10:10:04 WARN MemoryStore: Not enough space to cache broadcast_131 in memory! (computed 384.0 B so far)
16/12/10 10:10:04 INFO MemoryStore: Memory use = 95.6 KB (blocks) + 0.0 B (scratch space shared across 0 tasks(s)) = 95.6 KB. Storage limit = 119.9 KB.
16/12/10 10:10:04 ERROR Utils: Exception encountered
java.util.NoSuchElementException
	at org.apache.spark.util.collection.PrimitiveVector$$anon$1.next(PrimitiveVector.scala:58)
	at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:700)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at scala.Option.map(Option.scala:146)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
	at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
16/12/10 10:10:04 ERROR Executor: Exception in task 1.0 in stage 86.0 (TID 134423)
java.io.IOException: java.util.NoSuchElementException
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1276)
	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
	at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.NoSuchElementException
	at org.apache.spark.util.collection.PrimitiveVector$$anon$1.next(PrimitiveVector.scala:58)
	at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:700)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at scala.Option.map(Option.scala:146)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
	... 12 more
```

## How was this patch tested?

Add unit test

Author: Yuming Wang <wgyumg@gmail.com>

Closes #16252 from wangyum/SPARK-18827.
blendmaster pushed a commit to fullcontact/spark that referenced this pull request Jan 18, 2017
## What changes were proposed in this pull request?
`NoSuchElementException` will throw since apache#15056 if a broadcast cannot cache in memory. The reason is that that change cannot cover `!unrolled.hasNext` in `next()` function.

This change is to cover the `!unrolled.hasNext` and check `hasNext` before calling `next` in `blockManager.getLocalValues` to make it  more robust.

We can cache and read broadcast even it cannot fit in memory from this pull request.

Exception log:
```
16/12/10 10:10:04 INFO UnifiedMemoryManager: Will not store broadcast_131 as the required space (1048576 bytes) exceeds our memory limit (122764 bytes)
16/12/10 10:10:04 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block broadcast_131 in memory.
16/12/10 10:10:04 WARN MemoryStore: Not enough space to cache broadcast_131 in memory! (computed 384.0 B so far)
16/12/10 10:10:04 INFO MemoryStore: Memory use = 95.6 KB (blocks) + 0.0 B (scratch space shared across 0 tasks(s)) = 95.6 KB. Storage limit = 119.9 KB.
16/12/10 10:10:04 ERROR Utils: Exception encountered
java.util.NoSuchElementException
	at org.apache.spark.util.collection.PrimitiveVector$$anon$1.next(PrimitiveVector.scala:58)
	at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:700)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at scala.Option.map(Option.scala:146)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
	at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
16/12/10 10:10:04 ERROR Executor: Exception in task 1.0 in stage 86.0 (TID 134423)
java.io.IOException: java.util.NoSuchElementException
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1276)
	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
	at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.NoSuchElementException
	at org.apache.spark.util.collection.PrimitiveVector$$anon$1.next(PrimitiveVector.scala:58)
	at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:700)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at scala.Option.map(Option.scala:146)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
	... 12 more
```

## How was this patch tested?

Add unit test

Author: Yuming Wang <wgyumg@gmail.com>

Closes apache#16252 from wangyum/SPARK-18827.
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
## What changes were proposed in this pull request?
`NoSuchElementException` will throw since apache#15056 if a broadcast cannot cache in memory. The reason is that that change cannot cover `!unrolled.hasNext` in `next()` function.

This change is to cover the `!unrolled.hasNext` and check `hasNext` before calling `next` in `blockManager.getLocalValues` to make it  more robust.

We can cache and read broadcast even it cannot fit in memory from this pull request.

Exception log:
```
16/12/10 10:10:04 INFO UnifiedMemoryManager: Will not store broadcast_131 as the required space (1048576 bytes) exceeds our memory limit (122764 bytes)
16/12/10 10:10:04 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block broadcast_131 in memory.
16/12/10 10:10:04 WARN MemoryStore: Not enough space to cache broadcast_131 in memory! (computed 384.0 B so far)
16/12/10 10:10:04 INFO MemoryStore: Memory use = 95.6 KB (blocks) + 0.0 B (scratch space shared across 0 tasks(s)) = 95.6 KB. Storage limit = 119.9 KB.
16/12/10 10:10:04 ERROR Utils: Exception encountered
java.util.NoSuchElementException
	at org.apache.spark.util.collection.PrimitiveVector$$anon$1.next(PrimitiveVector.scala:58)
	at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:700)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at scala.Option.map(Option.scala:146)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
	at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
16/12/10 10:10:04 ERROR Executor: Exception in task 1.0 in stage 86.0 (TID 134423)
java.io.IOException: java.util.NoSuchElementException
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1276)
	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
	at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.NoSuchElementException
	at org.apache.spark.util.collection.PrimitiveVector$$anon$1.next(PrimitiveVector.scala:58)
	at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:700)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
	at scala.Option.map(Option.scala:146)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
	... 12 more
```

## How was this patch tested?

Add unit test

Author: Yuming Wang <wgyumg@gmail.com>

Closes apache#16252 from wangyum/SPARK-18827.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants