Skip to content

Commit

Permalink
try use BlockManager
Browse files · Browse the repository at this point in the history
  • Loading branch information
sunchao committed Feb 24, 2024
1 parent a980e07 commit af68ece
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 9 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -589,7 +589,7 @@ private[spark] class Executor(
taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)
task.localProperties = taskDescription.properties
task.setTaskMemoryManager(taskMemoryManager)
task.setEnv(env)
task.setBlockManager(env.blockManager)

// If this task has been killed before we deserialized it, let's quit now. Otherwise,
// continue executing the task.
Expand Down
16 changes: 9 additions & 7 deletions core/src/main/scala/org/apache/spark/scheduler/Task.scala
Original file line number | Diff line number | Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.spark.memory.{MemoryMode, TaskMemoryManager}
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.rdd.InputFileBlockHolder
import org.apache.spark.resource.ResourceInformation
import org.apache.spark.storage.BlockManager
import org.apache.spark.util._

/**
Expand Down Expand Up @@ -126,7 +127,7 @@ private[spark] abstract class Task[T](

new CallerContext(
"TASK",
env.conf.get(APP_CALLER_CONTEXT),
SparkEnv.get.conf.get(APP_CALLER_CONTEXT),
appId,
appAttemptId,
jobId,
Expand All @@ -143,14 +144,15 @@ private[spark] abstract class Task[T](
try {
Utils.tryLogNonFatalError {
// Release memory used by this thread for unrolling blocks
env.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP)
env.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.OFF_HEAP)
blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP)
blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.OFF_HEAP)
// Notify any tasks waiting for execution memory to be freed to wake up and try to
// acquire memory again. This makes impossible the scenario where a task sleeps forever
// because there are no other tasks left to notify it. Since this is safe to do but may
// not be strictly necessary, we should revisit whether we can remove this in the
// future.
val memoryManager = env.memoryManager

val memoryManager = blockManager.memoryManager
memoryManager.synchronized { memoryManager.notifyAll() }
}
} finally {
Expand All @@ -163,14 +165,14 @@ private[spark] abstract class Task[T](
}

private var taskMemoryManager: TaskMemoryManager = _
private var env: SparkEnv = _
private var blockManager: BlockManager = _

def setTaskMemoryManager(taskMemoryManager: TaskMemoryManager): Unit = {
this.taskMemoryManager = taskMemoryManager
}

def setEnv(env: SparkEnv): Unit = {
this.env = env
def setBlockManager(blockManager: BlockManager): Unit = {
this.blockManager = blockManager
}

def runTask(context: TaskContext): T
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -201,7 +201,8 @@ private[spark] class BlockManager(
// Similarly, we also initialize MemoryManager later after DriverPlugin is loaded, to
// allow the plugin to overwrite certain memory configurations. The `_memoryManager` will be
// null here and we ask for the instance from SparkEnv
private lazy val memoryManager = Option(_memoryManager).getOrElse(SparkEnv.get.memoryManager)
private[spark] lazy val memoryManager =
Option(_memoryManager).getOrElse(SparkEnv.get.memoryManager)

// same as `conf.get(config.SHUFFLE_SERVICE_ENABLED)`
private[spark] val externalShuffleServiceEnabled: Boolean = externalBlockStoreClient.isDefined
Expand Down

0 comments on commit af68ece

Please sign in to comment.