
Commit 5cea859

mcheah authored and Andrew Or committed
[SPARK-4808] Removing minimum number of elements read before spill check
In the general case, Spillable's heuristic of checking for memory stress on every 32nd item after 1000 items are read is good enough. In general, we do not want to enact the spilling checks until later on in the job; checking for disk-spilling too early can produce an unacceptable performance impact in trivial cases. However, there are non-trivial cases, particularly when each serialized object is large, where checking for the necessity to spill too late would allow the memory to overflow. Consider if every item is 1.5 MB in size and the heap size is 1000 MB. Then clearly, if we only try to spill the in-memory contents to disk after 1000 items are read, we would have already accumulated 1500 MB of RAM and overflowed the heap.

Patch #3656 attempted to circumvent this by checking the need to spill on every single item read, but that would cause unacceptable performance in the general case. However, the non-trivial cases above should not be forced to refactor their data items to be smaller. Therefore it makes sense for the memory-spilling thresholds to be configurable.

Author: mcheah <mcheah@palantir.com>

Closes #4420 from mingyukim/memory-spill-configurable and squashes the following commits:

6e2509f [mcheah] [SPARK-4808] Removing minimum number of elements read before spill check
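The overflow scenario in the commit message can be sketched with a toy arithmetic model (this is an illustration only, not Spark's actual memory accounting; `SpillModel` and its fields are hypothetical names):

```scala
// Toy model of the pre-patch heuristic: no spill check is possible until
// trackMemoryThreshold (1000) elements have been read, so with large records
// the heap can overflow before the first check ever fires.
object SpillModel {
  val itemSizeMb = 1.5             // each serialized record, as in the example above
  val heapMb = 1000.0              // available heap in the example
  val trackMemoryThreshold = 1000  // old minimum element count before any spill check

  // Memory accumulated by the time the first spill check can possibly run
  def memoryAtFirstCheck: Double = itemSizeMb * trackMemoryThreshold

  def main(args: Array[String]): Unit = {
    println(f"accumulated before first check: $memoryAtFirstCheck%.0f MB")
    println(s"overflows the $heapMb MB heap: ${memoryAtFirstCheck > heapMb}")
  }
}
```

With the numbers from the message, 1000 items at 1.5 MB each accumulate 1500 MB before the first check, exceeding the 1000 MB heap, which is exactly why the minimum-element gate is removed by this commit.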
1 parent 18fbed5 commit 5cea859

File tree

1 file changed: +1 −5 lines


core/src/main/scala/org/apache/spark/util/collection/Spillable.scala

Lines changed: 1 addition & 5 deletions
@@ -42,9 +42,6 @@ private[spark] trait Spillable[C] extends Logging {
   // Memory manager that can be used to acquire/release memory
   private[this] val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager
 
-  // Threshold for `elementsRead` before we start tracking this collection's memory usage
-  private[this] val trackMemoryThreshold = 1000
-
   // Initial threshold for the size of a collection before we start tracking its memory usage
   // Exposed for testing
   private[this] val initialMemoryThreshold: Long =
@@ -72,8 +69,7 @@ private[spark] trait Spillable[C] extends Logging {
    * @return true if `collection` was spilled to disk; false otherwise
    */
   protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
-    if (elementsRead > trackMemoryThreshold && elementsRead % 32 == 0 &&
-        currentMemory >= myMemoryThreshold) {
+    if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
       // Claim up to double our current memory from the shuffle memory pool
       val amountToRequest = 2 * currentMemory - myMemoryThreshold
       val granted = shuffleMemoryManager.tryToAcquire(amountToRequest)
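The post-patch decision flow can be sketched as a standalone class (a simplified model, not the real `Spillable` trait: the `ShuffleMemoryManager` interaction is reduced to a `tryToAcquire` callback, threshold reset after a spill is omitted, and `SpillDecider` is a hypothetical name):

```scala
// Sketch of the spill decision after this patch: the check now fires on every
// 32nd element with no minimum element count, gated only by whether the
// collection's current memory has reached its threshold.
class SpillDecider(initialThreshold: Long, tryToAcquire: Long => Long) {
  private var myMemoryThreshold: Long = initialThreshold

  /** Returns true if the caller should spill its in-memory collection. */
  def maybeSpill(elementsRead: Long, currentMemory: Long): Boolean = {
    if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
      // Claim up to double our current memory from the shuffle memory pool
      val amountToRequest = 2 * currentMemory - myMemoryThreshold
      val granted = tryToAcquire(amountToRequest)
      myMemoryThreshold += granted
      // Spill only if the pool could not grant enough room to keep growing
      currentMemory >= myMemoryThreshold
    } else {
      false
    }
  }
}
```

For example, with a callback that grants nothing (`_ => 0L`, an exhausted pool), the decider spills on the first 32nd-element check once memory reaches the threshold; with a callback that grants the full request, it keeps the collection in memory.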

0 commit comments
