From af68ece323401d06f7e036c3b0782bc80a0f4c93 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Fri, 23 Feb 2024 16:32:11 -0800 Subject: [PATCH] Try passing BlockManager to Task instead of SparkEnv --- .../org/apache/spark/executor/Executor.scala | 2 +- .../scala/org/apache/spark/scheduler/Task.scala | 16 +++++++++------- .../org/apache/spark/storage/BlockManager.scala | 3 ++- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 0a3067c5a71e2..7c30a8782df9b 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -589,7 +589,7 @@ private[spark] class Executor( taskDescription.serializedTask, Thread.currentThread.getContextClassLoader) task.localProperties = taskDescription.properties task.setTaskMemoryManager(taskMemoryManager) - task.setEnv(env) + task.setBlockManager(env.blockManager) // If this task has been killed before we deserialized it, let's quit now. Otherwise, // continue executing the task. 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index ba880d0e69aa0..089daf31ef973 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -28,6 +28,7 @@ import org.apache.spark.memory.{MemoryMode, TaskMemoryManager} import org.apache.spark.metrics.MetricsSystem import org.apache.spark.rdd.InputFileBlockHolder import org.apache.spark.resource.ResourceInformation +import org.apache.spark.storage.BlockManager import org.apache.spark.util._ /** @@ -126,7 +127,7 @@ private[spark] abstract class Task[T]( new CallerContext( "TASK", - env.conf.get(APP_CALLER_CONTEXT), + SparkEnv.get.conf.get(APP_CALLER_CONTEXT), appId, appAttemptId, jobId, @@ -143,14 +144,15 @@ private[spark] abstract class Task[T]( try { Utils.tryLogNonFatalError { // Release memory used by this thread for unrolling blocks - env.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP) - env.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.OFF_HEAP) + blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP) + blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.OFF_HEAP) // Notify any tasks waiting for execution memory to be freed to wake up and try to // acquire memory again. This makes impossible the scenario where a task sleeps forever // because there are no other tasks left to notify it. Since this is safe to do but may // not be strictly necessary, we should revisit whether we can remove this in the // future. 
- val memoryManager = env.memoryManager + + val memoryManager = blockManager.memoryManager memoryManager.synchronized { memoryManager.notifyAll() } } } finally { @@ -163,14 +165,14 @@ private[spark] abstract class Task[T]( } private var taskMemoryManager: TaskMemoryManager = _ - private var env: SparkEnv = _ + private var blockManager: BlockManager = _ def setTaskMemoryManager(taskMemoryManager: TaskMemoryManager): Unit = { this.taskMemoryManager = taskMemoryManager } - def setEnv(env: SparkEnv): Unit = { - this.env = env + def setBlockManager(blockManager: BlockManager): Unit = { + this.blockManager = blockManager } def runTask(context: TaskContext): T diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index b5d1c7ed69c8f..228ec5752e1b6 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -201,7 +201,8 @@ private[spark] class BlockManager( // Similarly, we also initialize MemoryManager later after DriverPlugin is loaded, to // allow the plugin to overwrite certain memory configurations. The `_memoryManager` will be // null here and we ask for the instance from SparkEnv - private lazy val memoryManager = Option(_memoryManager).getOrElse(SparkEnv.get.memoryManager) + private[spark] lazy val memoryManager = + Option(_memoryManager).getOrElse(SparkEnv.get.memoryManager) // same as `conf.get(config.SHUFFLE_SERVICE_ENABLED)` private[spark] val externalShuffleServiceEnabled: Boolean = externalBlockStoreClient.isDefined