Skip to content

Commit 5942765

Browse files
committed
Enclosed shutdownCallback in SparkDeploySchedulerBackend by synchronized block
1 parent fd41eb9 commit 5942765

File tree

2 files changed

+21
-11
lines changed

2 files changed

+21
-11
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1902,7 +1902,7 @@ object SparkContext extends Logging {
19021902
val masterUrls = localCluster.start()
19031903
val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
19041904
scheduler.initialize(backend)
1905-
backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
1905+
backend.setShutdownCallback { (backend: SparkDeploySchedulerBackend) =>
19061906
localCluster.stop()
19071907
}
19081908
(backend, scheduler)

core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,17 @@ private[spark] class SparkDeploySchedulerBackend(
3131
with AppClientListener
3232
with Logging {
3333

34-
var client: AppClient = null
35-
var stopping = false
36-
var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
37-
@volatile var appId: String = _
34+
private var client: AppClient = null
35+
private var stopping = false
36+
private val shutdownCallbackLock = new Object()
37+
private var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
38+
@volatile private var appId: String = _
3839

39-
val registrationLock = new Object()
40-
var registrationDone = false
40+
private val registrationLock = new Object()
41+
private var registrationDone = false
4142

42-
val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
43-
val totalExpectedCores = maxCores.getOrElse(0)
43+
private val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
44+
private val totalExpectedCores = maxCores.getOrElse(0)
4445

4546
override def start() {
4647
super.start()
@@ -82,8 +83,11 @@ private[spark] class SparkDeploySchedulerBackend(
8283
stopping = true
8384
super.stop()
8485
client.stop()
85-
if (shutdownCallback != null) {
86-
shutdownCallback(this)
86+
87+
shutdownCallbackLock.synchronized {
88+
if (shutdownCallback != null) {
89+
shutdownCallback(this)
90+
}
8791
}
8892
}
8993

@@ -135,6 +139,12 @@ private[spark] class SparkDeploySchedulerBackend(
135139
super.applicationId
136140
}
137141

142+
def setShutdownCallback(f: SparkDeploySchedulerBackend => Unit) {
143+
shutdownCallbackLock.synchronized {
144+
shutdownCallback = f
145+
}
146+
}
147+
138148
private def waitForRegistration() = {
139149
registrationLock.synchronized {
140150
while (!registrationDone) {

0 commit comments

Comments
 (0)