Commit 16bf5f3

Author: Andrew Or (committed)

[SPARK-4480] Avoid many small spills in external data structures (1.1)

This is the branch-1.1 version of #3353. This requires a separate PR because the code in
master has been refactored a little to eliminate duplicate code. I have tested this on a
standalone cluster. The goal is to merge this into 1.1.1.

Author: Andrew Or <andrew@databricks.com>

Closes #3354 from andrewor14/avoid-small-spills-1.1 and squashes the following commits:

f2e552c [Andrew Or] Fix tests
7012595 [Andrew Or] Avoid many small spills
1 parent e22a759 commit 16bf5f3

File tree

4 files changed: +37 −12 lines

core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala

Lines changed: 5 additions & 2 deletions
@@ -19,7 +19,7 @@ package org.apache.spark.shuffle
 
 import scala.collection.mutable
 
-import org.apache.spark.{Logging, SparkException, SparkConf}
+import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
 
 /**
  * Allocates a pool of memory to task threads for use in shuffle operations. Each disk-spilling
@@ -111,7 +111,7 @@ private[spark] class ShuffleMemoryManager(maxMemory: Long) extends Logging {
   }
 }
 
-private object ShuffleMemoryManager {
+private[spark] object ShuffleMemoryManager {
   /**
   * Figure out the shuffle memory limit from a SparkConf. We currently have both a fraction
   * of the memory pool and a safety factor since collections can sometimes grow bigger than
@@ -122,4 +122,7 @@ private object ShuffleMemoryManager {
     val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
     (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
   }
+
+  // Initial threshold for the size of a collection before we start tracking its memory usage
+  val DEFAULT_INITIAL_MEMORY_THRESHOLD: Long = 5 * 1024 * 1024
 }
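
The new constant gives every spilling collection a shared default floor of 5 MB (5 * 1024 * 1024 = 5242880 bytes) before it starts asking the ShuffleMemoryManager for more memory. Below is a minimal sketch, assuming only a plain SparkConf, of how a collection picks this value up; the conf value here is illustrative and not part of the patch:

import org.apache.spark.SparkConf

// Illustrative sketch: read the initial tracking threshold the way the
// collections in this patch do, falling back to the new 5 MB default.
val conf = new SparkConf()
val initialMemoryThreshold: Long =
  conf.getLong("spark.shuffle.spill.initialMemoryThreshold", 5L * 1024 * 1024)
// With no override this is 5242880 bytes; the test changes below lower it to 512.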

core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala

Lines changed: 15 additions & 5 deletions
@@ -28,10 +28,11 @@ import com.google.common.io.ByteStreams
 
 import org.apache.spark.{Logging, SparkEnv}
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.serializer.{DeserializationStream, Serializer}
+import org.apache.spark.shuffle.ShuffleMemoryManager
 import org.apache.spark.storage.{BlockId, BlockManager}
 import org.apache.spark.util.collection.ExternalAppendOnlyMap.HashComparator
-import org.apache.spark.executor.ShuffleWriteMetrics
 
 /**
  * :: DeveloperApi ::
@@ -81,8 +82,14 @@ class ExternalAppendOnlyMap[K, V, C](
   // Number of in-memory pairs inserted before tracking the map's shuffle memory usage
   private val trackMemoryThreshold = 1000
 
-  // How much of the shared memory pool this collection has claimed
-  private var myMemoryThreshold = 0L
+  // Initial threshold for the size of a collection before we start tracking its memory usage
+  private val initialMemoryThreshold =
+    SparkEnv.get.conf.getLong("spark.shuffle.spill.initialMemoryThreshold",
+      ShuffleMemoryManager.DEFAULT_INITIAL_MEMORY_THRESHOLD)
+
+  // Threshold for the collection's size in bytes before we start tracking its memory usage
+  // To avoid a large number of small spills, initialize this to a value orders of magnitude > 0
+  private var myMemoryThreshold = initialMemoryThreshold
 
   /**
    * Size of object batches when reading/writing from serializers.
@@ -236,8 +243,11 @@ class ExternalAppendOnlyMap[K, V, C](
     spilledMaps.append(new DiskMapIterator(file, blockId, batchSizes))
 
     // Release our memory back to the shuffle pool so that other threads can grab it
-    shuffleMemoryManager.release(myMemoryThreshold)
-    myMemoryThreshold = 0L
+    // The amount we requested does not include the initial memory tracking threshold
+    shuffleMemoryManager.release(myMemoryThreshold - initialMemoryThreshold)
+
+    // Reset this to the initial threshold to avoid spilling many small files
+    myMemoryThreshold = initialMemoryThreshold
 
     elementsRead = 0
     _memoryBytesSpilled += mapSize
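
The subtraction in the release call is the crux of the change: the collection starts out already holding initialMemoryThreshold bytes that were never requested from the shuffle pool, so on spill it must hand back only what the pool actually granted, then fall back to the floor. A simplified, self-contained sketch of that bookkeeping follows; the class, the parameter names, and the doubling request heuristic are illustrative assumptions, not the actual ExternalAppendOnlyMap members:

// Simplified sketch of the spill bookkeeping after this patch. Names are
// illustrative; this is not the real ExternalAppendOnlyMap code.
class SpillBookkeeping(
    initialMemoryThreshold: Long,   // floor that is never requested from the pool
    tryToAcquire: Long => Long,     // stand-in for shuffleMemoryManager.tryToAcquire
    release: Long => Unit,          // stand-in for shuffleMemoryManager.release
    spill: () => Unit) {

  // Starts at the floor instead of 0L, so tiny collections no longer spill at all
  private var myMemoryThreshold: Long = initialMemoryThreshold

  def maybeSpill(estimatedSize: Long): Unit = {
    if (estimatedSize > myMemoryThreshold) {
      // Ask the pool only for memory beyond what we already hold (assumed doubling heuristic)
      val amountToRequest = 2 * estimatedSize - myMemoryThreshold
      myMemoryThreshold += tryToAcquire(amountToRequest)
      if (estimatedSize >= myMemoryThreshold) {
        spill()
        // Give back only what the pool actually granted; the floor was never taken from it
        release(myMemoryThreshold - initialMemoryThreshold)
        // Reset to the floor so the next spill is not a tiny file
        myMemoryThreshold = initialMemoryThreshold
      }
    }
  }
}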

core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala

Lines changed: 14 additions & 4 deletions
@@ -28,6 +28,7 @@ import com.google.common.io.ByteStreams
 import org.apache.spark._
 import org.apache.spark.serializer.{DeserializationStream, Serializer}
 import org.apache.spark.executor.ShuffleWriteMetrics
+import org.apache.spark.shuffle.ShuffleMemoryManager
 import org.apache.spark.storage.{BlockObjectWriter, BlockId}
 
 /**
@@ -134,8 +135,14 @@ private[spark] class ExternalSorter[K, V, C](
   // Write metrics for current spill
   private var curWriteMetrics: ShuffleWriteMetrics = _
 
-  // How much of the shared memory pool this collection has claimed
-  private var myMemoryThreshold = 0L
+  // Initial threshold for the size of a collection before we start tracking its memory usage
+  private val initialMemoryThreshold =
+    SparkEnv.get.conf.getLong("spark.shuffle.spill.initialMemoryThreshold",
+      ShuffleMemoryManager.DEFAULT_INITIAL_MEMORY_THRESHOLD)
+
+  // Threshold for the collection's size in bytes before we start tracking its memory usage
+  // To avoid a large number of small spills, initialize this to a value orders of magnitude > 0
+  private var myMemoryThreshold = initialMemoryThreshold
 
   // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't need
   // local aggregation and sorting, write numPartitions files directly and just concatenate them
@@ -285,8 +292,11 @@ private[spark] class ExternalSorter[K, V, C](
     }
 
     // Release our memory back to the shuffle pool so that other threads can grab it
-    shuffleMemoryManager.release(myMemoryThreshold)
-    myMemoryThreshold = 0
+    // The amount we requested does not include the initial memory tracking threshold
+    shuffleMemoryManager.release(myMemoryThreshold - initialMemoryThreshold)
+
+    // Reset this to the initial threshold to avoid spilling many small files
+    myMemoryThreshold = initialMemoryThreshold
 
     _memoryBytesSpilled += memorySize
     elementsRead = 0
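
ExternalSorter gets the same treatment as ExternalAppendOnlyMap, and both read the same key through SparkEnv.get.conf, so the behavior can be tuned from user code in the usual way. A hypothetical usage example (the 1 MB value and the app name are arbitrary, chosen only for illustration):

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example: override the new threshold when building a context.
// A lower value makes spilling kick in earlier; a higher value keeps small
// collections entirely in memory and untracked by the shuffle pool.
val conf = new SparkConf()
  .setMaster("local")
  .setAppName("spill-threshold-example")
  .set("spark.shuffle.spill.initialMemoryThreshold", (1024 * 1024).toString) // 1 MB
val sc = new SparkContext(conf)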

core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala

Lines changed: 3 additions & 1 deletion
@@ -127,6 +127,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
   test("empty partitions with spilling") {
     val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
+    conf.set("spark.shuffle.spill.initialMemoryThreshold", "512")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
 
@@ -152,6 +153,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
   test("empty partitions with spilling, bypass merge-sort") {
     val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
+    conf.set("spark.shuffle.spill.initialMemoryThreshold", "512")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
 
@@ -761,5 +763,5 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
     }
 
     sorter2.stop()
-  }
+  }
 }
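
The 512-byte override matters because of the 5 MB default floor introduced above: with spark.shuffle.memoryFraction set to 0.001, a test JVM with, say, a 512 MB heap (the heap size here is only an assumed example) gets roughly 512 MB × 0.001 × 0.8 ≈ 0.4 MB of shuffle memory, far below the default 5 MB floor, so the collections in these tests would never spill at all. Lowering the floor to 512 bytes keeps the spilling tests exercising the spill path.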
