[SPARK-12155] [SPARK-12253] Fix executor OOM in unified memory management #10240

Closed · wants to merge 5 commits

Changes from all commits

@@ -70,11 +70,28 @@ private[memory] class ExecutionMemoryPool(
 * active tasks) before it is forced to spill. This can happen if the number of tasks increases
 * but an older task already had a lot of memory.
*
+ * @param numBytes number of bytes to acquire
+ * @param taskAttemptId the task attempt acquiring memory
+ * @param maybeGrowPool a callback that potentially grows the size of this pool. It takes in
+ *                      one parameter (Long) that represents the desired amount of memory by
+ *                      which this pool should be expanded.
+ * @param computeMaxPoolSize a callback that returns the maximum allowable size of this pool

Contributor: If you take into account the memory that can be freed, then isn't this a fixed value?

Contributor: The actual memory used by storage can change, so it's not a fixed value.

Contributor: Nvm, I see now.

Contributor (Author): No, because if storage memory used is below a certain mark (default 0.5 of max memory) then it cannot be evicted. In this case the max pool size depends on how much unevictable storage memory there is, which varies over time.
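
To make the point concrete, here is a small self-contained sketch (illustrative values only; the formula mirrors the computeMaxExecutionPoolSize helper introduced further down in this diff):

// Illustrative only: the ceiling on the execution pool moves as storage usage changes,
// because min(storageMemoryUsed, storageRegionSize) bytes of storage are unevictable.
object MaxPoolSizeSketch extends App {
  val maxMemory = 1000L
  val storageRegionSize = 500L // default spark.memory.storageFraction = 0.5

  def maxExecutionPoolSize(storageMemoryUsed: Long): Long =
    maxMemory - math.min(storageMemoryUsed, storageRegionSize)

  println(maxExecutionPoolSize(200L)) // 800: all 200 bytes of storage are unevictable
  println(maxExecutionPoolSize(800L)) // 500: only the 500-byte storage region is protected
}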

+ *                           at this given moment. This is not a field because the max pool
+ *                           size is variable in certain cases. For instance, in unified
+ *                           memory management, the execution pool can be expanded by evicting
+ *                           cached blocks, thereby shrinking the storage pool.
+ *
* @return the number of bytes granted to the task.
*/
-  def acquireMemory(numBytes: Long, taskAttemptId: Long): Long = lock.synchronized {
+  private[memory] def acquireMemory(
+      numBytes: Long,
+      taskAttemptId: Long,
+      maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit,
+      computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")

+    // TODO: clean up this clunky method signature

// Add this task to the taskMemory map just so we can keep an accurate count of the number
// of active tasks, to let other tasks ramp down their memory in calls to `acquireMemory`
if (!memoryForTask.contains(taskAttemptId)) {
@@ -91,25 +108,31 @@ private[memory] class ExecutionMemoryPool(
val numActiveTasks = memoryForTask.keys.size
val curMem = memoryForTask(taskAttemptId)

-      // How much we can grant this task; don't let it grow to more than 1 / numActiveTasks;
-      // don't let it be negative
-      val maxToGrant =
-        math.min(numBytes, math.max(0, (poolSize / numActiveTasks) - curMem))
+      // In every iteration of this loop, we should first try to reclaim any borrowed execution
+      // space from storage. This is necessary because of the potential race condition where new
+      // storage blocks may steal the free execution memory that this task was waiting for.
+      maybeGrowPool(numBytes - memoryFree)
+
+      // Maximum size the pool would have after potentially growing the pool.
+      // This is used to compute the upper bound of how much memory each task can occupy. This
+      // must take into account potential free memory as well as the amount this pool currently
+      // occupies. Otherwise, we may run into SPARK-12155 where, in unified memory management,
+      // we did not take into account space that could have been freed by evicting cached blocks.
+      val maxPoolSize = computeMaxPoolSize()
+      val maxMemoryPerTask = maxPoolSize / numActiveTasks
+      val minMemoryPerTask = poolSize / (2 * numActiveTasks)
+
+      // How much we can grant this task; keep its share within 0 <= X <= 1 / numActiveTasks
+      val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
      // Only give it as much memory as is free, which might be none if it reached 1 / numTasks
      val toGrant = math.min(maxToGrant, memoryFree)

-      if (curMem < poolSize / (2 * numActiveTasks)) {
-        // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
-        // if we can't give it this much now, wait for other tasks to free up memory
-        // (this happens if older tasks allocated lots of memory before N grew)
-        if (memoryFree >= math.min(maxToGrant, poolSize / (2 * numActiveTasks) - curMem)) {
-          memoryForTask(taskAttemptId) += toGrant
-          return toGrant
-        } else {
-          logInfo(
-            s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
-          lock.wait()
-        }
+      // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
+      // if we can't give it this much now, wait for other tasks to free up memory
+      // (this happens if older tasks allocated lots of memory before N grew)
+      if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
+        logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
+        lock.wait()
+      } else {
        memoryForTask(taskAttemptId) += toGrant
        return toGrant
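
The new bounds are easiest to check with numbers. A self-contained, single-iteration model of the grant computation above (values chosen to line up with the regression test at the bottom of this diff; the object name is illustrative):

// One iteration of the grant logic, with concrete numbers.
object GrantSketch extends App {
  val poolSize = 300L     // execution pool size after maybeGrowPool reclaimed storage
  val maxPoolSize = 1000L // what computeMaxPoolSize() returns when all storage is evictable
  val memoryFree = 100L
  val numActiveTasks = 2
  val curMem = 100L       // memory this task already holds
  val numBytes = 100L     // requested

  val maxMemoryPerTask = maxPoolSize / numActiveTasks    // 500
  val minMemoryPerTask = poolSize / (2 * numActiveTasks) // 75

  val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem)) // 100
  val toGrant = math.min(maxToGrant, memoryFree)                              // 100

  // false: the request is fully satisfied, so this task does not block.
  println(toGrant < numBytes && curMem + toGrant < minMemoryPerTask)
}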

@@ -81,22 +81,51 @@ private[spark] class UnifiedMemoryManager private[memory] (
assert(numBytes >= 0)
memoryMode match {
case MemoryMode.ON_HEAP =>
-        if (numBytes > onHeapExecutionMemoryPool.memoryFree) {
-          val extraMemoryNeeded = numBytes - onHeapExecutionMemoryPool.memoryFree
-          // There is not enough free memory in the execution pool, so try to reclaim memory from
-          // storage. We can reclaim any free memory from the storage pool. If the storage pool
-          // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
-          // the memory that storage has borrowed from execution.
-          val memoryReclaimableFromStorage =
-            math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize)
-          if (memoryReclaimableFromStorage > 0) {
-            // Only reclaim as much space as is necessary and available:
-            val spaceReclaimed = storageMemoryPool.shrinkPoolToFreeSpace(
-              math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
-            onHeapExecutionMemoryPool.incrementPoolSize(spaceReclaimed)

+        /**
+         * Grow the execution pool by evicting cached blocks, thereby shrinking the storage pool.
+         *
+         * When acquiring memory for a task, the execution pool may need to make multiple
+         * attempts. Each attempt must be able to evict storage in case another task jumps in
+         * and caches a large block between the attempts. This is called once per attempt.
+         */
+        def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
+          if (extraMemoryNeeded > 0) {
+            // There is not enough free memory in the execution pool, so try to reclaim memory from
+            // storage. We can reclaim any free memory from the storage pool. If the storage pool
+            // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
+            // the memory that storage has borrowed from execution.
+            val memoryReclaimableFromStorage =
+              math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize)
+            if (memoryReclaimableFromStorage > 0) {
+              // Only reclaim as much space as is necessary and available:
+              val spaceReclaimed = storageMemoryPool.shrinkPoolToFreeSpace(
+                math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
+              onHeapExecutionMemoryPool.incrementPoolSize(spaceReclaimed)
+            }
+          }
+        }
-        onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)

+        /**
+         * The size the execution pool would have after evicting storage memory.
+         *
+         * The execution memory pool divides this quantity among the active tasks evenly to cap
+         * the execution memory allocation for each task. It is important to keep this greater
+         * than the execution pool size, which doesn't take into account potential memory that
+         * could be freed by evicting storage. Otherwise we may hit SPARK-12155.
+         *
+         * Additionally, this quantity should be kept below `maxMemory` to arbitrate fairness
+         * in execution memory allocation across tasks. Otherwise, a task may occupy more than
+         * its fair share of execution memory, mistakenly thinking that other tasks can acquire
+         * the portion of storage memory that cannot be evicted.
+         */
+        def computeMaxExecutionPoolSize(): Long = {
+          maxMemory - math.min(storageMemoryUsed, storageRegionSize)
+        }
+
+        onHeapExecutionMemoryPool.acquireMemory(
+          numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)

case MemoryMode.OFF_HEAP =>
// For now, we only support on-heap caching of data, so we do not need to interact with
// the storage pool when allocating off-heap memory. This will change in the future, though.
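
To see why maybeGrowExecutionPool must run on every attempt, here is a toy, self-contained model of the race its doc comment describes (names and numbers are illustrative, not Spark code; storageRegionSize is 0 as in the regression test below):

// A storage allocation steals the freed execution memory between two attempts.
object EvictionRaceSketch extends App {
  var execPoolSize = 200L
  var execUsed = 200L
  var storagePoolSize = 800L
  def execFree: Long = execPoolSize - execUsed

  def maybeGrowExecutionPool(needed: Long): Unit = if (needed > 0) {
    val reclaimed = math.min(needed, storagePoolSize) // storageRegionSize = 0: all evictable
    storagePoolSize -= reclaimed
    execPoolSize += reclaimed
  }

  // Attempt 1: evict storage so that 100 bytes become free...
  maybeGrowExecutionPool(100L - execFree)
  // ...but before the grant happens, another task caches a block and storage
  // borrows the freed execution memory right back:
  execPoolSize -= 100L
  storagePoolSize += 100L

  // Attempt 2 (the next loop iteration) must be able to evict again; otherwise this
  // task would wait forever on memory that can no longer become free.
  maybeGrowExecutionPool(100L - execFree)
  execUsed += 100L
  println(s"exec pool: $execPoolSize, storage pool: $storagePoolSize") // 300 and 700
}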

core/src/main/scala/org/apache/spark/scheduler/Task.scala (6 additions, 0 deletions)

@@ -92,6 +92,12 @@ private[spark] abstract class Task[T](
Utils.tryLogNonFatalError {
// Release memory used by this thread for unrolling blocks
SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask()
+      // Notify any tasks waiting for execution memory to be freed to wake up and try to
+      // acquire memory again. This prevents the scenario where a task sleeps forever
+      // because there are no other tasks left to notify it. Since this is safe to do but may
+      // not be strictly necessary, we should revisit whether we can remove this in the future.
+      val memoryManager = SparkEnv.get.memoryManager
+      memoryManager.synchronized { memoryManager.notifyAll() }
}
} finally {
TaskContext.unset()
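
The comment above guards against a classic lost wakeup: a thread blocked in lock.wait() relies on some other thread to wake it. A minimal, self-contained sketch of the pattern (toy code, not from the PR):

// Without the finishing thread's notifyAll, the waiter below would sleep forever.
object NotifyOnExitSketch extends App {
  val lock = new Object
  val waiter = new Thread(() => lock.synchronized {
    lock.wait() // blocks until another thread calls notify/notifyAll on `lock`
    println("woke up; would now retry acquiring memory")
  })
  waiter.start()
  Thread.sleep(100) // crude way to let the waiter block first; fine for a sketch
  lock.synchronized { lock.notifyAll() } // what a completing task now does
  waiter.join()
}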

@@ -230,4 +230,29 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
assert(exception.getMessage.contains("larger heap size"))
}

test("execution can evict cached blocks when there are multiple active tasks (SPARK-12155)") {
val conf = new SparkConf()
.set("spark.memory.fraction", "1")
.set("spark.memory.storageFraction", "0")
.set("spark.testing.memory", "1000")
val mm = UnifiedMemoryManager(conf, numCores = 2)
val ms = makeMemoryStore(mm)
assert(mm.maxMemory === 1000)
// Have two tasks each acquire some execution memory so that the memory pool registers that
// there are two active tasks:
assert(mm.acquireExecutionMemory(100L, 0, MemoryMode.ON_HEAP) === 100L)
assert(mm.acquireExecutionMemory(100L, 1, MemoryMode.ON_HEAP) === 100L)
// Fill up all of the remaining memory with storage.
assert(mm.acquireStorageMemory(dummyBlock, 800L, evictedBlocks))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 800)
assert(mm.executionMemoryUsed === 200)
// A task should still be able to allocate 100 bytes execution memory by evicting blocks
assert(mm.acquireExecutionMemory(100L, 0, MemoryMode.ON_HEAP) === 100L)
assertEvictBlocksToFreeSpaceCalled(ms, 100L)
assert(mm.executionMemoryUsed === 300)
assert(mm.storageMemoryUsed === 700)
assert(evictedBlocks.nonEmpty)
}

}
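
For readers checking the arithmetic in this test, the numbers fall out of the new code paths like this (a self-contained recomputation; variable names are illustrative):

// Recomputing the test's expectations with the formulas from this PR.
object Spark12155Arithmetic extends App {
  val maxMemory = 1000L
  val storageRegionSize = 0L // spark.memory.storageFraction = 0 in this test
  var storageUsed = 800L     // the storage pool is full, so free storage memory is 0
  var execPoolSize = 200L
  var execUsed = 200L        // two tasks holding 100 bytes each

  // Task 0 asks for 100 more bytes; nothing is free, so storage is shrunk first:
  val reclaimed = math.min(100L, math.max(0L, storageUsed - storageRegionSize)) // 100
  storageUsed -= reclaimed
  execPoolSize += reclaimed

  // The per-task cap uses the potential pool size, not the current one (the SPARK-12155 fix):
  val maxPoolSize = maxMemory - math.min(storageUsed, storageRegionSize) // 1000
  val maxMemoryPerTask = maxPoolSize / 2                                 // 500 for 2 tasks
  execUsed += 100L
  assert(execUsed == 300L && storageUsed == 700L && maxMemoryPerTask == 500L)
  println("matches the assertions above")
}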