Skip to content

Commit

Permalink
set env in Task
Browse files Browse the repository at this point in the history
  • Loading branch information
sunchao committed Feb 23, 2024
1 parent a6502e0 commit ac21273
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,7 @@ private[spark] class Executor(
taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)
task.localProperties = taskDescription.properties
task.setTaskMemoryManager(taskMemoryManager)
task.setEnv(env)

// If this task has been killed before we deserialized it, let's quit now. Otherwise,
// continue executing the task.
Expand Down
14 changes: 9 additions & 5 deletions core/src/main/scala/org/apache/spark/scheduler/Task.scala
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ private[spark] abstract class Task[T](

new CallerContext(
"TASK",
SparkEnv.get.conf.get(APP_CALLER_CONTEXT),
env.conf.get(APP_CALLER_CONTEXT),
appId,
appAttemptId,
jobId,
Expand All @@ -143,15 +143,14 @@ private[spark] abstract class Task[T](
try {
Utils.tryLogNonFatalError {
// Release memory used by this thread for unrolling blocks
SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP)
SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(
MemoryMode.OFF_HEAP)
env.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP)
env.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.OFF_HEAP)
// Notify any tasks waiting for execution memory to be freed to wake up and try to
// acquire memory again. This makes impossible the scenario where a task sleeps forever
// because there are no other tasks left to notify it. Since this is safe to do but may
// not be strictly necessary, we should revisit whether we can remove this in the
// future.
val memoryManager = SparkEnv.get.memoryManager
val memoryManager = env.memoryManager
memoryManager.synchronized { memoryManager.notifyAll() }
}
} finally {
Expand All @@ -164,11 +163,16 @@ private[spark] abstract class Task[T](
}

private var taskMemoryManager: TaskMemoryManager = _
private var env: SparkEnv = _

def setTaskMemoryManager(taskMemoryManager: TaskMemoryManager): Unit = {
this.taskMemoryManager = taskMemoryManager
}

def setEnv(env: SparkEnv): Unit = {
this.env = env
}

def runTask(context: TaskContext): T

def preferredLocations: Seq[TaskLocation] = Nil
Expand Down

0 comments on commit ac21273

Please sign in to comment.