
Commit 5030923

[SPARK-12155][SPARK-12253] Fix executor OOM in unified memory management
**Problem.** In unified memory management, acquiring execution memory may lead to eviction of storage memory. However, the space freed by evicting cached blocks is distributed among all active tasks. Thus, an incorrect upper bound on the execution memory per task can cause the acquisition to fail, leading to OOMs and premature spills.

**Example.** Suppose total memory is 1000B, cached blocks occupy 900B, `spark.memory.storageFraction` is 0.4, and there are two active tasks. In this case, the cap on task execution memory is 100B / 2 = 50B. If task A tries to acquire 200B, it will evict 100B of storage but can still acquire only 50B because of the incorrect cap. For another example, see this [regression test](https://github.com/andrewor14/spark/blob/fix-oom/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala#L233) that I stole from JoshRosen.

**Solution.** Fix the cap on task execution memory. It should take into account the space that could be freed by evicting storage, in addition to the memory currently available to execution. In the example above, the correct cap should have been 600B / 2 = 300B.

This patch also guards against a race condition (SPARK-12253):

(1) Existing tasks collectively occupy all execution memory.
(2) A new task comes in and blocks while the existing tasks spill.
(3) After the tasks finish spilling, another task jumps in and puts in a large block, stealing the freed memory.
(4) The new task still cannot acquire memory and goes back to sleep.

Author: Andrew Or <andrew@databricks.com>

Closes #10240 from andrewor14/fix-oom.
Parent: 23a9e62
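
To make the arithmetic in the commit-message example concrete, here is a small self-contained Scala sketch of the per-task cap before and after the fix. All names are illustrative plain vals, not Spark fields or APIs:

object CapExample extends App {
  val totalMemory = 1000L       // total unified memory
  val storageUsed = 900L        // bytes occupied by cached blocks
  val storageRegionSize = 400L  // spark.memory.storageFraction = 0.4 of 1000B
  val numActiveTasks = 2L

  // Old (buggy) cap: divide only the current execution pool size, ignoring
  // storage memory that could be evicted.
  val executionPoolSize = totalMemory - storageUsed   // 100B
  val oldCap = executionPoolSize / numActiveTasks     // 50B

  // Fixed cap: everything except the unevictable storage region is fair game.
  val maxPoolSize = totalMemory - math.min(storageUsed, storageRegionSize) // 600B
  val newCap = maxPoolSize / numActiveTasks           // 300B

  println(s"old cap = ${oldCap}B, fixed cap = ${newCap}B") // old cap = 50B, fixed cap = 300B
}

A task asking for 200B now fits under the 300B cap, so the eviction it triggers is no longer wasted.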

4 files changed: +114 additions, −31 deletions


core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala

Lines changed: 40 additions & 17 deletions
@@ -70,11 +70,28 @@ private[memory] class ExecutionMemoryPool(
    * active tasks) before it is forced to spill. This can happen if the number of tasks increase
    * but an older task had a lot of memory already.
    *
+   * @param numBytes number of bytes to acquire
+   * @param taskAttemptId the task attempt acquiring memory
+   * @param maybeGrowPool a callback that potentially grows the size of this pool. It takes in
+   *                      one parameter (Long) that represents the desired amount of memory by
+   *                      which this pool should be expanded.
+   * @param computeMaxPoolSize a callback that returns the maximum allowable size of this pool
+   *                           at this given moment. This is not a field because the max pool
+   *                           size is variable in certain cases. For instance, in unified
+   *                           memory management, the execution pool can be expanded by evicting
+   *                           cached blocks, thereby shrinking the storage pool.
+   *
    * @return the number of bytes granted to the task.
    */
-  def acquireMemory(numBytes: Long, taskAttemptId: Long): Long = lock.synchronized {
+  private[memory] def acquireMemory(
+      numBytes: Long,
+      taskAttemptId: Long,
+      maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit,
+      computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
     assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")

+    // TODO: clean up this clunky method signature
+
     // Add this task to the taskMemory map just so we can keep an accurate count of the number
     // of active tasks, to let other tasks ramp down their memory in calls to `acquireMemory`
     if (!memoryForTask.contains(taskAttemptId)) {
@@ -91,25 +108,31 @@ private[memory] class ExecutionMemoryPool(
     val numActiveTasks = memoryForTask.keys.size
     val curMem = memoryForTask(taskAttemptId)

-    // How much we can grant this task; don't let it grow to more than 1 / numActiveTasks;
-    // don't let it be negative
-    val maxToGrant =
-      math.min(numBytes, math.max(0, (poolSize / numActiveTasks) - curMem))
+    // In every iteration of this loop, we should first try to reclaim any borrowed execution
+    // space from storage. This is necessary because of the potential race condition where new
+    // storage blocks may steal the free execution memory that this task was waiting for.
+    maybeGrowPool(numBytes - memoryFree)
+
+    // Maximum size the pool would have after potentially growing the pool.
+    // This is used to compute the upper bound of how much memory each task can occupy. This
+    // must take into account potential free memory as well as the amount this pool currently
+    // occupies. Otherwise, we may run into SPARK-12155 where, in unified memory management,
+    // we did not take into account space that could have been freed by evicting cached blocks.
+    val maxPoolSize = computeMaxPoolSize()
+    val maxMemoryPerTask = maxPoolSize / numActiveTasks
+    val minMemoryPerTask = poolSize / (2 * numActiveTasks)
+
+    // How much we can grant this task; keep its share within 0 <= X <= 1 / numActiveTasks
+    val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
     // Only give it as much memory as is free, which might be none if it reached 1 / numTasks
     val toGrant = math.min(maxToGrant, memoryFree)

-    if (curMem < poolSize / (2 * numActiveTasks)) {
-      // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
-      // if we can't give it this much now, wait for other tasks to free up memory
-      // (this happens if older tasks allocated lots of memory before N grew)
-      if (memoryFree >= math.min(maxToGrant, poolSize / (2 * numActiveTasks) - curMem)) {
-        memoryForTask(taskAttemptId) += toGrant
-        return toGrant
-      } else {
-        logInfo(
-          s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
-        lock.wait()
-      }
+    // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
+    // if we can't give it this much now, wait for other tasks to free up memory
+    // (this happens if older tasks allocated lots of memory before N grew)
+    if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
+      logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
+      lock.wait()
     } else {
       memoryForTask(taskAttemptId) += toGrant
       return toGrant
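
The hunk above shows one iteration of the method's retry loop; the enclosing `while (true)` is unchanged context and not visible in the diff. As a rough sketch of the overall control flow, here is a toy pool reduced to a size and a per-task map (illustrative only, not the real Spark class):

import scala.collection.mutable

class ToyExecutionPool(var poolSize: Long) {
  private val lock = new Object
  private val memoryForTask = mutable.HashMap[Long, Long]()
  private def memoryFree: Long = poolSize - memoryForTask.values.sum

  def acquireMemory(
      numBytes: Long,
      taskAttemptId: Long,
      maybeGrowPool: Long => Unit = _ => (),
      computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
    memoryForTask.getOrElseUpdate(taskAttemptId, 0L)
    while (true) {
      val numActiveTasks = memoryForTask.keys.size
      val curMem = memoryForTask(taskAttemptId)
      // Re-run the growth callback on every iteration: between wakeups, new
      // storage blocks may have stolen the memory we waited for (SPARK-12253).
      maybeGrowPool(numBytes - memoryFree)
      val maxMemoryPerTask = computeMaxPoolSize() / numActiveTasks
      val minMemoryPerTask = poolSize / (2 * numActiveTasks)
      val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
      val toGrant = math.min(maxToGrant, memoryFree)
      if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
        lock.wait() // block until a release (or a finishing task) notifies us
      } else {
        memoryForTask(taskAttemptId) += toGrant
        return toGrant
      }
    }
    0L // never reached
  }

  def releaseMemory(numBytes: Long, taskAttemptId: Long): Unit = lock.synchronized {
    memoryForTask(taskAttemptId) = math.max(0L, memoryForTask(taskAttemptId) - numBytes)
    lock.notifyAll() // wake any waiters so they retry
  }
}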

core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala

Lines changed: 43 additions & 14 deletions
@@ -81,22 +81,51 @@ private[spark] class UnifiedMemoryManager private[memory] (
     assert(numBytes >= 0)
     memoryMode match {
       case MemoryMode.ON_HEAP =>
-        if (numBytes > onHeapExecutionMemoryPool.memoryFree) {
-          val extraMemoryNeeded = numBytes - onHeapExecutionMemoryPool.memoryFree
-          // There is not enough free memory in the execution pool, so try to reclaim memory from
-          // storage. We can reclaim any free memory from the storage pool. If the storage pool
-          // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
-          // the memory that storage has borrowed from execution.
-          val memoryReclaimableFromStorage =
-            math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize)
-          if (memoryReclaimableFromStorage > 0) {
-            // Only reclaim as much space as is necessary and available:
-            val spaceReclaimed = storageMemoryPool.shrinkPoolToFreeSpace(
-              math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
-            onHeapExecutionMemoryPool.incrementPoolSize(spaceReclaimed)
+
+        /**
+         * Grow the execution pool by evicting cached blocks, thereby shrinking the storage pool.
+         *
+         * When acquiring memory for a task, the execution pool may need to make multiple
+         * attempts. Each attempt must be able to evict storage in case another task jumps in
+         * and caches a large block between the attempts. This is called once per attempt.
+         */
+        def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
+          if (extraMemoryNeeded > 0) {
+            // There is not enough free memory in the execution pool, so try to reclaim memory from
+            // storage. We can reclaim any free memory from the storage pool. If the storage pool
+            // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
+            // the memory that storage has borrowed from execution.
+            val memoryReclaimableFromStorage =
+              math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize)
+            if (memoryReclaimableFromStorage > 0) {
+              // Only reclaim as much space as is necessary and available:
+              val spaceReclaimed = storageMemoryPool.shrinkPoolToFreeSpace(
+                math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
+              onHeapExecutionMemoryPool.incrementPoolSize(spaceReclaimed)
+            }
           }
         }
-        onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
+
+        /**
+         * The size the execution pool would have after evicting storage memory.
+         *
+         * The execution memory pool divides this quantity among the active tasks evenly to cap
+         * the execution memory allocation for each task. It is important to keep this greater
+         * than the execution pool size, which doesn't take into account potential memory that
+         * could be freed by evicting storage. Otherwise we may hit SPARK-12155.
+         *
+         * Additionally, this quantity should be kept below `maxMemory` to arbitrate fairness
+         * in execution memory allocation across tasks, Otherwise, a task may occupy more than
+         * its fair share of execution memory, mistakenly thinking that other tasks can acquire
+         * the portion of storage memory that cannot be evicted.
+         */
+        def computeMaxExecutionPoolSize(): Long = {
+          maxMemory - math.min(storageMemoryUsed, storageRegionSize)
+        }
+
+        onHeapExecutionMemoryPool.acquireMemory(
+          numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)
+
       case MemoryMode.OFF_HEAP =>
         // For now, we only support on-heap caching of data, so we do not need to interact with
         // the storage pool when allocating off-heap memory. This will change in the future, though.
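
Plugging the commit-message example into these two callbacks shows where the corrected 600B bound comes from. Plain illustrative vals, not Spark fields:

val maxMemory = 1000L           // total unified memory
val storagePoolSize = 900L      // the storage pool grew to hold 900B of blocks...
val storageMemoryUsed = 900L    // ...and every byte holds a cached block
val storageMemoryFree = 0L
val storageRegionSize = 400L    // spark.memory.storageFraction = 0.4

// maybeGrowExecutionPool: free storage memory is always reclaimable, and so is
// whatever storage has borrowed beyond its region (here, by evicting blocks).
val memoryReclaimableFromStorage =
  math.max(storageMemoryFree, storagePoolSize - storageRegionSize)   // 500B

// computeMaxExecutionPoolSize: everything except the unevictable storage region.
val maxExecutionPoolSize =
  maxMemory - math.min(storageMemoryUsed, storageRegionSize)         // 600B

// Divided evenly between the two active tasks, the per-task cap is 300B.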

core/src/main/scala/org/apache/spark/scheduler/Task.scala

Lines changed: 6 additions & 0 deletions
@@ -92,6 +92,12 @@ private[spark] abstract class Task[T](
       Utils.tryLogNonFatalError {
         // Release memory used by this thread for unrolling blocks
         SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask()
+        // Notify any tasks waiting for execution memory to be freed to wake up and try to
+        // acquire memory again. This makes impossible the scenario where a task sleeps forever
+        // because there are no other tasks left to notify it. Since this is safe to do but may
+        // not be strictly necessary, we should revisit whether we can remove this in the future.
+        val memoryManager = SparkEnv.get.memoryManager
+        memoryManager.synchronized { memoryManager.notifyAll() }
       }
     } finally {
       TaskContext.unset()
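
The wait/notify pairing is the standard Java monitor idiom: blocked acquirers call `wait()` on the shared lock (here, the memory manager's monitor, which is what the pool's `lock.wait()` above blocks on), and a finishing task wakes all of them so each re-runs its acquisition loop. In miniature, as a sketch rather than Spark code:

object MonitorIdiom {
  private val monitor = new Object

  def waitForFreeMemory(): Unit = monitor.synchronized {
    monitor.wait() // releases the monitor while blocked; recheck conditions on wakeup
  }

  def onTaskCompletion(): Unit = monitor.synchronized {
    monitor.notifyAll() // wake every waiter, even if no memory was actually freed
  }
}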

core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala

Lines changed: 25 additions & 0 deletions
@@ -230,4 +230,29 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTester {
     assert(exception.getMessage.contains("larger heap size"))
   }

+  test("execution can evict cached blocks when there are multiple active tasks (SPARK-12155)") {
+    val conf = new SparkConf()
+      .set("spark.memory.fraction", "1")
+      .set("spark.memory.storageFraction", "0")
+      .set("spark.testing.memory", "1000")
+    val mm = UnifiedMemoryManager(conf, numCores = 2)
+    val ms = makeMemoryStore(mm)
+    assert(mm.maxMemory === 1000)
+    // Have two tasks each acquire some execution memory so that the memory pool registers that
+    // there are two active tasks:
+    assert(mm.acquireExecutionMemory(100L, 0, MemoryMode.ON_HEAP) === 100L)
+    assert(mm.acquireExecutionMemory(100L, 1, MemoryMode.ON_HEAP) === 100L)
+    // Fill up all of the remaining memory with storage.
+    assert(mm.acquireStorageMemory(dummyBlock, 800L, evictedBlocks))
+    assertEvictBlocksToFreeSpaceNotCalled(ms)
+    assert(mm.storageMemoryUsed === 800)
+    assert(mm.executionMemoryUsed === 200)
+    // A task should still be able to allocate 100 bytes execution memory by evicting blocks
+    assert(mm.acquireExecutionMemory(100L, 0, MemoryMode.ON_HEAP) === 100L)
+    assertEvictBlocksToFreeSpaceCalled(ms, 100L)
+    assert(mm.executionMemoryUsed === 300)
+    assert(mm.storageMemoryUsed === 700)
+    assert(evictedBlocks.nonEmpty)
+  }
+
 }
