File tree Expand file tree Collapse file tree 1 file changed +9
-8
lines changed
core/src/main/scala/org/apache/spark/util Expand file tree Collapse file tree 1 file changed +9
-8
lines changed Original file line number Diff line number Diff line change @@ -206,7 +206,7 @@ private[spark] object ShutdownHookManager extends Logging {
206
206
private[util] class SparkShutdownHookManager {
207
207
208
208
private val hooks = new PriorityQueue[SparkShutdownHook]()
209
- private var shuttingDown = false
209
+ @volatile private var shuttingDown = false
210
210
211
211
/**
212
212
* Install a hook to run at shutdown and run all registered hooks in order. Hadoop 1.x does not
@@ -232,22 +232,23 @@ private[util] class SparkShutdownHookManager {
232
232
}
233
233
}
234
234
235
- def runAll(): Unit = synchronized {
235
+ def runAll(): Unit = {
236
236
shuttingDown = true
237
- while (!hooks.isEmpty()) {
238
- Try(Utils.logUncaughtExceptions(hooks.poll().run()))
237
+ var nextHook: SparkShutdownHook = null
238
+ while ({nextHook = hooks synchronized { hooks.poll() }; nextHook != null}) {
239
+ Try(Utils.logUncaughtExceptions(nextHook.run()))
239
240
}
240
241
}
241
242
242
- def add(priority: Int, hook: () => Unit): AnyRef = synchronized {
243
+ def add(priority: Int, hook: () => Unit): AnyRef = {
243
244
checkState()
244
245
val hookRef = new SparkShutdownHook (priority, hook)
245
- hooks.add(hookRef)
246
+ hooks synchronized { hooks.add(hookRef) }
246
247
hookRef
247
248
}
248
249
249
- def remove(ref: AnyRef): Boolean = synchronized {
250
- hooks.remove(ref)
250
+ def remove(ref: AnyRef): Boolean = {
251
+ hooks synchronized { hooks.remove(ref) }
251
252
}
252
253
253
254
private def checkState(): Unit = {
You can’t perform that action at this time.
0 commit comments