[SPARK-10983] Unified memory manager #9084

Closed · wants to merge 20 commits

23 changes: 21 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -418,16 +418,35 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
   }
 
   // Validate memory fractions
-  val memoryKeys = Seq(
+  val deprecatedMemoryKeys = Seq(
     "spark.storage.memoryFraction",
     "spark.shuffle.memoryFraction",
     "spark.shuffle.safetyFraction",
     "spark.storage.unrollFraction",
     "spark.storage.safetyFraction")
+  val memoryKeys = Seq(
+    "spark.memory.fraction",
+    "spark.memory.storageFraction") ++
+    deprecatedMemoryKeys
   for (key <- memoryKeys) {
     val value = getDouble(key, 0.5)
     if (value > 1 || value < 0) {
-      throw new IllegalArgumentException("$key should be between 0 and 1 (was '$value').")
+      throw new IllegalArgumentException(s"$key should be between 0 and 1 (was '$value').")
     }
   }
 
+  // Warn against deprecated memory fractions (unless legacy memory management mode is enabled)
+  val legacyMemoryManagementKey = "spark.memory.useLegacyMode"
+  val legacyMemoryManagement = getBoolean(legacyMemoryManagementKey, false)
+  if (!legacyMemoryManagement) {
+    val keyset = deprecatedMemoryKeys.toSet
+    val detected = settings.keys().asScala.filter(keyset.contains)
+    if (detected.nonEmpty) {
+      logWarning("Detected deprecated memory fraction settings: " +
+        detected.mkString("[", ", ", "]") + ". As of Spark 1.6, execution and storage " +
+        "memory management are unified. All memory fractions used in the old model are " +
+        "now deprecated and no longer read. If you wish to use the old memory management, " +
+        s"you may explicitly enable `$legacyMemoryManagementKey` (not recommended).")
+    }
+  }
+
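
To make the new configuration surface concrete, here is a hedged sketch of both modes; it is not code from the PR, and the fraction values are illustrative rather than recommendations. Only the key names come from the diff above.

import org.apache.spark.SparkConf

// Unified memory management (the default as of Spark 1.6): execution and
// storage share a single region sized by spark.memory.fraction and split
// by spark.memory.storageFraction.
val unifiedConf = new SparkConf()
  .set("spark.memory.fraction", "0.75")        // illustrative value
  .set("spark.memory.storageFraction", "0.5")  // illustrative value

// Opting back into the pre-1.6 static model (not recommended). The old
// fractions are only honored when useLegacyMode is enabled; otherwise they
// trigger the deprecation warning added above.
val legacyConf = new SparkConf()
  .set("spark.memory.useLegacyMode", "true")
  .set("spark.storage.memoryFraction", "0.6")
  .set("spark.shuffle.memoryFraction", "0.2")
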
11 changes: 9 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -30,7 +30,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.api.python.PythonWorkerFactory
 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.memory.{MemoryManager, StaticMemoryManager}
+import org.apache.spark.memory.{MemoryManager, StaticMemoryManager, UnifiedMemoryManager}
 import org.apache.spark.network.BlockTransferService
 import org.apache.spark.network.netty.NettyBlockTransferService
 import org.apache.spark.rpc.{RpcEndpointRef, RpcEndpoint, RpcEnv}
@@ -335,7 +335,14 @@ object SparkEnv extends Logging {
     val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
     val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
 
-    val memoryManager = new StaticMemoryManager(conf)
+    val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
+    val memoryManager: MemoryManager =
+      if (useLegacyMemoryManager) {
+        new StaticMemoryManager(conf)
+      } else {
+        new UnifiedMemoryManager(conf)
+      }
+
     val shuffleMemoryManager = ShuffleMemoryManager.create(conf, memoryManager, numUsableCores)
 
     val blockTransferService = new NettyBlockTransferService(conf, securityManager, numUsableCores)
83 changes: 62 additions & 21 deletions core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -19,6 +19,7 @@ package org.apache.spark.memory
 
 import scala.collection.mutable
 
+import org.apache.spark.Logging
 import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore}
 
 
@@ -29,7 +30,7 @@ import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore}
  * sorts and aggregations, while storage memory refers to that used for caching and propagating
 * internal data across the cluster. There exists one of these per JVM.
  */
-private[spark] abstract class MemoryManager {
+private[spark] abstract class MemoryManager extends Logging {
 
   // The memory store used to evict cached blocks
   private var _memoryStore: MemoryStore = _
@@ -40,19 +41,38 @@ private[spark] abstract class MemoryManager {
     _memoryStore
   }
 
+  // Amount of execution/storage memory in use, accesses must be synchronized on `this`
+  protected var _executionMemoryUsed: Long = 0
+  protected var _storageMemoryUsed: Long = 0
+

Review comment on `_executionMemoryUsed` (pzz2011): Why is this field protected?

Reply (Member): @pzz2011 you're making several comments on old PRs. Generally people won't see them, and an old PR isn't the place for discussion anyway. If you can formulate a specific question beyond "why is the code this way?", ask on user@.

   /**
    * Set the [[MemoryStore]] used by this manager to evict cached blocks.
    * This must be set after construction due to initialization ordering constraints.
    */
-  def setMemoryStore(store: MemoryStore): Unit = {
+  final def setMemoryStore(store: MemoryStore): Unit = {
     _memoryStore = store
   }
 
   /**
-   * Acquire N bytes of memory for execution.
+   * Total available memory for execution, in bytes.
+   */
+  def maxExecutionMemory: Long
+
+  /**
+   * Total available memory for storage, in bytes.
+   */
+  def maxStorageMemory: Long
+
+  // TODO: avoid passing evicted blocks around to simplify method signatures (SPARK-10985)
+

Review comment (Contributor): Good call on deferring this to a follow-up task.

+  /**
+   * Acquire N bytes of memory for execution, evicting cached blocks if necessary.
+   * Blocks evicted in the process, if any, are added to `evictedBlocks`.
    * @return number of bytes successfully granted (<= N).
    */
-  def acquireExecutionMemory(numBytes: Long): Long
+  def acquireExecutionMemory(
+      numBytes: Long,
+      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long
 
   /**
    * Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
@@ -66,52 +86,73 @@
 
   /**
    * Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary.
    *
    * This extra method allows subclasses to differentiate behavior between acquiring storage
    * memory and acquiring unroll memory. For instance, the memory management model in Spark
    * 1.5 and before places a limit on the amount of space that can be freed from unrolling.
+   * Blocks evicted in the process, if any, are added to `evictedBlocks`.
    *
    * @return whether all N bytes were successfully granted.
    */
   def acquireUnrollMemory(
       blockId: BlockId,
       numBytes: Long,
-      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean
+      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
+    acquireStorageMemory(blockId, numBytes, evictedBlocks)
+  }

Review comment (Contributor): Given that acquireUnrollMemory appears to act as a synonym for acquireStorageMemory in the current implementation, it might be worth adding a brief comment above this method to explain that it exists to give us the flexibility to account for unroll memory differently in the future.

Reply (Contributor, author): Actually, it's more than a synonym. In StaticMemoryManager it's required to preserve existing behavior, where unrolling doesn't evict all the blocks.

 
   /**
    * Release N bytes of execution memory.
    */
-  def releaseExecutionMemory(numBytes: Long): Unit
+  def releaseExecutionMemory(numBytes: Long): Unit = synchronized {
+    if (numBytes > _executionMemoryUsed) {
+      logWarning(s"Attempted to release $numBytes bytes of execution " +
+        s"memory when we only have ${_executionMemoryUsed} bytes")
+      _executionMemoryUsed = 0
+    } else {
+      _executionMemoryUsed -= numBytes
+    }
+  }
 
   /**
    * Release N bytes of storage memory.
    */
-  def releaseStorageMemory(numBytes: Long): Unit
+  def releaseStorageMemory(numBytes: Long): Unit = synchronized {
+    if (numBytes > _storageMemoryUsed) {
+      logWarning(s"Attempted to release $numBytes bytes of storage " +
+        s"memory when we only have ${_storageMemoryUsed} bytes")
+      _storageMemoryUsed = 0
+    } else {
+      _storageMemoryUsed -= numBytes
+    }
+  }
 
   /**
    * Release all storage memory acquired.
    */
-  def releaseStorageMemory(): Unit
+  def releaseAllStorageMemory(): Unit = synchronized {
+    _storageMemoryUsed = 0
+  }
 
   /**
    * Release N bytes of unroll memory.
    */
-  def releaseUnrollMemory(numBytes: Long): Unit
-
-  /**
-   * Total available memory for execution, in bytes.
-   */
-  def maxExecutionMemory: Long
-
-  /**
-   * Total available memory for storage, in bytes.
-   */
-  def maxStorageMemory: Long
+  def releaseUnrollMemory(numBytes: Long): Unit = synchronized {
+    releaseStorageMemory(numBytes)
+  }
 
   /**
    * Execution memory currently in use, in bytes.
    */
-  def executionMemoryUsed: Long
+  final def executionMemoryUsed: Long = synchronized {
+    _executionMemoryUsed
+  }
 
   /**
    * Storage memory currently in use, in bytes.
    */
-  def storageMemoryUsed: Long
+  final def storageMemoryUsed: Long = synchronized {
+    _storageMemoryUsed
+  }
 
 }
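
To make the contract of the methods above concrete, here is a rough usage sketch rather than code from the PR. It assumes a concrete MemoryManager already wired to a MemoryStore; TestBlockId is used purely as a stand-in block identifier, and since MemoryManager is private[spark] this would need to live under a spark package to compile.

import scala.collection.mutable
import org.apache.spark.memory.MemoryManager
import org.apache.spark.storage.{BlockId, BlockStatus, TestBlockId}

def demo(manager: MemoryManager): Unit = {
  val evicted = mutable.Buffer.empty[(BlockId, BlockStatus)]

  // Execution memory may be granted partially: the return value is the
  // number of bytes actually obtained, possibly fewer than requested.
  val granted = manager.acquireExecutionMemory(1024L * 1024, evicted)

  // Storage memory is all-or-nothing: false means the block did not fit
  // even after evicting other cached blocks (appended to `evicted`).
  val cached = manager.acquireStorageMemory(TestBlockId("demo"), 512L * 1024, evicted)

  // Callers are responsible for releasing what they acquired.
  manager.releaseExecutionMemory(granted)
  if (cached) {
    manager.releaseStorageMemory(512L * 1024)
  }
}
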
105 changes: 25 additions & 80 deletions core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
@@ -19,7 +19,7 @@ package org.apache.spark.memory
 
 import scala.collection.mutable
 
-import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.SparkConf
 import org.apache.spark.storage.{BlockId, BlockStatus}
 
 
@@ -34,17 +34,7 @@ private[spark] class StaticMemoryManager(
     conf: SparkConf,
     override val maxExecutionMemory: Long,
     override val maxStorageMemory: Long)
-  extends MemoryManager with Logging {
-
-  // Max number of bytes worth of blocks to evict when unrolling
-  private val maxMemoryToEvictForUnroll: Long = {
-    (maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
-  }
-
-  // Amount of execution / storage memory in use
-  // Accesses must be synchronized on `this`
-  private var _executionMemoryUsed: Long = 0
-  private var _storageMemoryUsed: Long = 0
+  extends MemoryManager {
 
   def this(conf: SparkConf) {
     this(
@@ -53,11 +43,19 @@
       StaticMemoryManager.getMaxStorageMemory(conf))
   }
 
+  // Max number of bytes worth of blocks to evict when unrolling
+  private val maxMemoryToEvictForUnroll: Long = {
+    (maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
+  }
+
   /**
    * Acquire N bytes of memory for execution.
    * @return number of bytes successfully granted (<= N).
    */
-  override def acquireExecutionMemory(numBytes: Long): Long = synchronized {
+  override def acquireExecutionMemory(
+      numBytes: Long,
+      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = synchronized {
     assert(numBytes >= 0)
+    assert(_executionMemoryUsed <= maxExecutionMemory)
     val bytesToGrant = math.min(numBytes, maxExecutionMemory - _executionMemoryUsed)
     _executionMemoryUsed += bytesToGrant
@@ -72,7 +70,7 @@
   override def acquireStorageMemory(
       blockId: BlockId,
       numBytes: Long,
-      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
+      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
     acquireStorageMemory(blockId, numBytes, numBytes, evictedBlocks)
   }

@@ -88,7 +86,7 @@
   override def acquireUnrollMemory(
       blockId: BlockId,
       numBytes: Long,
-      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
+      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
     val currentUnrollMemory = memoryStore.currentUnrollMemory
     val maxNumBytesToFree = math.max(0, maxMemoryToEvictForUnroll - currentUnrollMemory)
     val numBytesToFree = math.min(numBytes, maxNumBytesToFree)
@@ -108,71 +106,16 @@
       blockId: BlockId,
       numBytesToAcquire: Long,
       numBytesToFree: Long,
-      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
-    // Note: Keep this outside synchronized block to avoid potential deadlocks!
+      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
     assert(numBytesToAcquire >= 0)
     assert(numBytesToFree >= 0)
     memoryStore.ensureFreeSpace(blockId, numBytesToFree, evictedBlocks)
-    synchronized {
-      assert(_storageMemoryUsed <= maxStorageMemory)
-      val enoughMemory = _storageMemoryUsed + numBytesToAcquire <= maxStorageMemory
-      if (enoughMemory) {
-        _storageMemoryUsed += numBytesToAcquire
-      }
-      enoughMemory
-    }
-  }
-
-  /**
-   * Release N bytes of execution memory.
-   */
-  override def releaseExecutionMemory(numBytes: Long): Unit = synchronized {
-    if (numBytes > _executionMemoryUsed) {
-      logWarning(s"Attempted to release $numBytes bytes of execution " +
-        s"memory when we only have ${_executionMemoryUsed} bytes")
-      _executionMemoryUsed = 0
-    } else {
-      _executionMemoryUsed -= numBytes
-    }
-  }
-
-  /**
-   * Release N bytes of storage memory.
-   */
-  override def releaseStorageMemory(numBytes: Long): Unit = synchronized {
-    if (numBytes > _storageMemoryUsed) {
-      logWarning(s"Attempted to release $numBytes bytes of storage " +
-        s"memory when we only have ${_storageMemoryUsed} bytes")
-      _storageMemoryUsed = 0
-    } else {
-      _storageMemoryUsed -= numBytes
-    }
-  }
-
-  /**
-   * Release all storage memory acquired.
-   */
-  override def releaseStorageMemory(): Unit = synchronized {
-    _storageMemoryUsed = 0
-  }
-
-  /**
-   * Release N bytes of unroll memory.
-   */
-  override def releaseUnrollMemory(numBytes: Long): Unit = {
-    releaseStorageMemory(numBytes)
-  }
-
-  /**
-   * Amount of execution memory currently in use, in bytes.
-   */
-  override def executionMemoryUsed: Long = synchronized {
-    _executionMemoryUsed
-  }
-
-  /**
-   * Amount of storage memory currently in use, in bytes.
-   */
-  override def storageMemoryUsed: Long = synchronized {
-    _storageMemoryUsed
+    assert(_storageMemoryUsed <= maxStorageMemory)
+    val enoughMemory = _storageMemoryUsed + numBytesToAcquire <= maxStorageMemory
+    if (enoughMemory) {
+      _storageMemoryUsed += numBytesToAcquire
+    }
+    enoughMemory
   }
 
 }
@@ -184,19 +127,21 @@ private[spark] object StaticMemoryManager {
 
   /**
    * Return the total amount of memory available for the storage region, in bytes.
    */
   private def getMaxStorageMemory(conf: SparkConf): Long = {
+    val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
     val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
     val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
-    (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
+    (systemMaxMemory * memoryFraction * safetyFraction).toLong
   }
 
 
   /**
    * Return the total amount of memory available for the execution region, in bytes.
    */
   private def getMaxExecutionMemory(conf: SparkConf): Long = {
+    val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
     val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
     val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
-    (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
+    (systemMaxMemory * memoryFraction * safetyFraction).toLong
   }
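
As a worked example of the static sizing above, assume a hypothetical 4 GiB executor heap and the default fractions from the code; the numbers below follow directly from the arithmetic and are illustrative only.

// Static (legacy) sizing with an assumed 4 GiB heap and default fractions.
val systemMaxMemory: Long = 4L * 1024 * 1024 * 1024  // stand-in for Runtime.getRuntime.maxMemory

// Storage region: 4 GiB * 0.6 * 0.9, roughly 2.16 GiB
val maxStorageMemory = (systemMaxMemory * 0.6 * 0.9).toLong

// Execution region: 4 GiB * 0.2 * 0.8, roughly 0.64 GiB
val maxExecutionMemory = (systemMaxMemory * 0.2 * 0.8).toLong

// Unroll eviction cap (maxMemoryToEvictForUnroll above): 20% of storage, roughly 0.43 GiB
val maxMemoryToEvictForUnroll = (maxStorageMemory * 0.2).toLong

Under these defaults, roughly 30% of the heap is reserved for neither region; the unified model collapses these separate fractions into the single spark.memory.fraction shown in the SparkConf changes.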