Skip to content

[SPARK-4452][SPARK-11293][Core][BRANCH-1.6] Shuffle data structures can starve others on the same thread for memory #13027

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ protected MemoryConsumer(TaskMemoryManager taskMemoryManager) {
/**
* Returns the size of used memory in bytes.
*/
long getUsed() {
protected long getUsed() {
return used;
}

Expand Down Expand Up @@ -130,4 +130,21 @@ protected void freePage(MemoryBlock page) {
used -= page.size();
taskMemoryManager.freePage(page, this);
}

/**
* Allocates a heap memory of `size`.
*/
public long acquireOnHeapMemory(long size) {
long granted = taskMemoryManager.acquireExecutionMemory(size, MemoryMode.ON_HEAP, this);
used += granted;
return granted;
}

/**
* Release N bytes of heap memory.
*/
public void freeOnHeapMemory(long size) {
taskMemoryManager.releaseExecutionMemory(size, MemoryMode.ON_HEAP, this);
used -= size;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -408,4 +408,11 @@ public long cleanUpAllAllocatedMemory() {
public long getMemoryConsumptionForThisTask() {
return memoryManager.getExecutionMemoryUsageForTask(taskAttemptId);
}

/**
* Returns Tungsten memory mode
*/
public MemoryMode getTungstenMemoryMode() {
return tungstenMemoryMode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ class ExternalAppendOnlyMap[K, V, C](
serializer: Serializer = SparkEnv.get.serializer,
blockManager: BlockManager = SparkEnv.get.blockManager,
context: TaskContext = TaskContext.get())
extends Iterable[(K, C)]
extends Spillable[SizeTracker](context.taskMemoryManager())
with Serializable
with Logging
with Spillable[SizeTracker] {
with Iterable[(K, C)] {

if (context == null) {
throw new IllegalStateException(
Expand All @@ -79,9 +79,7 @@ class ExternalAppendOnlyMap[K, V, C](
this(createCombiner, mergeValue, mergeCombiners, serializer, blockManager, TaskContext.get())
}

override protected[this] def taskMemoryManager: TaskMemoryManager = context.taskMemoryManager()

private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
@volatile private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
private val spilledMaps = new ArrayBuffer[DiskMapIterator]
private val sparkConf = SparkEnv.get.conf
private val diskBlockManager = blockManager.diskBlockManager
Expand Down Expand Up @@ -115,6 +113,8 @@ class ExternalAppendOnlyMap[K, V, C](
private val keyComparator = new HashComparator[K]
private val ser = serializer.newInstance()

@volatile private var readingIterator: SpillableIterator = null

/**
* Number of files this map has spilled so far.
* Exposed for testing.
Expand Down Expand Up @@ -180,6 +180,29 @@ class ExternalAppendOnlyMap[K, V, C](
* Sort the existing contents of the in-memory map and spill them to a temporary file on disk.
*/
override protected[this] def spill(collection: SizeTracker): Unit = {
val inMemoryIterator = currentMap.destructiveSortedIterator(keyComparator)
val diskMapIterator = spillMemoryIteratorToDisk(inMemoryIterator)
spilledMaps.append(diskMapIterator)
}

/**
* Force to spilling the current in-memory collection to disk to release memory,
* It will be called by TaskMemoryManager when there is not enough memory for the task.
*/
override protected[this] def forceSpill(): Boolean = {
assert(readingIterator != null)
val isSpilled = readingIterator.spill()
if (isSpilled) {
currentMap = null
}
isSpilled
}

/**
* Spill the in-memory Iterator to a temporary file on disk.
*/
private[this] def spillMemoryIteratorToDisk(inMemoryIterator: Iterator[(K, C)])
: DiskMapIterator = {
val (blockId, file) = diskBlockManager.createTempLocalBlock()
curWriteMetrics = new ShuffleWriteMetrics()
var writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics)
Expand All @@ -200,9 +223,8 @@ class ExternalAppendOnlyMap[K, V, C](

var success = false
try {
val it = currentMap.destructiveSortedIterator(keyComparator)
while (it.hasNext) {
val kv = it.next()
while (inMemoryIterator.hasNext) {
val kv = inMemoryIterator.next()
writer.write(kv._1, kv._2)
objectsWritten += 1

Expand Down Expand Up @@ -235,7 +257,17 @@ class ExternalAppendOnlyMap[K, V, C](
}
}

spilledMaps.append(new DiskMapIterator(file, blockId, batchSizes))
new DiskMapIterator(file, blockId, batchSizes)
}

/**
* Returns a destructive iterator for iterating over the entries of this map.
* If this iterator is forced spill to disk to release memory when there is not enough memory,
* it returns pairs from an on-disk map.
*/
def destructiveIterator(inMemoryIterator: Iterator[(K, C)]): Iterator[(K, C)] = {
readingIterator = new SpillableIterator(inMemoryIterator)
readingIterator
}

/**
Expand All @@ -248,15 +280,18 @@ class ExternalAppendOnlyMap[K, V, C](
"ExternalAppendOnlyMap.iterator is destructive and should only be called once.")
}
if (spilledMaps.isEmpty) {
CompletionIterator[(K, C), Iterator[(K, C)]](currentMap.iterator, freeCurrentMap())
CompletionIterator[(K, C), Iterator[(K, C)]](
destructiveIterator(currentMap.iterator), freeCurrentMap())
} else {
new ExternalIterator()
}
}

private def freeCurrentMap(): Unit = {
currentMap = null // So that the memory can be garbage-collected
releaseMemory()
if (currentMap != null) {
currentMap = null // So that the memory can be garbage-collected
releaseMemory()
}
}

/**
Expand All @@ -270,8 +305,8 @@ class ExternalAppendOnlyMap[K, V, C](

// Input streams are derived both from the in-memory map and spilled maps on disk
// The in-memory map is sorted in place, while the spilled maps are already in sorted order
private val sortedMap = CompletionIterator[(K, C), Iterator[(K, C)]](
currentMap.destructiveSortedIterator(keyComparator), freeCurrentMap())
private val sortedMap = CompletionIterator[(K, C), Iterator[(K, C)]](destructiveIterator(
currentMap.destructiveSortedIterator(keyComparator)), freeCurrentMap())
private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)

inputStreams.foreach { it =>
Expand Down Expand Up @@ -530,8 +565,56 @@ class ExternalAppendOnlyMap[K, V, C](
context.addTaskCompletionListener(context => cleanup())
}

private[this] class SpillableIterator(var upstream: Iterator[(K, C)])
extends Iterator[(K, C)] {

private val SPILL_LOCK = new Object()

private var nextUpstream: Iterator[(K, C)] = null

private var cur: (K, C) = readNext()

private var hasSpilled: Boolean = false

def spill(): Boolean = SPILL_LOCK.synchronized {
if (hasSpilled) {
false
} else {
logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " +
s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
nextUpstream = spillMemoryIteratorToDisk(upstream)
hasSpilled = true
true
}
}

def readNext(): (K, C) = SPILL_LOCK.synchronized {
if (nextUpstream != null) {
upstream = nextUpstream
nextUpstream = null
}
if (upstream.hasNext) {
upstream.next()
} else {
null
}
}

override def hasNext(): Boolean = cur != null

override def next(): (K, C) = {
val r = cur
cur = readNext()
r
}
}

/** Convenience function to hash the given (K, C) pair by the key. */
private def hashKey(kc: (K, C)): Int = ExternalAppendOnlyMap.hash(kc._1)

override def toString(): String = {
this.getClass.getName + "@" + java.lang.Integer.toHexString(this.hashCode())
}
}

private[spark] object ExternalAppendOnlyMap {
Expand Down
Loading