Commit 76cdbac
[SPARK-52185][CORE] Automate the thread dump collection for Spark applications
### What changes were proposed in this pull request?

When a Java program runs for a long time without producing any feedback or output, how do you determine what it is doing and whether it is stuck? Thread dumps help in such cases: they show the status of each thread (running, waiting, or blocked) and which part of the code each thread is executing, which is important for detecting deadlocks and for locating where the program is spending its time.

**The purpose of this pull request is to collect thread dumps at regular intervals. Why? A single thread dump only shows a snapshot of the threads; collecting several lets us see whether threads are making progress by comparing their states over time.**

Collecting thread dump samples from slow Spark executors or drivers can be challenging, especially in YARN or Kubernetes environments. The options currently available for debugging are:

1) Find the node where the Java Virtual Machine (JVM) is running and run the jstack command manually.
2) Download the thread dumps from the Spark UI, for example: http://localhost:4040/executors/threadDump/?executorId=driver
3) Download the thread dumps via the Spark REST API, for example: curl "http://localhost:4040/api/v1/applications/local-1747400853731/executors/driver/threads"

### Why are the changes needed?

The purpose of this feature is to automate thread dump collection at regular intervals. New Spark parameters have been introduced:

- spark.driver.threadDumpCollector.enabled
- spark.executor.threadDumpCollector.enabled
- spark.threadDumpCollector.interval
- spark.threadDumpCollector.dir
- spark.threadDumpCollector.output.type
- spark.threadDumpCollector.include.regex

Example commands:

1) spark-shell --master local-cluster[2,1,1050] --conf spark.driver.threadDumpCollector.enabled=true --conf spark.executor.threadDumpCollector.enabled=true --conf spark.threadDumpCollector.interval=15s --conf spark.threadDumpCollector.output.type=FILE --conf spark.threadDumpCollector.dir=hdfs:///user/example/jstack_test

   The thread dumps are saved into hdfs:///user/example/jstack_test; example file names: app-20250516161130-0000-driver-2025-05-16_16_12_50.txt, app-20250516161130-0000-0-2025-05-16_16_12_51.txt

2) spark-shell --master local-cluster[2,1,1050] --conf spark.driver.threadDumpCollector.enabled=true --conf spark.executor.threadDumpCollector.enabled=true --conf spark.threadDumpCollector.interval=15s --conf spark.threadDumpCollector.output.type=LOG

   The thread dumps are added to the log messages.

3) spark-shell --master local-cluster[2,1,1050] --conf spark.driver.threadDumpCollector.enabled=true --conf spark.executor.threadDumpCollector.enabled=true --conf spark.threadDumpCollector.interval=15s --conf spark.threadDumpCollector.output.type=LOG --conf spark.threadDumpCollector.include.regex=something

   Only thread dumps that match the given regular expression (spark.threadDumpCollector.include.regex) are captured.

### Does this PR introduce _any_ user-facing change?

Yes, see above.

### How was this patch tested?

New unit tests were added, and the feature was also tested manually.

### Was this patch authored or co-authored using generative AI tooling?

No
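As an illustrative aside (not part of the commit message), the same settings shown in the spark-shell examples above can also be applied programmatically when building a SparkSession. This is a minimal sketch: the configuration keys are the ones introduced by this PR, while the application name, master URL, and output directory are placeholder values.

```scala
import org.apache.spark.sql.SparkSession

// Sketch: enable periodic thread dump collection through the new configuration keys.
// The keys come from this PR; app name, master, and directory are illustrative only.
val spark = SparkSession.builder()
  .appName("thread-dump-collector-demo")
  .master("local-cluster[2,1,1024]")
  .config("spark.driver.threadDumpCollector.enabled", "true")
  .config("spark.executor.threadDumpCollector.enabled", "true")
  .config("spark.threadDumpCollector.interval", "15s")
  .config("spark.threadDumpCollector.output.type", "FILE")
  .config("spark.threadDumpCollector.dir", "file:/tmp/spark-thread-dumps")
  .getOrCreate()

// Any sufficiently long job now gets its threads sampled every 15 seconds.
spark.range(1, 100000000L).selectExpr("sum(id)").show()
spark.stop()
```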
1 parent 5b07e52 commit 76cdbac

File tree

6 files changed, +284 -0 lines changed


common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala

Lines changed: 1 addition & 0 deletions
@@ -835,6 +835,7 @@ private[spark] object LogKeys {
   case object TEMP_PATH extends LogKey
   case object TEST_SIZE extends LogKey
   case object THREAD extends LogKey
+  case object THREAD_DUMPS extends LogKey
   case object THREAD_ID extends LogKey
   case object THREAD_NAME extends LogKey
   case object THREAD_POOL_KEEPALIVE_TIME extends LogKey

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 16 additions & 0 deletions
@@ -240,6 +240,7 @@ class SparkContext(config: SparkConf) extends Logging {
   private var _shutdownHookRef: AnyRef = _
   private var _statusStore: AppStatusStore = _
   private var _heartbeater: Heartbeater = _
+  private var _driverThreadDumpCollector: ThreadDumpCollector = _
   private var _resources: immutable.Map[String, ResourceInformation] = _
   private var _shuffleDriverComponents: ShuffleDriverComponents = _
   private var _plugins: Option[PluginContainer] = None
@@ -629,6 +630,15 @@ class SparkContext(config: SparkConf) extends Logging {
       _env.blockManager.blockStoreClient.setAppAttemptId(attemptId)
     }

+    // Create and start the thread dump collector for the Spark driver
+    if (_conf.get(DRIVER_THREAD_DUMP_COLLECTOR_ENABLED)) {
+      _driverThreadDumpCollector = new ThreadDumpCollector(
+        () => ThreadDumpCollector.saveThreadDumps(env),
+        "driver-threadDumpCollector",
+        conf.get(THREAD_DUMP_COLLECTOR_INTERVAL))
+      _driverThreadDumpCollector.start()
+    }
+
     // initialize after application id and attempt id has been initialized
     _shuffleDriverComponents = ShuffleDataIOUtils.loadShuffleDataIO(_conf).driver()
     _shuffleDriverComponents.initializeApplication().asScala.foreach { case (k, v) =>
@@ -2383,6 +2393,12 @@ class SparkContext(config: SparkConf) extends Logging {
       }
       _heartbeater = null
     }
+    if (_conf.get(DRIVER_THREAD_DUMP_COLLECTOR_ENABLED) && _driverThreadDumpCollector != null) {
+      Utils.tryLogNonFatalError {
+        _driverThreadDumpCollector.stop()
+      }
+      _driverThreadDumpCollector = null
+    }
     if (env != null && _heartbeatReceiver != null) {
       Utils.tryLogNonFatalError {
         env.rpcEnv.stop(_heartbeatReceiver)
core/src/main/scala/org/apache/spark/ThreadDumpCollector.scala (new file)

Lines changed: 113 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import java.nio.charset.StandardCharsets
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.util.concurrent.TimeUnit

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.FsPermission

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.config.{THREAD_DUMP_COLLECTOR_DIR,
  THREAD_DUMP_COLLECTOR_OUTPUT_TYPE, THREAD_DUMP_COLLECTOR_PATTERN}
import org.apache.spark.util.{ThreadUtils, Utils}

/**
 * Creates a thread dump collector thread which will call the specified collectThreadDumps
 * function at intervals of intervalMs.
 *
 * @param collectThreadDumps the thread dump collector function to call.
 * @param name the thread name for the thread dump collector.
 * @param intervalMs the interval between stack trace collections.
 */
private[spark] class ThreadDumpCollector(
    collectThreadDumps: () => Unit,
    name: String,
    intervalMs: Long) extends Logging {
  // Executor for the thread dump collector task
  private val threadDumpCollector = ThreadUtils.newDaemonSingleThreadScheduledExecutor(name)

  /** Schedules a task to collect the thread dumps */
  def start(): Unit = {
    val threadDumpCollectorTask = new Runnable() {
      override def run(): Unit = Utils.logUncaughtExceptions(collectThreadDumps())
    }
    threadDumpCollector.scheduleAtFixedRate(threadDumpCollectorTask, intervalMs, intervalMs,
      TimeUnit.MILLISECONDS)
  }

  def stop(): Unit = {
    threadDumpCollector.shutdown()
    threadDumpCollector.awaitTermination(10, TimeUnit.SECONDS)
  }

}

private[spark] object ThreadDumpCollector extends Logging {
  def saveThreadDumps(env: SparkEnv): Unit = {
    env.conf.get(THREAD_DUMP_COLLECTOR_OUTPUT_TYPE) match {
      case "LOG" => writeThreadDumpsToLog(env)
      case "FILE" => writeThreadDumpsToFile(env)
    }
  }

  private def validateRegex(env: SparkEnv, collectedThreadDump: String): Boolean = {
    val regexPattern = env.conf.get(THREAD_DUMP_COLLECTOR_PATTERN).r
    regexPattern.findFirstIn(collectedThreadDump).isDefined
  }

  private def writeThreadDumpsToLog(env: SparkEnv): Unit = {
    val collectedThreadDump = Utils.getThreadDump().map(_.toString).mkString
    if (validateRegex(env, collectedThreadDump)) {
      logWarning(log"Thread dumps from ${MDC(LogKeys.EXECUTOR_ID, env.executorId)}:\n" +
        log"${MDC(LogKeys.THREAD_DUMPS, collectedThreadDump)}")
    }
  }

  private def writeThreadDumpsToFile(env: SparkEnv): Unit = {
    val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH_mm_ss")
    val timestamp = LocalDateTime.now.format(formatter)
    val threadDumpFileName = env.conf.getAppId + "-" + env.executorId + "-" + timestamp + ".txt"
    val collectedThreadDump = Utils.getThreadDump().map(_.toString).mkString
    if (validateRegex(env, collectedThreadDump)) {
      val hadoopConf = SparkHadoopUtil.get.newConfiguration(env.conf)
      val rootDir = env.conf.get(THREAD_DUMP_COLLECTOR_DIR)
      val fileSystem: FileSystem = new Path(rootDir).getFileSystem(hadoopConf)
      val threadDumpFilePermissions = new FsPermission(Integer.parseInt("700", 8).toShort)
      val dfsLogFile: Path = fileSystem.makeQualified(new Path(rootDir, threadDumpFileName))
      try {
        val outputStream = SparkHadoopUtil.createFile(fileSystem, dfsLogFile, allowEC = true)
        fileSystem.setPermission(dfsLogFile, threadDumpFilePermissions)
        outputStream.write(collectedThreadDump.getBytes(StandardCharsets.UTF_8))
        outputStream.close()
      } catch {
        case e: Exception =>
          logError(
            log"Could not save thread dumps into file from executor ${
              MDC(LogKeys.EXECUTOR_ID, env.executorId)
            }", e)
      }
    }
  }
}
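To make the scheduling mechanism easier to see in isolation, the following is a hedged, self-contained sketch using only JDK classes. It mirrors the fixed-rate pattern ThreadDumpCollector uses above (a single-threaded scheduled executor invoking a dump function at a fixed interval); PeriodicThreadDumpDemo, its 10-second period, and the println output are illustrative and not part of this commit.

```scala
import java.lang.management.ManagementFactory
import java.util.concurrent.{Executors, TimeUnit}

// Standalone sketch (not Spark code): sample thread dumps at a fixed rate,
// mirroring the scheduleAtFixedRate pattern used by ThreadDumpCollector.
object PeriodicThreadDumpDemo {
  def main(args: Array[String]): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val threadMXBean = ManagementFactory.getThreadMXBean

    val task = new Runnable {
      override def run(): Unit = {
        // dumpAllThreads(lockedMonitors, lockedSynchronizers) returns one ThreadInfo
        // per live thread, similar in spirit to what Utils.getThreadDump exposes.
        val dump = threadMXBean.dumpAllThreads(true, true).map(_.toString).mkString
        println(s"=== thread dump at ${System.currentTimeMillis()} ===\n$dump")
      }
    }

    scheduler.scheduleAtFixedRate(task, 10, 10, TimeUnit.SECONDS)
    Thread.sleep(60000) // let a few samples accumulate
    scheduler.shutdown()
    scheduler.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```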

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 20 additions & 0 deletions
@@ -89,6 +89,8 @@ private[spark] class Executor(
     log"${LogMDC(OS_ARCH, System.getProperty("os.arch"))}")
   logInfo(log"Java version ${LogMDC(JAVA_VERSION, System.getProperty("java.version"))}")

+  private var executorThreadDumpCollector: ThreadDumpCollector = _
+
   private val executorShutdown = new AtomicBoolean(false)
   val stopHookReference = ShutdownHookManager.addShutdownHook(
     () => stop()
@@ -326,6 +328,15 @@ private[spark] class Executor(

   heartbeater.start()

+  // Create and start the thread dump collector for the Spark executor
+  if (conf.get(EXECUTOR_THREAD_DUMP_COLLECTOR_ENABLED)) {
+    executorThreadDumpCollector = new ThreadDumpCollector(
+      () => ThreadDumpCollector.saveThreadDumps(env),
+      "executor-ThreadDumpCollector",
+      conf.get(THREAD_DUMP_COLLECTOR_INTERVAL))
+    executorThreadDumpCollector.start()
+  }
+
   private val appStartTime = conf.getLong("spark.app.startTime", 0)

   // To allow users to distribute plugins and their required files
@@ -446,6 +457,15 @@ private[spark] class Executor(
         case NonFatal(e) =>
           logWarning("Unable to stop heartbeater", e)
       }
+      try {
+        if (conf.get(EXECUTOR_THREAD_DUMP_COLLECTOR_ENABLED) && executorThreadDumpCollector !=
+          null) {
+          executorThreadDumpCollector.stop()
+        }
+      } catch {
+        case NonFatal(e) =>
+          logWarning("Unable to stop the executor thread dump collector", e)
+      }
       ShuffleBlockPusher.stop()
       if (threadPool != null) {
         threadPool.shutdown()

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 46 additions & 0 deletions
@@ -2838,4 +2838,50 @@ package object config {
     .checkValues(Set("connect", "classic"))
     .createWithDefault(
       if (sys.env.get("SPARK_CONNECT_MODE").contains("1")) "connect" else "classic")
+
+  private[spark] val DRIVER_THREAD_DUMP_COLLECTOR_ENABLED = ConfigBuilder("spark.driver" +
+    ".threadDumpCollector.enabled")
+    .doc("Whether to enable automatic thread dump collection for driver")
+    .version("4.1.0")
+    .booleanConf
+    .createWithDefault(false)
+
+  private[spark] val EXECUTOR_THREAD_DUMP_COLLECTOR_ENABLED = ConfigBuilder("spark.executor" +
+    ".threadDumpCollector.enabled")
+    .doc("Whether to enable automatic thread dump collection for each executor")
+    .version("4.1.0")
+    .booleanConf
+    .createWithDefault(false)
+
+  private[spark] val THREAD_DUMP_COLLECTOR_INTERVAL =
+    ConfigBuilder("spark.threadDumpCollector.interval")
+      .doc("The interval of time between two thread dump collections.")
+      .version("4.1.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .checkValue(_ > 0, "Value should be positive")
+      .createWithDefaultString("10s")
+
+  private[spark] val THREAD_DUMP_COLLECTOR_DIR = ConfigBuilder("spark.threadDumpCollector.dir")
+    .doc("Set the default directory for saving the thread dump files.")
+    .version("4.1.0")
+    .stringConf
+    .createWithDefault("file:/tmp/spark-thread-dumps")
+
+  private[spark] val THREAD_DUMP_COLLECTOR_OUTPUT_TYPE =
+    ConfigBuilder("spark.threadDumpCollector.output.type")
+      .doc("Specifies the type of saving the thread dumps. Can be either LOG (the default) or " +
+        "FILE")
+      .version("4.1.0")
+      .stringConf
+      .transform(_.toUpperCase(Locale.ROOT))
+      .checkValues(Set("LOG", "FILE"))
+      .createWithDefault("LOG")
+
+  private[spark] val THREAD_DUMP_COLLECTOR_PATTERN =
+    ConfigBuilder("spark.threadDumpCollector.include.regex")
+      .doc("Regular expression for determining which thread dumps will be captured")
+      .version("4.1.0")
+      .stringConf
+      .createWithDefault(".*")
+
 }
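As a side note on spark.threadDumpCollector.include.regex: a dump is kept only when the pattern matches somewhere in the collected text, since ThreadDumpCollector.validateRegex above uses Regex.findFirstIn. The following is a small hedged sketch of that decision; the pattern and sample thread names are made up for illustration.

```scala
// Sketch: how the include-regex gate decides whether a collected dump is captured.
// The pattern and sample dump text below are illustrative only.
val includeRegex = "Executor task launch worker.*".r
val collectedDump =
  """"dispatcher-event-loop-0" daemon prio=5 WAITING
    |"Executor task launch worker for task 3" daemon prio=5 RUNNABLE
    |""".stripMargin

// Kept only if the pattern occurs anywhere in the dump, mirroring validateRegex.
val keep = includeRegex.findFirstIn(collectedDump).isDefined
println(s"capture this dump: $keep") // prints: capture this dump: true
```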
core/src/test/scala/org/apache/spark/ThreadDumpCollectorSuite.scala (new file)

Lines changed: 88 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import java.io.File
import java.nio.file.Files

import org.apache.commons.io.FileUtils

import org.apache.spark.internal.config.{DRIVER_LOG_DFS_DIR, DRIVER_LOG_PERSISTTODFS,
  DRIVER_THREAD_DUMP_COLLECTOR_ENABLED, EXECUTOR_THREAD_DUMP_COLLECTOR_ENABLED,
  THREAD_DUMP_COLLECTOR_DIR, THREAD_DUMP_COLLECTOR_INTERVAL, THREAD_DUMP_COLLECTOR_OUTPUT_TYPE}
import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.util.Utils

class ThreadDumpCollectorSuite extends SparkFunSuite {

  private var tmpDir: File = _

  override def beforeEach(): Unit = {
    super.beforeEach()
    tmpDir = Utils.createTempDir()
  }

  override def afterEach(): Unit = {
    super.afterEach()
    JavaUtils.deleteRecursively(tmpDir)
  }

  def writeThreadDump(confEntry: ConfigEntry[Boolean], pattern: String, outputType: String)
    : Unit = {
    val conf = new SparkConf()
    conf.set(confEntry, true)
    conf.set(THREAD_DUMP_COLLECTOR_INTERVAL, 1000L)
    conf.set(THREAD_DUMP_COLLECTOR_OUTPUT_TYPE, outputType)

    if (outputType == "LOG") {
      conf.set(DRIVER_LOG_DFS_DIR, tmpDir.getAbsolutePath)
      conf.set(DRIVER_LOG_PERSISTTODFS, true)
    } else {
      conf.set(THREAD_DUMP_COLLECTOR_DIR, tmpDir.getAbsolutePath)
    }

    val sc = new SparkContext("local", "ThreadDumpWriteToFileTest", conf)
    Thread.sleep(3000)
    // Run a simple spark application
    sc.parallelize(1 to 10).count()
    sc.stop()
    val threadDumpDir = FileUtils.getFile(tmpDir.getAbsolutePath)
    assert(threadDumpDir.exists())
    val files = threadDumpDir.listFiles().filter(file => !file.isHidden)
    assert(files.length >= 1)
    assert(files.forall { file =>
      Files.lines(file.toPath).anyMatch(line => line.contains(pattern))
    })
  }

  test("Spark executor thread dumps are persisted to dfs") {
    writeThreadDump(EXECUTOR_THREAD_DUMP_COLLECTOR_ENABLED, "executor-heartbeater", "FILE")
  }

  test("Spark driver thread dumps are persisted to dfs") {
    writeThreadDump(DRIVER_THREAD_DUMP_COLLECTOR_ENABLED, "driver-heartbeater", "FILE")
  }

  test("Spark driver thread dumps are persisted to driver log") {
    writeThreadDump(DRIVER_THREAD_DUMP_COLLECTOR_ENABLED, "driver-heartbeater", "LOG")
  }

}
