Skip to content

[SPARK-7214] Reserve space for unrolling even when MemoryStore nearly full #5784

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 38 additions & 15 deletions core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,10 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
var vector = new SizeTrackingVector[Any]

// Request enough memory to begin unrolling
keepUnrolling = reserveUnrollMemoryForThisThread(initialMemoryThreshold)
val initialReserveResult =
reserveUnrollMemoryForThisThreadDroppingBlocks(blockId, initialMemoryThreshold)
droppedBlocks ++= initialReserveResult.droppedBlocks
keepUnrolling = initialReserveResult.success

if (!keepUnrolling) {
logWarning(s"Failed to reserve initial memory threshold of " +
Expand All @@ -280,20 +283,10 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
val currentSize = vector.estimateSize()
if (currentSize >= memoryThreshold) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move this comment into the new method

val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
// Hold the accounting lock, in case another thread concurrently puts a block that
// takes up the unrolling space we just ensured here
accountingLock.synchronized {
if (!reserveUnrollMemoryForThisThread(amountToRequest)) {
// If the first request is not granted, try again after ensuring free space
// If there is still not enough space, give up and drop the partition
val spaceToEnsure = maxUnrollMemory - currentUnrollMemory
if (spaceToEnsure > 0) {
val result = ensureFreeSpace(blockId, spaceToEnsure)
droppedBlocks ++= result.droppedBlocks
}
keepUnrolling = reserveUnrollMemoryForThisThread(amountToRequest)
}
}
val reserveResult =
reserveUnrollMemoryForThisThreadDroppingBlocks(blockId, amountToRequest)
droppedBlocks ++= reserveResult.droppedBlocks
keepUnrolling = reserveResult.success
// New threshold is currentSize * memoryGrowthFactor
memoryThreshold += amountToRequest
}
Expand Down Expand Up @@ -497,6 +490,36 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
}

/**
* Reserve additional memory for unrolling blocks used by this thread, evicting blocks if
* necessary. If blocks are dropped, adds them to droppedBlocks. Returns whether the
* request was granted, and any blocks that were dropped trying to grant it.
*/
private def reserveUnrollMemoryForThisThreadDroppingBlocks(
blockToAdd: BlockId,
space: Long): ResultWithDroppedBlocks = {
var droppedBlocks = Seq.empty[(BlockId, BlockStatus)]
var success = true
// Hold the accounting lock, in case another thread concurrently puts a block that
// takes up the unrolling space we just ensured here
accountingLock.synchronized {
if (!reserveUnrollMemoryForThisThread(space)) {
logInfo(s"Initial reserveUnrollMemoryForThisThread($space) failed")
// If the first request is not granted, try again after ensuring free space
// If there is still not enough space, give up and drop the partition
val spaceToEnsure = maxUnrollMemory - currentUnrollMemory
if (spaceToEnsure > 0) {
val result = ensureFreeSpace(blockToAdd, spaceToEnsure)
droppedBlocks = result.droppedBlocks
}
success = reserveUnrollMemoryForThisThread(space)
logInfo(s"Second reserveUnrollMemoryForThisThread: $success")
}
}
ResultWithDroppedBlocks(success, droppedBlocks)
}


/**
* Release memory used by this thread for unrolling blocks.
* If the amount is not specified, remove the current thread's allocation altogether.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1225,6 +1225,72 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
assert(unrollMemoryAfterB7 === unrollMemoryAfterB4)
}

/**
* Make sure unrolling triggers block evictions when the initial unroll threshold is not available
* otherwise.
*/
test("unroll when memory store nearly full") {
store = makeBlockManager(12000)
val memOnly = StorageLevel.MEMORY_ONLY
val memoryStore = store.memoryStore
// Should take up 1145 * 10 + object headers > 12000 - unrollMemoryThreshold (512)
val mostList = List.fill(10)(new Array[Byte](1140))
val smallList = List.fill(40)(new Array[Byte](100))
val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]

store.putIterator("most", mostList.iterator, memOnly)
assert(memoryStore.contains("most"))

val unrollResult = memoryStore.unrollSafely("small", smallList.iterator, droppedBlocks)
verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true)
assert(memoryStore.currentUnrollMemoryForThisThread === 0)
assert(droppedBlocks.size === 1)
assert(droppedBlocks.head._1 === TestBlockId("most"))
droppedBlocks.clear()
memoryStore.releasePendingUnrollMemoryForThisThread()
}

test("unroll when memory store nearly full through putIterator") {
store = makeBlockManager(12000)
val memOnly = StorageLevel.MEMORY_ONLY
val memoryStore = store.memoryStore
val mostList = List.fill(10)(new Array[Byte](1140))
val smallList = List.fill(40)(new Array[Byte](100))

// Fill almost the entire block manager
store.putIterator("most", mostList.iterator, memOnly)
assert(memoryStore.contains("most"))

// Attempt to compute a modestly size block; should evict most.
val smallResult = memoryStore.putIterator(
"small", smallList.iterator, memOnly, returnValues = true)
assert(memoryStore.contains("small"))
assert(smallResult.data.isLeft)
assert(smallResult.droppedBlocks.size === 1)
assert(smallResult.droppedBlocks.head._1 === TestBlockId("most"))
}

test("unroll fails when memory store nearly full with same RDD") {
store = makeBlockManager(12000)
val memOnly = StorageLevel.MEMORY_ONLY
val memoryStore = store.memoryStore
val mostList = List.fill(10)(new Array[Byte](1140))
val smallList = List.fill(40)(new Array[Byte](100))

// Fill almost the entire block manager with split 1
store.putIterator(rdd(1, 1), mostList.iterator, memOnly)
assert(memoryStore.contains(rdd(1, 1)))

// Attempt to compute a modestly sized split 2, should not be allowed to evict
// split1
val smallResult = memoryStore.putIterator(
rdd(1, 2), smallList.iterator, memOnly, returnValues = true)
assert(memoryStore.contains(rdd(1, 1)))
assert(!memoryStore.contains(rdd(1, 2)))
assert(smallResult.droppedBlocks.size === 0)
assert(smallResult.data.isLeft)
}

test("lazily create a big ByteBuffer to avoid OOM if it cannot be put into MemoryStore") {
store = makeBlockManager(12000)
val memoryStore = store.memoryStore
Expand Down