[SPARK-4480] Avoid many small spills in external data structures (1.1) #3354

Closed
ShuffleMemoryManager.scala
@@ -19,7 +19,7 @@ package org.apache.spark.shuffle

import scala.collection.mutable

import org.apache.spark.{Logging, SparkException, SparkConf}
import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}

/**
* Allocates a pool of memory to task threads for use in shuffle operations. Each disk-spilling
@@ -111,7 +111,7 @@ private[spark] class ShuffleMemoryManager(maxMemory: Long) extends Logging {
}
}

private object ShuffleMemoryManager {
private[spark] object ShuffleMemoryManager {
/**
* Figure out the shuffle memory limit from a SparkConf. We currently have both a fraction
* of the memory pool and a safety factor since collections can sometimes grow bigger than
@@ -122,4 +122,7 @@ private object ShuffleMemoryManager {
val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
}

// Initial threshold for the size of a collection before we start tracking its memory usage
val DEFAULT_INITIAL_MEMORY_THRESHOLD: Long = 5 * 1024 * 1024
}
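
For context, the new constant above is only a default: the value is read from the job's configuration under `spark.shuffle.spill.initialMemoryThreshold` (in bytes). A minimal sketch of overriding it through SparkConf, mirroring the test changes further down in this diff; the values are illustrative, not tuning advice:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .set("spark.shuffle.memoryFraction", "0.001")              // tiny shuffle pool, as in the tests
      .set("spark.shuffle.spill.initialMemoryThreshold", "512")  // override the 5 MB default (bytes)
    val sc = new SparkContext("local", "test", conf)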
ExternalAppendOnlyMap.scala
@@ -28,10 +28,11 @@ import com.google.common.io.ByteStreams

import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.serializer.{DeserializationStream, Serializer}
import org.apache.spark.shuffle.ShuffleMemoryManager
import org.apache.spark.storage.{BlockId, BlockManager}
import org.apache.spark.util.collection.ExternalAppendOnlyMap.HashComparator
import org.apache.spark.executor.ShuffleWriteMetrics

/**
* :: DeveloperApi ::
@@ -81,8 +82,14 @@ class ExternalAppendOnlyMap[K, V, C](
// Number of in-memory pairs inserted before tracking the map's shuffle memory usage
private val trackMemoryThreshold = 1000

// How much of the shared memory pool this collection has claimed
private var myMemoryThreshold = 0L
// Initial threshold for the size of a collection before we start tracking its memory usage
private val initialMemoryThreshold =
SparkEnv.get.conf.getLong("spark.shuffle.spill.initialMemoryThreshold",
ShuffleMemoryManager.DEFAULT_INITIAL_MEMORY_THRESHOLD)

// Threshold for the collection's size in bytes before we start tracking its memory usage
// To avoid a large number of small spills, initialize this to a value orders of magnitude > 0
private var myMemoryThreshold = initialMemoryThreshold

/**
* Size of object batches when reading/writing from serializers.
@@ -235,8 +242,11 @@ class ExternalAppendOnlyMap[K, V, C](
spilledMaps.append(new DiskMapIterator(file, blockId, batchSizes))

// Release our memory back to the shuffle pool so that other threads can grab it
shuffleMemoryManager.release(myMemoryThreshold)
myMemoryThreshold = 0L
// The amount we requested does not include the initial memory tracking threshold
shuffleMemoryManager.release(myMemoryThreshold - initialMemoryThreshold)

// Reset this to the initial threshold to avoid spilling many small files
myMemoryThreshold = initialMemoryThreshold

elementsRead = 0
_memoryBytesSpilled += mapSize
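
The release arithmetic above is easy to misread, so here is the invariant in isolation. This is a standalone sketch rather than Spark code; `grow` and the `release` callback stand in for whatever the ShuffleMemoryManager actually grants and reclaims:

    // Sketch only: the initial threshold is memory the collection assumes it may use
    // without asking the shared pool, so it is never part of what gets released.
    object SpillAccountingSketch {
      val initialMemoryThreshold = 5L * 1024 * 1024    // mirrors DEFAULT_INITIAL_MEMORY_THRESHOLD
      var myMemoryThreshold = initialMemoryThreshold   // starts at the threshold, not at 0

      // Growth: only memory granted by the pool is added on top of the initial threshold.
      def grow(granted: Long): Unit = myMemoryThreshold += granted

      // Spill: release exactly what was borrowed from the pool, then reset to the
      // initial threshold so small collections do not immediately spill again.
      def spill(release: Long => Unit): Unit = {
        release(myMemoryThreshold - initialMemoryThreshold)
        myMemoryThreshold = initialMemoryThreshold
      }
    }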
ExternalSorter.scala
@@ -28,6 +28,7 @@ import com.google.common.io.ByteStreams
import org.apache.spark._
import org.apache.spark.serializer.{DeserializationStream, Serializer}
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.shuffle.ShuffleMemoryManager
import org.apache.spark.storage.{BlockObjectWriter, BlockId}

/**
@@ -134,8 +135,14 @@ private[spark] class ExternalSorter[K, V, C](
// Write metrics for current spill
private var curWriteMetrics: ShuffleWriteMetrics = _

// How much of the shared memory pool this collection has claimed
private var myMemoryThreshold = 0L
// Initial threshold for the size of a collection before we start tracking its memory usage
private val initialMemoryThreshold =
SparkEnv.get.conf.getLong("spark.shuffle.spill.initialMemoryThreshold",
ShuffleMemoryManager.DEFAULT_INITIAL_MEMORY_THRESHOLD)

// Threshold for the collection's size in bytes before we start tracking its memory usage
// To avoid a large number of small spills, initialize this to a value orders of magnitude > 0
private var myMemoryThreshold = initialMemoryThreshold

// If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't need
// local aggregation and sorting, write numPartitions files directly and just concatenate them
@@ -284,8 +291,11 @@ private[spark] class ExternalSorter[K, V, C](
}

// Release our memory back to the shuffle pool so that other threads can grab it
shuffleMemoryManager.release(myMemoryThreshold)
myMemoryThreshold = 0
// The amount we requested does not include the initial memory tracking threshold
shuffleMemoryManager.release(myMemoryThreshold - initialMemoryThreshold)

// Reset this to the initial threshold to avoid spilling many small files
myMemoryThreshold = initialMemoryThreshold

_memoryBytesSpilled += memorySize
elementsRead = 0
ExternalSorterSuite.scala
@@ -127,6 +127,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
test("empty partitions with spilling") {
val conf = createSparkConf(false)
conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.spill.initialMemoryThreshold", "512")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)

@@ -152,6 +153,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
test("empty partitions with spilling, bypass merge-sort") {
val conf = createSparkConf(false)
conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.spill.initialMemoryThreshold", "512")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)

@@ -761,5 +763,5 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
}

sorter2.stop()
}
}
}