Commit 03a5221

[SPARK-3116] Remove the excessive lockings in TorrentBroadcast
1 parent 217b5e9 commit 03a5221

1 file changed: +27 -39 lines changed

core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala

Lines changed: 27 additions & 39 deletions
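In outline, the commit removes the coarse TorrentBroadcast.synchronized wrapper around every BlockManager call in this file, on the premise that the BlockManager handles its own synchronization, so the JVM-wide lock only serialized unrelated broadcast operations. A minimal before/after sketch of the pattern, condensed from the first hunks below (illustrative only; broadcastId and value_ are fields of the TorrentBroadcast instance):

// Before: every block-manager call in TorrentBroadcast went through one global lock.
TorrentBroadcast.synchronized {
  SparkEnv.get.blockManager.putSingle(
    broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
}

// After: call the block manager directly; the same rewrite is applied to the
// putSingle, getSingle and removeBroadcast call sites in the hunks below.
SparkEnv.get.blockManager.putSingle(
  broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)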
@@ -17,8 +17,7 @@

 package org.apache.spark.broadcast

-import java.io.{ByteArrayOutputStream, ByteArrayInputStream, InputStream,
-  ObjectInputStream, ObjectOutputStream, OutputStream}
+import java.io._

 import scala.reflect.ClassTag
 import scala.util.Random
@@ -53,10 +52,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](

   private val broadcastId = BroadcastBlockId(id)

-  TorrentBroadcast.synchronized {
-    SparkEnv.get.blockManager.putSingle(
-      broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
-  }
+  SparkEnv.get.blockManager.putSingle(
+    broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)

   @transient private var arrayOfBlocks: Array[TorrentBlock] = null
   @transient private var totalBlocks = -1
@@ -91,18 +88,14 @@ private[spark] class TorrentBroadcast[T: ClassTag](
     // Store meta-info
     val metaId = BroadcastBlockId(id, "meta")
     val metaInfo = TorrentInfo(null, totalBlocks, totalBytes)
-    TorrentBroadcast.synchronized {
-      SparkEnv.get.blockManager.putSingle(
-        metaId, metaInfo, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-    }
+    SparkEnv.get.blockManager.putSingle(
+      metaId, metaInfo, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)

     // Store individual pieces
     for (i <- 0 until totalBlocks) {
       val pieceId = BroadcastBlockId(id, "piece" + i)
-      TorrentBroadcast.synchronized {
-        SparkEnv.get.blockManager.putSingle(
-          pieceId, tInfo.arrayOfBlocks(i), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-      }
+      SparkEnv.get.blockManager.putSingle(
+        pieceId, tInfo.arrayOfBlocks(i), StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
     }
   }

@@ -165,21 +158,20 @@ private[spark] class TorrentBroadcast[T: ClassTag](
     val metaId = BroadcastBlockId(id, "meta")
     var attemptId = 10
     while (attemptId > 0 && totalBlocks == -1) {
-      TorrentBroadcast.synchronized {
-        SparkEnv.get.blockManager.getSingle(metaId) match {
-          case Some(x) =>
-            val tInfo = x.asInstanceOf[TorrentInfo]
-            totalBlocks = tInfo.totalBlocks
-            totalBytes = tInfo.totalBytes
-            arrayOfBlocks = new Array[TorrentBlock](totalBlocks)
-            hasBlocks = 0
-
-          case None =>
-            Thread.sleep(500)
-        }
+      SparkEnv.get.blockManager.getSingle(metaId) match {
+        case Some(x) =>
+          val tInfo = x.asInstanceOf[TorrentInfo]
+          totalBlocks = tInfo.totalBlocks
+          totalBytes = tInfo.totalBytes
+          arrayOfBlocks = new Array[TorrentBlock](totalBlocks)
+          hasBlocks = 0
+
+        case None =>
+          Thread.sleep(500)
       }
       attemptId -= 1
     }
+
     if (totalBlocks == -1) {
       return false
     }
@@ -192,17 +184,15 @@ private[spark] class TorrentBroadcast[T: ClassTag](
     val recvOrder = new Random().shuffle(Array.iterate(0, totalBlocks)(_ + 1).toList)
     for (pid <- recvOrder) {
       val pieceId = BroadcastBlockId(id, "piece" + pid)
-      TorrentBroadcast.synchronized {
-        SparkEnv.get.blockManager.getSingle(pieceId) match {
-          case Some(x) =>
-            arrayOfBlocks(pid) = x.asInstanceOf[TorrentBlock]
-            hasBlocks += 1
-            SparkEnv.get.blockManager.putSingle(
-              pieceId, arrayOfBlocks(pid), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+      SparkEnv.get.blockManager.getSingle(pieceId) match {
+        case Some(x) =>
+          arrayOfBlocks(pid) = x.asInstanceOf[TorrentBlock]
+          hasBlocks += 1
+          SparkEnv.get.blockManager.putSingle(
+            pieceId, arrayOfBlocks(pid), StorageLevel.MEMORY_AND_DISK, tellMaster = true)

-          case None =>
-            throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
-        }
+        case None =>
+          throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
       }
     }

@@ -291,9 +281,7 @@ private[broadcast] object TorrentBroadcast extends Logging {
    * If removeFromDriver is true, also remove these persisted blocks on the driver.
    */
   def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = {
-    synchronized {
-      SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
-    }
+    SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
   }
 }