Skip to content

Commit d696d38

Browse files
author
Bo Xiong
committed
[SPARK-45227][CORE] Fix an issue with CoarseGrainedExecutorBackend where an executor process randomly gets stuck
1 parent 06ccb6d commit d696d38

File tree

2 files changed

+10
-7
lines changed

2 files changed

+10
-7
lines changed

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ package org.apache.spark.executor
2020
import java.net.URL
2121
import java.nio.ByteBuffer
2222
import java.util.Locale
23+
import java.util.concurrent.ConcurrentHashMap
2324
import java.util.concurrent.atomic.AtomicBoolean
2425

25-
import scala.collection.mutable
2626
import scala.util.{Failure, Success}
2727
import scala.util.control.NonFatal
2828

@@ -71,9 +71,12 @@ private[spark] class CoarseGrainedExecutorBackend(
7171
/**
7272
* Map each taskId to the information about the resource allocated to it. Please refer to
7373
* [[ResourceInformation]] for specifics.
74+
* A ConcurrentHashMap (CHM) is used to ensure thread-safety (https://issues.apache.org/jira/browse/SPARK-45227)
7475
* Exposed for testing only.
7576
*/
76-
private[executor] val taskResources = new mutable.HashMap[Long, Map[String, ResourceInformation]]
77+
private[executor] val taskResources = new ConcurrentHashMap[
78+
Long, Map[String, ResourceInformation]
79+
]
7780

7881
private var decommissioned = false
7982

@@ -186,7 +189,7 @@ private[spark] class CoarseGrainedExecutorBackend(
186189
} else {
187190
val taskDesc = TaskDescription.decode(data.value)
188191
logInfo("Got assigned task " + taskDesc.taskId)
189-
taskResources(taskDesc.taskId) = taskDesc.resources
192+
taskResources.put(taskDesc.taskId, taskDesc.resources)
190193
executor.launchTask(this, taskDesc)
191194
}
192195

@@ -266,7 +269,7 @@ private[spark] class CoarseGrainedExecutorBackend(
266269
}
267270

268271
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer): Unit = {
269-
val resources = taskResources.getOrElse(taskId, Map.empty[String, ResourceInformation])
272+
val resources = taskResources.getOrDefault(taskId, Map.empty[String, ResourceInformation])
270273
val cpus = executor.runningTasks.get(taskId).taskDescription.cpus
271274
val msg = StatusUpdate(executorId, taskId, state, data, cpus, resources)
272275
if (TaskState.isFinished(state)) {

core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
302302
resourceProfile = ResourceProfile.getOrCreateDefaultProfile(conf))
303303
assert(backend.taskResources.isEmpty)
304304

305-
val taskId = 1000000
305+
val taskId = 1000000L
306306
// We don't really verify the data, just pass it around.
307307
val data = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))
308308
val taskDescription = new TaskDescription(taskId, 2, "1", "TASK 1000000", 19,
@@ -339,14 +339,14 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
339339
backend.self.send(LaunchTask(new SerializableBuffer(serializedTaskDescription)))
340340
eventually(timeout(10.seconds)) {
341341
assert(backend.taskResources.size == 1)
342-
val resources = backend.taskResources(taskId)
342+
val resources = backend.taskResources.get(taskId)
343343
assert(resources(GPU).addresses sameElements Array("0", "1"))
344344
}
345345

346346
// Updating the status of a running task shall not affect the `taskResources` map.
347347
backend.statusUpdate(taskId, TaskState.RUNNING, data)
348348
assert(backend.taskResources.size == 1)
349-
val resources = backend.taskResources(taskId)
349+
val resources = backend.taskResources.get(taskId)
350350
assert(resources(GPU).addresses sameElements Array("0", "1"))
351351

352352
// Updating the status of a finished task shall remove the entry from the `taskResources` map.

0 commit comments

Comments
 (0)