
Commit 633d63a

JoshRosen authored and Andrew Or committed
[SPARK-12757] Add block-level read/write locks to BlockManager
## Motivation

As a pre-requisite to off-heap caching of blocks, we need a mechanism to prevent pages / blocks from being evicted while they are being read. With on-heap objects, evicting a block while it is being read merely leads to memory-accounting problems (because we assume that an evicted block is a candidate for garbage collection, which will not be true during a read), but with off-heap memory this will lead to either data corruption or segmentation faults.

## Changes

### BlockInfoManager and reader/writer locks

This patch adds block-level read/write locks to the BlockManager. It introduces a new `BlockInfoManager` component, which is contained within the `BlockManager`, holds the `BlockInfo` objects that the `BlockManager` uses for tracking block metadata, and exposes APIs for locking blocks in either shared read or exclusive write modes. `BlockManager`'s `get*()` and `put*()` methods now implicitly acquire the necessary locks. After a `get()` call successfully retrieves a block, that block is locked in shared read mode. A `put()` call will block until it acquires an exclusive write lock. If the write succeeds, the write lock is downgraded to a shared read lock before returning to the caller. This `put()` locking behavior allows us to store a block and then immediately turn around and read it without having to worry about it having been evicted between the write and the read, which will allow us to significantly simplify `CacheManager` in the future (see #10748). See `BlockInfoManagerSuite`'s test cases for a more detailed specification of the locking semantics.

### Auto-release of locks at the end of tasks

Our locking APIs support explicit release of locks (by calling `unlock()`), but it's not always possible to guarantee that locks will be released prior to the end of the task. One reason for this is our iterator interface: since our iterators don't support an explicit `close()` operator to signal that no more records will be consumed, operations like `take()` or `limit()` don't have a good means to release locks on their input iterators' blocks. Another example is broadcast variables, whose block locks can only be released at the end of the task. To address this, `BlockInfoManager` uses a pair of maps to track the set of locks acquired by each task. Lock acquisitions automatically record the current task attempt id by obtaining it from `TaskContext`. When a task finishes, code in `Executor` calls `BlockInfoManager.unlockAllLocksForTask(taskAttemptId)` to free locks.

### Locking and the MemoryStore

In order to prevent in-memory blocks from being evicted while they are being read, the `MemoryStore`'s `evictBlocksToFreeSpace()` method acquires write locks on blocks which it is considering as candidates for eviction. These lock acquisitions are non-blocking, so a block which is being read will not be evicted. By holding write locks until the eviction is performed or skipped (in case evicting the blocks would not free enough memory), we avoid a race where a new reader starts to read a block after the block has been marked as an eviction candidate but before it has been removed.

### Locking and remote block transfer

This patch makes small changes to block transfer and network layer code so that locks acquired by the BlockTransferService are released as soon as block transfer messages are consumed and released by Netty. This builds on top of #11193, a bug fix related to freeing of network layer ManagedBuffers.
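
To make the read/write-lock semantics above concrete, here is a minimal, self-contained sketch of a per-block lock manager in the spirit of `BlockInfoManager`. It is illustrative only: the names (`SketchBlockLockManager`, `lockForReading`, `lockForWriting`, `downgradeLock`, `unlock`) mirror the behavior described in this commit message but are not taken from the patch, and per-task bookkeeping is omitted.

```scala
import scala.collection.mutable

// Hypothetical sketch of per-block read/write locking: many concurrent readers
// OR a single writer per block. Not the actual BlockInfoManager implementation.
class SketchBlockLockManager {
  private class LockState { var readerCount = 0; var writerActive = false }
  private val states = mutable.HashMap.empty[String, LockState]
  private val monitor = new Object

  /** Shared read lock; with blocking = false, fail fast instead of waiting for a writer. */
  def lockForReading(blockId: String, blocking: Boolean = true): Boolean = monitor.synchronized {
    val s = states.getOrElseUpdate(blockId, new LockState)
    if (!blocking && s.writerActive) {
      false
    } else {
      while (s.writerActive) monitor.wait()
      s.readerCount += 1
      true
    }
  }

  /**
   * Exclusive write lock. A non-blocking attempt that fails is how an eviction pass
   * would skip blocks that are currently being read.
   */
  def lockForWriting(blockId: String, blocking: Boolean = true): Boolean = monitor.synchronized {
    val s = states.getOrElseUpdate(blockId, new LockState)
    if (!blocking && (s.writerActive || s.readerCount > 0)) {
      false
    } else {
      while (s.writerActive || s.readerCount > 0) monitor.wait()
      s.writerActive = true
      true
    }
  }

  /** After a successful put, downgrade the write lock so the writer can keep reading its block. */
  def downgradeLock(blockId: String): Unit = monitor.synchronized {
    val s = states(blockId)
    require(s.writerActive, s"no write lock held for $blockId")
    s.writerActive = false
    s.readerCount += 1
    monitor.notifyAll()
  }

  /** Release one read or write lock on the block. */
  def unlock(blockId: String): Unit = monitor.synchronized {
    val s = states(blockId)
    if (s.writerActive) s.writerActive = false else s.readerCount -= 1
    monitor.notifyAll()
  }
}
```

Under this model, a `put()`-style operation would take the write lock, store the block, and call `downgradeLock` before returning; a `get()`-style operation would return with the read lock held and rely on the caller (or end-of-task cleanup) to `unlock`; and an eviction pass would call `lockForWriting(blockId, blocking = false)` and skip any block it cannot lock.
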
## FAQ

- **Why not use Java's built-in [`ReadWriteLock`](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReadWriteLock.html)?** Our locks operate at a per-task rather than per-thread level. Under certain circumstances a task may consist of multiple threads, so using `ReadWriteLock` would mean that we might call `unlock()` from a thread which didn't hold the lock in question, an operation which has undefined semantics. If we could rely on Java 8 classes, we might be able to use [`StampedLock`](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/StampedLock.html) to work around this issue.
- **Why not detect "leaked" locks in tests?** See the notes above about `take()` and `limit()`.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #10705 from JoshRosen/pin-pages.
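
As a rough illustration of the per-task (rather than per-thread) lock ownership discussed in the FAQ above, a registry keyed by task attempt id might look like the hypothetical sketch below. The names (`TaskLockRegistry`, `registerTask`, `recordLock`, `releaseAllLocksForTask`) are illustrative, not the actual `BlockInfoManager` API.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

// Hypothetical sketch: track which blocks each task attempt has locked so that the locks
// can be released by task attempt id at task end, regardless of which thread acquired them.
// This is the property that java.util.concurrent.locks.ReadWriteLock (thread-owned locks)
// cannot provide directly.
class TaskLockRegistry {
  private val locksByTask = new ConcurrentHashMap[Long, java.util.Set[String]]()

  /** Called once when a task attempt starts. Idempotent. */
  def registerTask(taskAttemptId: Long): Unit =
    locksByTask.putIfAbsent(taskAttemptId, ConcurrentHashMap.newKeySet[String]())

  /** Record a lock acquisition made on behalf of the given task attempt. */
  def recordLock(taskAttemptId: Long, blockId: String): Unit = {
    registerTask(taskAttemptId)
    locksByTask.get(taskAttemptId).add(blockId)
  }

  /**
   * Called at task completion: forget the task and return the block ids whose locks were
   * still held, so the caller can release them and report them as "leaked".
   */
  def releaseAllLocksForTask(taskAttemptId: Long): Seq[String] =
    Option(locksByTask.remove(taskAttemptId)).map(_.asScala.toSeq).getOrElse(Seq.empty)
}
```

In the actual patch this bookkeeping lives inside `BlockInfoManager`, and `Executor` drives the release in the task's `finally` block (see the `Executor.scala` diff below).
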
1 parent 7129957 commit 633d63a

File tree

22 files changed, +1,384 −438 lines changed


core/src/main/scala/org/apache/spark/CacheManager.scala

Lines changed: 5 additions & 1 deletion

```diff
@@ -21,6 +21,7 @@ import scala.collection.mutable
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage._
+import org.apache.spark.util.CompletionIterator
 
 /**
  * Spark class responsible for passing RDDs partition contents to the BlockManager and making
@@ -47,6 +48,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
           existingMetrics.incBytesReadInternal(blockResult.bytes)
 
           val iter = blockResult.data.asInstanceOf[Iterator[T]]
+
           new InterruptibleIterator[T](context, iter) {
             override def next(): T = {
               existingMetrics.incRecordsReadInternal(1)
@@ -156,7 +158,9 @@
         case Left(arr) =>
           // We have successfully unrolled the entire partition, so cache it in memory
           blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
-          arr.iterator.asInstanceOf[Iterator[T]]
+          CompletionIterator[T, Iterator[T]](
+            arr.iterator.asInstanceOf[Iterator[T]],
+            blockManager.releaseLock(key))
         case Right(it) =>
           // There is not enough space to cache this partition in memory
          val returnValues = it.asInstanceOf[Iterator[T]]
```
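
The `CompletionIterator` used above is what defers releasing the read lock until the caller has fully consumed the cached partition. A minimal sketch of that pattern (not Spark's actual `CompletionIterator`; the `releaseLock` call in the usage comment is the hypothetical cleanup action) could look like:

```scala
// Hypothetical sketch of a completion-callback iterator: runs a cleanup action exactly once,
// as soon as the wrapped iterator reports it has no more elements.
class CompletingIterator[A](sub: Iterator[A], completion: () => Unit) extends Iterator[A] {
  private var completed = false

  override def hasNext: Boolean = {
    val more = sub.hasNext
    if (!more && !completed) {
      completed = true
      completion() // e.g. release the block's read lock once the partition is exhausted
    }
    more
  }

  override def next(): A = sub.next()
}

// Usage (mirroring the CacheManager change above):
//   new CompletingIterator(arr.iterator, () => blockManager.releaseLock(key))
```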

core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala

Lines changed: 51 additions & 17 deletions

```diff
@@ -24,10 +24,10 @@ import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import scala.util.Random
 
-import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
+import org.apache.spark._
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
+import org.apache.spark.storage.{BlockId, BroadcastBlockId, StorageLevel}
 import org.apache.spark.util.{ByteBufferInputStream, Utils}
 import org.apache.spark.util.io.ByteArrayChunkOutputStream
 
@@ -90,22 +90,29 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
 
   /**
    * Divide the object into multiple blocks and put those blocks in the block manager.
+   *
    * @param value the object to divide
    * @return number of blocks this broadcast variable is divided into
    */
   private def writeBlocks(value: T): Int = {
+    import StorageLevel._
     // Store a copy of the broadcast variable in the driver so that tasks run on the driver
     // do not create a duplicate copy of the broadcast variable's value.
-    SparkEnv.get.blockManager.putSingle(broadcastId, value, StorageLevel.MEMORY_AND_DISK,
-      tellMaster = false)
+    val blockManager = SparkEnv.get.blockManager
+    if (blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)) {
+      blockManager.releaseLock(broadcastId)
+    } else {
+      throw new SparkException(s"Failed to store $broadcastId in BlockManager")
+    }
     val blocks =
       TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
     blocks.zipWithIndex.foreach { case (block, i) =>
-      SparkEnv.get.blockManager.putBytes(
-        BroadcastBlockId(id, "piece" + i),
-        block,
-        StorageLevel.MEMORY_AND_DISK_SER,
-        tellMaster = true)
+      val pieceId = BroadcastBlockId(id, "piece" + i)
+      if (blockManager.putBytes(pieceId, block, MEMORY_AND_DISK_SER, tellMaster = true)) {
+        blockManager.releaseLock(pieceId)
+      } else {
+        throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager")
+      }
     }
     blocks.length
   }
@@ -127,16 +134,18 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
       def getRemote: Option[ByteBuffer] = bm.getRemoteBytes(pieceId).map { block =>
         // If we found the block from remote executors/driver's BlockManager, put the block
         // in this executor's BlockManager.
-        SparkEnv.get.blockManager.putBytes(
-          pieceId,
-          block,
-          StorageLevel.MEMORY_AND_DISK_SER,
-          tellMaster = true)
+        if (!bm.putBytes(pieceId, block, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)) {
+          throw new SparkException(
+            s"Failed to store $pieceId of $broadcastId in local BlockManager")
+        }
         block
       }
       val block: ByteBuffer = getLocal.orElse(getRemote).getOrElse(
         throw new SparkException(s"Failed to get $pieceId of $broadcastId"))
+      // At this point we are guaranteed to hold a read lock, since we either got the block locally
+      // or stored the remotely-fetched block and automatically downgraded the write lock.
      blocks(pid) = block
+      releaseLock(pieceId)
     }
     blocks
   }
@@ -165,8 +174,10 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
   private def readBroadcastBlock(): T = Utils.tryOrIOException {
     TorrentBroadcast.synchronized {
       setConf(SparkEnv.get.conf)
-      SparkEnv.get.blockManager.getLocal(broadcastId).map(_.data.next()) match {
+      val blockManager = SparkEnv.get.blockManager
+      blockManager.getLocal(broadcastId).map(_.data.next()) match {
         case Some(x) =>
+          releaseLock(broadcastId)
           x.asInstanceOf[T]
 
         case None =>
@@ -179,13 +190,36 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
             blocks, SparkEnv.get.serializer, compressionCodec)
           // Store the merged copy in BlockManager so other tasks on this executor don't
           // need to re-fetch it.
-          SparkEnv.get.blockManager.putSingle(
-            broadcastId, obj, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
+          val storageLevel = StorageLevel.MEMORY_AND_DISK
+          if (blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
+            releaseLock(broadcastId)
+          } else {
+            throw new SparkException(s"Failed to store $broadcastId in BlockManager")
+          }
           obj
       }
     }
   }
 
+  /**
+   * If running in a task, register the given block's locks for release upon task completion.
+   * Otherwise, if not running in a task then immediately release the lock.
+   */
+  private def releaseLock(blockId: BlockId): Unit = {
+    val blockManager = SparkEnv.get.blockManager
+    Option(TaskContext.get()) match {
+      case Some(taskContext) =>
+        taskContext.addTaskCompletionListener(_ => blockManager.releaseLock(blockId))
+      case None =>
+        // This should only happen on the driver, where broadcast variables may be accessed
+        // outside of running tasks (e.g. when computing rdd.partitions()). In order to allow
+        // broadcast variables to be garbage collected we need to free the reference here
+        // which is slightly unsafe but is technically okay because broadcast variables aren't
+        // stored off-heap.
+        blockManager.releaseLock(blockId)
+    }
+  }
+
 }
```

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 17 additions & 1 deletion

```diff
@@ -218,7 +218,9 @@ private[spark] class Executor(
         threwException = false
         res
       } finally {
+        val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
         val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
+
         if (freedMemory > 0) {
           val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
           if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false) && !threwException) {
@@ -227,6 +229,17 @@
             logError(errMsg)
           }
         }
+
+        if (releasedLocks.nonEmpty) {
+          val errMsg =
+            s"${releasedLocks.size} block locks were not released by TID = $taskId:\n" +
+              releasedLocks.mkString("[", ", ", "]")
+          if (conf.getBoolean("spark.storage.exceptionOnPinLeak", false) && !threwException) {
+            throw new SparkException(errMsg)
+          } else {
+            logError(errMsg)
+          }
+        }
       }
       val taskFinish = System.currentTimeMillis()
 
@@ -266,8 +279,11 @@
           ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
         } else if (resultSize >= maxRpcMessageSize) {
           val blockId = TaskResultBlockId(taskId)
-          env.blockManager.putBytes(
+          val putSucceeded = env.blockManager.putBytes(
             blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
+          if (putSucceeded) {
+            env.blockManager.releaseLock(blockId)
+          }
           logInfo(
             s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
           ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
```
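
As with the existing managed-memory leak check, the new lock-leak check above only logs an error by default. A quick sketch of opting into hard failures (e.g. in a test suite's `SparkConf`), using the flags shown in this diff:

```scala
import org.apache.spark.SparkConf

// Fail the task instead of only logging when it finishes without releasing its block locks
// (or its managed memory). Both flags default to false.
val conf = new SparkConf()
  .set("spark.storage.exceptionOnPinLeak", "true")
  .set("spark.unsafe.exceptionOnMemoryLeak", "true")
```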

core/src/main/scala/org/apache/spark/network/BlockDataManager.scala

Lines changed: 9 additions & 1 deletion

```diff
@@ -31,6 +31,14 @@ trait BlockDataManager {
 
   /**
    * Put the block locally, using the given storage level.
+   *
+   * Returns true if the block was stored and false if the put operation failed or the block
+   * already existed.
    */
-  def putBlockData(blockId: BlockId, data: ManagedBuffer, level: StorageLevel): Unit
+  def putBlockData(blockId: BlockId, data: ManagedBuffer, level: StorageLevel): Boolean
+
+  /**
+   * Release locks acquired by [[putBlockData()]] and [[getBlockData()]].
+   */
+  def releaseLock(blockId: BlockId): Unit
 }
```

core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala

Lines changed: 5 additions & 1 deletion

```diff
@@ -65,7 +65,11 @@ class NettyBlockRpcServer(
         val level: StorageLevel =
           serializer.newInstance().deserialize(ByteBuffer.wrap(uploadBlock.metadata))
         val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))
-        blockManager.putBlockData(BlockId(uploadBlock.blockId), data, level)
+        val blockId = BlockId(uploadBlock.blockId)
+        val putSucceeded = blockManager.putBlockData(blockId, data, level)
+        if (putSucceeded) {
+          blockManager.releaseLock(blockId)
+        }
         responseContext.onSuccess(ByteBuffer.allocate(0))
       }
   }
```

core/src/main/scala/org/apache/spark/scheduler/Task.scala

Lines changed: 1 addition & 0 deletions

```diff
@@ -64,6 +64,7 @@ private[spark] abstract class Task[T](
       taskAttemptId: Long,
       attemptNumber: Int,
       metricsSystem: MetricsSystem): T = {
+    SparkEnv.get.blockManager.registerTask(taskAttemptId)
     context = new TaskContextImpl(
       stageId,
       partitionId,
```

core/src/main/scala/org/apache/spark/storage/BlockInfo.scala

Lines changed: 0 additions & 83 deletions
This file was deleted.
