Skip to content

Commit a837105

Browse files
Bo XiongMridul Muralidharan
Bo Xiong
authored and
Mridul Muralidharan
committed
[SPARK-45227][CORE] Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend
Backport of #43021 to branch 3.3 ### What changes were proposed in this pull request? Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend where an executor process randomly gets stuck. ### Why are the changes needed? For each executor, the single-threaded dispatcher can run into an "infinite loop" (as explained in the SPARK-45227). Once an executor process runs into a state, it'd stop launching tasks from the driver or reporting task status back. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? ``` $ build/mvn package -DskipTests -pl core $ build/mvn -Dtest=none -DwildcardSuites=org.apache.spark.executor.CoarseGrainedExecutorBackendSuite test ``` ### Was this patch authored or co-authored using generative AI tooling? No Closes #43176 from xiongbo-sjtu/branch-3.3. Authored-by: Bo Xiong <xiongbo@amazon.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
1 parent 62c2f83 commit a837105

File tree

2 files changed

+10
-7
lines changed

2 files changed

+10
-7
lines changed

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ package org.apache.spark.executor
2020
import java.net.URL
2121
import java.nio.ByteBuffer
2222
import java.util.Locale
23+
import java.util.concurrent.ConcurrentHashMap
2324
import java.util.concurrent.atomic.AtomicBoolean
2425

25-
import scala.collection.mutable
2626
import scala.util.{Failure, Success}
2727
import scala.util.control.NonFatal
2828

@@ -71,9 +71,12 @@ private[spark] class CoarseGrainedExecutorBackend(
7171
/**
7272
* Map each taskId to the information about the resource allocated to it, Please refer to
7373
* [[ResourceInformation]] for specifics.
74+
* CHM is used to ensure thread-safety (https://issues.apache.org/jira/browse/SPARK-45227)
7475
* Exposed for testing only.
7576
*/
76-
private[executor] val taskResources = new mutable.HashMap[Long, Map[String, ResourceInformation]]
77+
private[executor] val taskResources = new ConcurrentHashMap[
78+
Long, Map[String, ResourceInformation]
79+
]
7780

7881
private var decommissioned = false
7982

@@ -184,7 +187,7 @@ private[spark] class CoarseGrainedExecutorBackend(
184187
} else {
185188
val taskDesc = TaskDescription.decode(data.value)
186189
logInfo("Got assigned task " + taskDesc.taskId)
187-
taskResources(taskDesc.taskId) = taskDesc.resources
190+
taskResources.put(taskDesc.taskId, taskDesc.resources)
188191
executor.launchTask(this, taskDesc)
189192
}
190193

@@ -261,7 +264,7 @@ private[spark] class CoarseGrainedExecutorBackend(
261264
}
262265

263266
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer): Unit = {
264-
val resources = taskResources.getOrElse(taskId, Map.empty[String, ResourceInformation])
267+
val resources = taskResources.getOrDefault(taskId, Map.empty[String, ResourceInformation])
265268
val msg = StatusUpdate(executorId, taskId, state, data, resources)
266269
if (TaskState.isFinished(state)) {
267270
taskResources.remove(taskId)

core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
300300
resourceProfile = ResourceProfile.getOrCreateDefaultProfile(conf))
301301
assert(backend.taskResources.isEmpty)
302302

303-
val taskId = 1000000
303+
val taskId = 1000000L
304304
// We don't really verify the data, just pass it around.
305305
val data = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))
306306
val taskDescription = new TaskDescription(taskId, 2, "1", "TASK 1000000", 19,
@@ -314,14 +314,14 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
314314
backend.self.send(LaunchTask(new SerializableBuffer(serializedTaskDescription)))
315315
eventually(timeout(10.seconds)) {
316316
assert(backend.taskResources.size == 1)
317-
val resources = backend.taskResources(taskId)
317+
val resources = backend.taskResources.get(taskId)
318318
assert(resources(GPU).addresses sameElements Array("0", "1"))
319319
}
320320

321321
// Update the status of a running task shall not affect `taskResources` map.
322322
backend.statusUpdate(taskId, TaskState.RUNNING, data)
323323
assert(backend.taskResources.size == 1)
324-
val resources = backend.taskResources(taskId)
324+
val resources = backend.taskResources.get(taskId)
325325
assert(resources(GPU).addresses sameElements Array("0", "1"))
326326

327327
// Update the status of a finished task shall remove the entry from `taskResources` map.

0 commit comments

Comments
 (0)