-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-10983] Unified memory manager #9084
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
6481bc1
e08e1fe
812c974
cc5f64c
ad8a6c4
0dc9a95
5a4ffb9
b519540
a65799e
6e913a5
3eef5a4
c56600b
01ff533
93c3cef
059ae0d
24a391c
13898b5
4b64846
4f70806
face129
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ package org.apache.spark.memory | |
|
||
import scala.collection.mutable | ||
|
||
import org.apache.spark.Logging | ||
import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore} | ||
|
||
|
||
|
@@ -29,7 +30,7 @@ import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore} | |
* sorts and aggregations, while storage memory refers to that used for caching and propagating | ||
* internal data across the cluster. There exists one of these per JVM. | ||
*/ | ||
private[spark] abstract class MemoryManager { | ||
private[spark] abstract class MemoryManager extends Logging { | ||
|
||
// The memory store used to evict cached blocks | ||
private var _memoryStore: MemoryStore = _ | ||
|
@@ -40,19 +41,38 @@ private[spark] abstract class MemoryManager { | |
_memoryStore | ||
} | ||
|
||
// Amount of execution/storage memory in use, accesses must be synchronized on `this` | ||
protected var _executionMemoryUsed: Long = 0 | ||
protected var _storageMemoryUsed: Long = 0 | ||
|
||
/** | ||
* Set the [[MemoryStore]] used by this manager to evict cached blocks. | ||
* This must be set after construction due to initialization ordering constraints. | ||
*/ | ||
def setMemoryStore(store: MemoryStore): Unit = { | ||
final def setMemoryStore(store: MemoryStore): Unit = { | ||
_memoryStore = store | ||
} | ||
|
||
/** | ||
* Acquire N bytes of memory for execution. | ||
* Total available memory for execution, in bytes. | ||
*/ | ||
def maxExecutionMemory: Long | ||
|
||
/** | ||
* Total available memory for storage, in bytes. | ||
*/ | ||
def maxStorageMemory: Long | ||
|
||
// TODO: avoid passing evicted blocks around to simplify method signatures (SPARK-10985) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good call on deferring this to a followup task. |
||
|
||
/** | ||
* Acquire N bytes of memory for execution, evicting cached blocks if necessary. | ||
* Blocks evicted in the process, if any, are added to `evictedBlocks`. | ||
* @return number of bytes successfully granted (<= N). | ||
*/ | ||
def acquireExecutionMemory(numBytes: Long): Long | ||
def acquireExecutionMemory( | ||
numBytes: Long, | ||
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long | ||
|
||
/** | ||
* Acquire N bytes of memory to cache the given block, evicting existing ones if necessary. | ||
|
@@ -66,52 +86,73 @@ private[spark] abstract class MemoryManager { | |
|
||
/** | ||
* Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary. | ||
* | ||
* This extra method allows subclasses to differentiate behavior between acquiring storage | ||
* memory and acquiring unroll memory. For instance, the memory management model in Spark | ||
* 1.5 and before places a limit on the amount of space that can be freed from unrolling. | ||
* Blocks evicted in the process, if any, are added to `evictedBlocks`. | ||
* | ||
* @return whether all N bytes were successfully granted. | ||
*/ | ||
def acquireUnrollMemory( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Given that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. actually, it's more than a synonym. In |
||
blockId: BlockId, | ||
numBytes: Long, | ||
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean | ||
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized { | ||
acquireStorageMemory(blockId, numBytes, evictedBlocks) | ||
} | ||
|
||
/** | ||
* Release N bytes of execution memory. | ||
*/ | ||
def releaseExecutionMemory(numBytes: Long): Unit | ||
def releaseExecutionMemory(numBytes: Long): Unit = synchronized { | ||
if (numBytes > _executionMemoryUsed) { | ||
logWarning(s"Attempted to release $numBytes bytes of execution " + | ||
s"memory when we only have ${_executionMemoryUsed} bytes") | ||
_executionMemoryUsed = 0 | ||
} else { | ||
_executionMemoryUsed -= numBytes | ||
} | ||
} | ||
|
||
/** | ||
* Release N bytes of storage memory. | ||
*/ | ||
def releaseStorageMemory(numBytes: Long): Unit | ||
def releaseStorageMemory(numBytes: Long): Unit = synchronized { | ||
if (numBytes > _storageMemoryUsed) { | ||
logWarning(s"Attempted to release $numBytes bytes of storage " + | ||
s"memory when we only have ${_storageMemoryUsed} bytes") | ||
_storageMemoryUsed = 0 | ||
} else { | ||
_storageMemoryUsed -= numBytes | ||
} | ||
} | ||
|
||
/** | ||
* Release all storage memory acquired. | ||
*/ | ||
def releaseStorageMemory(): Unit | ||
def releaseAllStorageMemory(): Unit = synchronized { | ||
_storageMemoryUsed = 0 | ||
} | ||
|
||
/** | ||
* Release N bytes of unroll memory. | ||
*/ | ||
def releaseUnrollMemory(numBytes: Long): Unit | ||
|
||
/** | ||
* Total available memory for execution, in bytes. | ||
*/ | ||
def maxExecutionMemory: Long | ||
|
||
/** | ||
* Total available memory for storage, in bytes. | ||
*/ | ||
def maxStorageMemory: Long | ||
def releaseUnrollMemory(numBytes: Long): Unit = synchronized { | ||
releaseStorageMemory(numBytes) | ||
} | ||
|
||
/** | ||
* Execution memory currently in use, in bytes. | ||
*/ | ||
def executionMemoryUsed: Long | ||
final def executionMemoryUsed: Long = synchronized { | ||
_executionMemoryUsed | ||
} | ||
|
||
/** | ||
* Storage memory currently in use, in bytes. | ||
*/ | ||
def storageMemoryUsed: Long | ||
final def storageMemoryUsed: Long = synchronized { | ||
_storageMemoryUsed | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why here is protected?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pzz2011 you're making several comments on old PRs. Generally people won't see that and it's not the place for discussion anyway. If you can formulate a specific question beyond "why is the code this way?" ask on user@.