Skip to content

[SPARK-11872] Prevent the call to SparkContext#stop() in the listener bus's thread #9852

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1694,6 +1694,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli

// Shut down the SparkContext.
def stop() {
if (AsynchronousListenerBus.withinListenerThread.value) {
throw new SparkException("Cannot stop SparkContext within listener thread of" +
" AsynchronousListenerBus")
}
// Use the stopping variable to ensure no contention for the stop scenario.
// Still track the stopped variable for use elsewhere in the code.
if (!stopped.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.collection.JavaConverters._

import org.scalatest.Matchers

import org.apache.spark.SparkException
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.util.ResetSystemProperties
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
Expand All @@ -36,6 +37,21 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match

val jobCompletionTime = 1421191296660L

test("don't call sc.stop in listener") {
sc = new SparkContext("local", "SparkListenerSuite")
val listener = new SparkContextStoppingListener(sc)
val bus = new LiveListenerBus
bus.addListener(listener)

// Starting listener bus should flush all buffered events
bus.start(sc)
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)

bus.stop()
assert(listener.sparkExSeen)
}

test("basic creation and shutdown of LiveListenerBus") {
val counter = new BasicJobCounter
val bus = new LiveListenerBus
Expand Down Expand Up @@ -443,6 +459,21 @@ private class BasicJobCounter extends SparkListener {
override def onJobEnd(job: SparkListenerJobEnd): Unit = count += 1
}

/**
* A simple listener that tries to stop SparkContext.
*/
private class SparkContextStoppingListener(val sc: SparkContext) extends SparkListener {
@volatile var sparkExSeen = false
override def onJobEnd(job: SparkListenerJobEnd): Unit = {
try {
sc.stop()
} catch {
case se: SparkException =>
sparkExSeen = true
}
}
}

private class ListenerThatAcceptsSparkConf(conf: SparkConf) extends SparkListener {
var count = 0
override def onJobEnd(job: SparkListenerJobEnd): Unit = count += 1
Expand Down