
Commit e706ba1

Bo Xiong authored and Mridul Muralidharan committed
[SPARK-45227][CORE] Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend
### What changes were proposed in this pull request?

Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend where an executor process randomly gets stuck.

### Why are the changes needed?

For each executor, the single-threaded dispatcher can run into an "infinite loop" (as explained in SPARK-45227). Once an executor process gets into this state, it stops launching tasks from the driver and stops reporting task status back.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

```
$ build/mvn package -DskipTests -pl core
$ build/mvn -Dtest=none -DwildcardSuites=org.apache.spark.executor.CoarseGrainedExecutorBackendSuite test
```

### Was this patch authored or co-authored using generative AI tooling?

No

******************************************************************************
**_Please feel free to skip reading unless you're interested in details_**
******************************************************************************

### Symptom

Our Spark 3 app running on EMR 6.10.0 with Spark 3.3.1 got stuck in the very last step of writing a data frame to S3 by calling `df.write`. Looking at the Spark UI, we saw that an executor process had hung for over an hour. After we manually killed the executor process, the app succeeded. Note that the same EMR cluster with two worker nodes was able to run the same app without any issue before and after the incident.

Below is what was observed from the relevant container logs and a thread dump.

- A regular task that was sent to the executor, which also reported back to the driver upon task completion:

```
$ zgrep 'task 150' container_1694029806204_12865_01_000001/stderr.gz
23/09/12 18:13:55 INFO TaskSetManager: Starting task 150.0 in stage 23.0 (TID 923) (ip-10-0-185-107.ec2.internal, executor 3, partition 150, NODE_LOCAL, 4432 bytes) taskResourceAssignments Map()
23/09/12 18:13:55 INFO TaskSetManager: Finished task 150.0 in stage 23.0 (TID 923) in 126 ms on ip-10-0-185-107.ec2.internal (executor 3) (16/200)

$ zgrep ' 923' container_1694029806204_12865_01_000004/stderr.gz
23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 923

$ zgrep 'task 150' container_1694029806204_12865_01_000004/stderr.gz
23/09/12 18:13:55 INFO Executor: Running task 150.0 in stage 23.0 (TID 923)
23/09/12 18:13:55 INFO Executor: Finished task 150.0 in stage 23.0 (TID 923). 4495 bytes result sent to driver
```

- Another task that was sent to the executor but never got launched, because the single-threaded dispatcher was stuck (presumably in an "infinite loop" as explained later):

```
$ zgrep 'task 153' container_1694029806204_12865_01_000001/stderr.gz
23/09/12 18:13:55 INFO TaskSetManager: Starting task 153.0 in stage 23.0 (TID 924) (ip-10-0-185-107.ec2.internal, executor 3, partition 153, NODE_LOCAL, 4432 bytes) taskResourceAssignments Map()

$ zgrep ' 924' container_1694029806204_12865_01_000004/stderr.gz
23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 924

$ zgrep 'task 153' container_1694029806204_12865_01_000004/stderr.gz
>> note that the above command has no matching result, indicating that task 153.0 in stage 23.0 (TID 924) was never launched
```

- The thread dump shows that the dispatcher-Executor thread has the following stack trace.
``` "dispatcher-Executor" #40 daemon prio=5 os_prio=0 tid=0x0000ffff98e37800 nid=0x1aff runnable [0x0000ffff73bba000] java.lang.Thread.State: RUNNABLE at scala.runtime.BoxesRunTime.equalsNumObject(BoxesRunTime.java:142) at scala.runtime.BoxesRunTime.equals2(BoxesRunTime.java:131) at scala.runtime.BoxesRunTime.equals(BoxesRunTime.java:123) at scala.collection.mutable.HashTable.elemEquals(HashTable.scala:365) at scala.collection.mutable.HashTable.elemEquals$(HashTable.scala:365) at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:44) at scala.collection.mutable.HashTable.findEntry0(HashTable.scala:140) at scala.collection.mutable.HashTable.findOrAddEntry(HashTable.scala:169) at scala.collection.mutable.HashTable.findOrAddEntry$(HashTable.scala:167) at scala.collection.mutable.HashMap.findOrAddEntry(HashMap.scala:44) at scala.collection.mutable.HashMap.put(HashMap.scala:126) at scala.collection.mutable.HashMap.update(HashMap.scala:131) at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:200) at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115) at org.apache.spark.rpc.netty.Inbox$$Lambda$323/1930826709.apply$mcV$sp(Unknown Source) at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213) at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100) at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75) at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) ``` ### Relevant code paths Within an executor process, there's a [dispatcher thread](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala#L170) dedicated to CoarseGrainedExecutorBackend(a single RPC endpoint) that launches tasks scheduled by the driver. Each task is run on a TaskRunner thread backed by a thread pool created for the executor. The TaskRunner thread and the dispatcher thread are different. However, they read and write a common object (i.e., taskResources) that's a mutable hashmap without thread-safety, in [Executor](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/Executor.scala#L561) and [CoarseGrainedExecutorBackend](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L189), respectively. ### What's going on? Based on the above observations, our hypothesis is that the dispatcher thread runs into an "infinite loop" due to a race condition when two threads access the same hashmap object. 
For illustration purposes, let's consider the following scenario where two threads (Thread 1 and Thread 2) access a hash table without thread-safety:

- Thread 1 sees A.next = B, but then yields execution to Thread 2.

  <img src="https://issues.apache.org/jira/secure/attachment/13063040/13063040_hashtable1.png" width="400">

- Thread 2 triggers a resize operation resulting in B.next = A (note that the hashmap doesn't care about ordering), and then yields execution to Thread 1.

  <img src="https://issues.apache.org/jira/secure/attachment/13063041/13063041_hashtable2.png" width="400">

- After taking over the CPU, Thread 1 runs into an "infinite loop" when traversing the list in the last bucket, since A.next = B and B.next = A in its view. A minimal standalone Scala sketch of this race is shown after the commit trailer below.

Closes apache#43021 from xiongbo-sjtu/master.

Authored-by: Bo Xiong <xiongbo@amazon.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 8e6b160)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
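To make the race concrete, here is that sketch: a minimal, self-contained Scala program (illustration only; the object name and key ranges are invented, and this is not Spark code) that drives an unsynchronized `scala.collection.mutable.HashMap` from two threads, mirroring two threads updating a shared map without synchronization the way the dispatcher and TaskRunner threads do with `taskResources`:

```scala
import scala.collection.mutable

object UnsafeHashMapSketch {
  def main(args: Array[String]): Unit = {
    val m = mutable.HashMap.empty[Long, String]

    // Two threads insert disjoint key ranges with no synchronization.
    val t1 = new Thread(() => (0L until 100000L).foreach(i => m(i) = "a"))
    val t2 = new Thread(() => (100000L until 200000L).foreach(i => m(i) = "b"))
    t1.start(); t2.start()
    t1.join(); t2.join()

    // A thread-safe map would always end up with 200000 entries; the
    // unsynchronized HashMap frequently loses some, and under an unlucky
    // interleaving during a resize a put() can traverse a corrupted bucket
    // chain forever, matching the stack trace above.
    println(s"expected 200000, got ${m.size}")
  }
}
```

Lost entries are the common outcome on repeated runs; the never-terminating traversal observed in the executor is the rarer, worse outcome of the same data race.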
1 parent: 68db395 · commit: e706ba1

2 files changed: 10 additions, 7 deletions

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 7 additions & 4 deletions
```diff
@@ -20,9 +20,9 @@ package org.apache.spark.executor
 import java.net.URL
 import java.nio.ByteBuffer
 import java.util.Locale
+import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicBoolean
 
-import scala.collection.mutable
 import scala.util.{Failure, Success}
 import scala.util.control.NonFatal
 
@@ -71,9 +71,12 @@ private[spark] class CoarseGrainedExecutorBackend(
   /**
    * Map each taskId to the information about the resource allocated to it, Please refer to
    * [[ResourceInformation]] for specifics.
+   * CHM is used to ensure thread-safety (https://issues.apache.org/jira/browse/SPARK-45227)
    * Exposed for testing only.
    */
-  private[executor] val taskResources = new mutable.HashMap[Long, Map[String, ResourceInformation]]
+  private[executor] val taskResources = new ConcurrentHashMap[
+    Long, Map[String, ResourceInformation]
+  ]
 
   private var decommissioned = false
 
@@ -184,7 +187,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     } else {
       val taskDesc = TaskDescription.decode(data.value)
       logInfo("Got assigned task " + taskDesc.taskId)
-      taskResources(taskDesc.taskId) = taskDesc.resources
+      taskResources.put(taskDesc.taskId, taskDesc.resources)
       executor.launchTask(this, taskDesc)
     }
 
@@ -261,7 +264,7 @@ private[spark] class CoarseGrainedExecutorBackend(
   }
 
   override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer): Unit = {
-    val resources = taskResources.getOrElse(taskId, Map.empty[String, ResourceInformation])
+    val resources = taskResources.getOrDefault(taskId, Map.empty[String, ResourceInformation])
     val cpus = executor.runningTasks.get(taskId).taskDescription.cpus
     val msg = StatusUpdate(executorId, taskId, state, data, cpus, resources)
     if (TaskState.isFinished(state)) {
```
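For readers unfamiliar with using `java.util.concurrent.ConcurrentHashMap` from Scala, here is a brief, hypothetical sketch of the two-thread access pattern the fix has to tolerate (names and key ranges are invented; this is not Spark code). It relies on `put` and `getOrDefault`, the drop-in replacements shown in the diff above, plus `remove` for cleanup:

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical, standalone sketch: one thread registers entries while another
// reads and clears them, roughly the way the dispatcher and TaskRunner threads
// share taskResources. ConcurrentHashMap keeps its internal table consistent
// under this interleaving, so neither thread can observe a corrupted bucket chain.
object SafeTaskResourcesSketch {
  def main(args: Array[String]): Unit = {
    val taskResources = new ConcurrentHashMap[Long, Map[String, String]]()

    val dispatcher = new Thread(() =>
      (0L until 100000L).foreach(id => taskResources.put(id, Map("gpu" -> id.toString))))

    val taskRunner = new Thread(() =>
      (0L until 100000L).foreach { id =>
        // getOrDefault keeps the fallback semantics of the getOrElse it replaces.
        val res = taskResources.getOrDefault(id, Map.empty[String, String])
        if (res.nonEmpty) taskResources.remove(id)
      })

    dispatcher.start(); taskRunner.start()
    dispatcher.join(); taskRunner.join()
    println(s"entries still registered: ${taskResources.size()}")
  }
}
```

Unlike the unsynchronized `mutable.HashMap`, each of these operations is atomic with respect to the map's internal structure, which is exactly the property the dispatcher thread was missing.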

core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala

Lines changed: 3 additions & 3 deletions
```diff
@@ -303,7 +303,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
       resourceProfile = ResourceProfile.getOrCreateDefaultProfile(conf))
     assert(backend.taskResources.isEmpty)
 
-    val taskId = 1000000
+    val taskId = 1000000L
     // We don't really verify the data, just pass it around.
     val data = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))
     val taskDescription = new TaskDescription(taskId, 2, "1", "TASK 1000000", 19,
@@ -340,14 +340,14 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     backend.self.send(LaunchTask(new SerializableBuffer(serializedTaskDescription)))
     eventually(timeout(10.seconds)) {
       assert(backend.taskResources.size == 1)
-      val resources = backend.taskResources(taskId)
+      val resources = backend.taskResources.get(taskId)
       assert(resources(GPU).addresses sameElements Array("0", "1"))
     }
 
     // Update the status of a running task shall not affect `taskResources` map.
     backend.statusUpdate(taskId, TaskState.RUNNING, data)
     assert(backend.taskResources.size == 1)
-    val resources = backend.taskResources(taskId)
+    val resources = backend.taskResources.get(taskId)
     assert(resources(GPU).addresses sameElements Array("0", "1"))
 
     // Update the status of a finished task shall remove the entry from `taskResources` map.
```
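A side note on why `1000000` had to become `1000000L`: `ConcurrentHashMap.get` takes an `Object` key, so a Scala `Int` argument boxes to `java.lang.Integer` and never matches a stored `java.lang.Long` key, whereas the old Scala `HashMap[Long, _]` lookup widened the `Int` key to `Long` at compile time. A tiny standalone sketch (illustration only, not part of the test suite):

```scala
import java.util.concurrent.ConcurrentHashMap

object LongKeyBoxingSketch {
  def main(args: Array[String]): Unit = {
    val m = new ConcurrentHashMap[Long, String]()
    m.put(1000000L, "x")

    val intId = 1000000     // boxes to java.lang.Integer when passed to get(Object)
    val longId = 1000000L   // boxes to java.lang.Long, matching the stored key

    println(m.get(intId))   // null: an Integer key never equals a Long key
    println(m.get(longId))  // x
  }
}
```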
