Skip to content

Clarify the transfer of memory from unroll to pending unroll #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 15 additions & 25 deletions core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -303,10 +303,16 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
// Otherwise, if we return an iterator, we release the memory reserved here
// later when the task finishes.
if (keepUnrolling) {
val taskAttemptId = currentTaskAttemptId()
accountingLock.synchronized {
val amountToRelease = currentUnrollMemoryForThisTask - previousMemoryReserved
releaseUnrollMemoryForThisTask(amountToRelease)
reservePendingUnrollMemoryForThisTask(blockId, amountToRelease, droppedBlocks)
// Here, we transfer memory from unroll to pending unroll because we expect to cache this
// block in `tryToPut`. We do not release and re-acquire memory from the MemoryManager in
// order to avoid race conditions where another component steals the memory that we're
// trying to transfer.
val amountToTransferToPending = currentUnrollMemoryForThisTask - previousMemoryReserved
unrollMemoryMap(taskAttemptId) -= amountToTransferToPending
pendingUnrollMemoryMap(taskAttemptId) =
pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + amountToTransferToPending
}
}
}
Expand Down Expand Up @@ -362,7 +368,12 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
// Note: if we have previously unrolled this block successfully, then pending unroll
// memory should be non-zero. This is the amount that we already reserved during the
// unrolling process. In this case, we can just reuse this space to cache our block.
// This must be synchronized so the release and re-acquire can happen atomically.
//
// Note: the StaticMemoryManager counts unroll memory as storage memory. Here, the
// synchronization on `accountingLock` guarantees that the release of unroll memory and
// acquisition of storage memory happens atomically. However, if storage memory is acquired
// outside of MemoryStore or if unroll memory is counted as execution memory, then we will
// have to revisit this assumption. See SPARK-10983 for more context.
releasePendingUnrollMemoryForThisTask()
val numBytesAcquired = memoryManager.acquireStorageMemory(blockId, size, droppedBlocks)
val enoughMemory = numBytesAcquired == size
Expand Down Expand Up @@ -516,27 +527,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
}
}

/**
* Reserve the unroll memory of current unroll successful block used by this task
* until actually put the block into memory entry.
* @return whether the request is granted.
*/
private def reservePendingUnrollMemoryForThisTask(
blockId: BlockId,
memory: Long,
droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
val taskAttemptId = currentTaskAttemptId()
accountingLock.synchronized {
val acquired = memoryManager.acquireUnrollMemory(blockId, memory, droppedBlocks)
val success = acquired == memory
if (success) {
pendingUnrollMemoryMap(taskAttemptId) =
pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory
}
success
}
}

/**
* Release pending unroll memory of current unroll successful block used by this task
*/
Expand Down