Skip to content

Commit 1e5c51f

Browse files
wangyumsrowen
authored andcommitted
[SPARK-18827][CORE] Fix cannot read broadcast on disk
## What changes were proposed in this pull request? `NoSuchElementException` will throw since #15056 if a broadcast cannot cache in memory. The reason is that that change cannot cover `!unrolled.hasNext` in `next()` function. This change is to cover the `!unrolled.hasNext` and check `hasNext` before calling `next` in `blockManager.getLocalValues` to make it more robust. We can cache and read broadcast even it cannot fit in memory from this pull request. Exception log: ``` 16/12/10 10:10:04 INFO UnifiedMemoryManager: Will not store broadcast_131 as the required space (1048576 bytes) exceeds our memory limit (122764 bytes) 16/12/10 10:10:04 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block broadcast_131 in memory. 16/12/10 10:10:04 WARN MemoryStore: Not enough space to cache broadcast_131 in memory! (computed 384.0 B so far) 16/12/10 10:10:04 INFO MemoryStore: Memory use = 95.6 KB (blocks) + 0.0 B (scratch space shared across 0 tasks(s)) = 95.6 KB. Storage limit = 119.9 KB. 16/12/10 10:10:04 ERROR Utils: Exception encountered java.util.NoSuchElementException at org.apache.spark.util.collection.PrimitiveVector$$anon$1.next(PrimitiveVector.scala:58) at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:700) at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210) at scala.Option.map(Option.scala:146) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269) at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206) at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66) at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66) at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96) at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) at org.apache.spark.scheduler.Task.run(Task.scala:108) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 16/12/10 10:10:04 ERROR Executor: Exception in task 1.0 in stage 86.0 (TID 134423) java.io.IOException: java.util.NoSuchElementException at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1276) at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206) at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66) at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66) at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96) at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) at org.apache.spark.scheduler.Task.run(Task.scala:108) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.util.NoSuchElementException at org.apache.spark.util.collection.PrimitiveVector$$anon$1.next(PrimitiveVector.scala:58) at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:700) at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210) at scala.Option.map(Option.scala:146) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269) ... 12 more ``` ## How was this patch tested? Add unit test Author: Yuming Wang <wgyumg@gmail.com> Closes #16252 from wangyum/SPARK-18827.
1 parent c0c9e1d commit 1e5c51f

File tree

3 files changed

+22
-6
lines changed

3 files changed

+22
-6
lines changed

core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -207,11 +207,15 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
207207
TorrentBroadcast.synchronized {
208208
setConf(SparkEnv.get.conf)
209209
val blockManager = SparkEnv.get.blockManager
210-
blockManager.getLocalValues(broadcastId).map(_.data.next()) match {
211-
case Some(x) =>
212-
releaseLock(broadcastId)
213-
x.asInstanceOf[T]
214-
210+
blockManager.getLocalValues(broadcastId) match {
211+
case Some(blockResult) =>
212+
if (blockResult.data.hasNext) {
213+
val x = blockResult.data.next().asInstanceOf[T]
214+
releaseLock(broadcastId)
215+
x
216+
} else {
217+
throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
218+
}
215219
case None =>
216220
logInfo("Started reading broadcast variable " + id)
217221
val startTimeMs = System.currentTimeMillis()

core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -702,7 +702,7 @@ private[storage] class PartiallyUnrolledIterator[T](
702702
}
703703

704704
override def next(): T = {
705-
if (unrolled == null) {
705+
if (unrolled == null || !unrolled.hasNext) {
706706
rest.next()
707707
} else {
708708
unrolled.next()

core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,18 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext {
137137
sc.stop()
138138
}
139139

140+
test("Cache broadcast to disk") {
141+
val conf = new SparkConf()
142+
.setMaster("local")
143+
.setAppName("test")
144+
.set("spark.memory.useLegacyMode", "true")
145+
.set("spark.storage.memoryFraction", "0.0")
146+
sc = new SparkContext(conf)
147+
val list = List[Int](1, 2, 3, 4)
148+
val broadcast = sc.broadcast(list)
149+
assert(broadcast.value.sum === 10)
150+
}
151+
140152
/**
141153
* Verify the persistence of state associated with a TorrentBroadcast in a local-cluster.
142154
*

0 commit comments

Comments
 (0)