From b38a321b02f408e28b3ae1cb9f1eece4b6700b80 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Tue, 6 Feb 2024 12:51:54 -0800 Subject: [PATCH] initial commit --- .../scala/org/apache/spark/SparkContext.scala | 2 ++ .../main/scala/org/apache/spark/SparkEnv.scala | 18 +++++++++++++----- .../CoarseGrainedExecutorBackend.scala | 2 ++ .../apache/spark/storage/BlockManager.scala | 7 ++++++- 4 files changed, 23 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index da37fa83254bc..01def222a948d 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -578,6 +578,8 @@ class SparkContext(config: SparkConf) extends Logging { // Initialize any plugins before the task scheduler is initialized. _plugins = PluginContainer(this, _resources.asJava) _env.initializeShuffleManager() + _env.initializeMemoryManager(SparkContext.numDriverCores(master, conf)) + // Create and start the scheduler val (sched, ts) = SparkContext.createTaskScheduler(this, master) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index ca07c276fbff3..6bdccae719053 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -67,7 +67,6 @@ class SparkEnv ( val blockManager: BlockManager, val securityManager: SecurityManager, val metricsSystem: MetricsSystem, - val memoryManager: MemoryManager, val outputCommitCoordinator: OutputCommitCoordinator, val conf: SparkConf) extends Logging { @@ -77,6 +76,12 @@ class SparkEnv ( def shuffleManager: ShuffleManager = _shuffleManager + // We initialize the MemoryManager later in SparkContext after DriverPlugin is loaded + // to allow the plugin to overwrite memory configurations + private var _memoryManager: MemoryManager = _ + + def memoryManager: MemoryManager = _memoryManager + @volatile private[spark] var isStopped = false /** @@ -199,6 +204,12 @@ class SparkEnv ( "Shuffle manager already initialized to %s", _shuffleManager) _shuffleManager = ShuffleManager.create(conf, executorId == SparkContext.DRIVER_IDENTIFIER) } + + private[spark] def initializeMemoryManager(numUsableCores: Int): Unit = { + Preconditions.checkState(null == memoryManager, + "Memory manager already initialized to %s", _memoryManager) + _memoryManager = UnifiedMemoryManager(conf, numUsableCores) + } } object SparkEnv extends Logging { @@ -358,8 +369,6 @@ object SparkEnv extends Logging { new MapOutputTrackerMasterEndpoint( rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf)) - val memoryManager: MemoryManager = UnifiedMemoryManager(conf, numUsableCores) - val blockManagerPort = if (isDriver) { conf.get(DRIVER_BLOCK_MANAGER_PORT) } else { @@ -418,7 +427,7 @@ object SparkEnv extends Logging { blockManagerMaster, serializerManager, conf, - memoryManager, + _memoryManager = null, mapOutputTracker, _shuffleManager = null, blockTransferService, @@ -463,7 +472,6 @@ object SparkEnv extends Logging { blockManager, securityManager, metricsSystem, - memoryManager, outputCommitCoordinator, conf) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index b507d27f14c4b..c26ab42136001 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -469,6 +469,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress, arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false) + // Set the memory manager since it needs to be initialized explicitly + env.initializeMemoryManager(arguments.cores) // Set the application attemptId in the BlockStoreClient if available. val appAttemptId = env.conf.get(APP_ATTEMPT_ID) appAttemptId.foreach(attemptId => diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 42bbd025177b2..acd628abd7008 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -185,7 +185,7 @@ private[spark] class BlockManager( val master: BlockManagerMaster, val serializerManager: SerializerManager, val conf: SparkConf, - memoryManager: MemoryManager, + private val _memoryManager: MemoryManager, mapOutputTracker: MapOutputTracker, private val _shuffleManager: ShuffleManager, val blockTransferService: BlockTransferService, @@ -198,6 +198,11 @@ private[spark] class BlockManager( // (except for tests) and we ask for the instance from the SparkEnv. private lazy val shuffleManager = Option(_shuffleManager).getOrElse(SparkEnv.get.shuffleManager) + // Similarly, we also initialize MemoryManager later after DriverPlugin is loaded, to + // allow the plugin to overwrite certain memory configurations. The `_memoryManager` will be + // null here and we ask for the instance from SparkEnv + private lazy val memoryManager = Option(_memoryManager).getOrElse(SparkEnv.get.memoryManager) + // same as `conf.get(config.SHUFFLE_SERVICE_ENABLED)` private[spark] val externalShuffleServiceEnabled: Boolean = externalBlockStoreClient.isDefined private val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER