SPARK-6414: Spark driver failed with NPE on job cancelation #5124

Closed · wants to merge 1 commit
4 changes: 1 addition & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -433,6 +433,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   // Thread Local variable that can be used by users to pass information down the stack
   private val localProperties = new InheritableThreadLocal[Properties] {
     override protected def childValue(parent: Properties): Properties = new Properties(parent)
+    override protected def initialValue(): Properties = new Properties()
   }
 
   /**
@@ -474,9 +475,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * Spark fair scheduler pool.
    */
   def setLocalProperty(key: String, value: String) {
-    if (localProperties.get() == null) {
-      localProperties.set(new Properties())
-    }
     if (value == null) {
       localProperties.get.remove(key)
     } else {
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -493,7 +493,7 @@ class DAGScheduler(
       callSite: CallSite,
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit,
-      properties: Properties = null): JobWaiter[U] = {
+      properties: Properties): JobWaiter[U] = {
     // Check to make sure we are not launching a task on a partition that does not exist.
     val maxPartitions = rdd.partitions.length
     partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
@@ -522,7 +522,7 @@ class DAGScheduler(
       callSite: CallSite,
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit,
-      properties: Properties = null): Unit = {
+      properties: Properties): Unit = {
     val start = System.nanoTime
     val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
     waiter.awaitResult() match {
@@ -542,7 +542,7 @@
       evaluator: ApproximateEvaluator[U, R],
       callSite: CallSite,
       timeout: Long,
-      properties: Properties = null): PartialResult[R] = {
+      properties: Properties): PartialResult[R] = {
     val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     val partitions = (0 until rdd.partitions.size).toArray
@@ -689,7 +689,7 @@ class DAGScheduler(
     // Cancel all jobs belonging to this job group.
    // First finds all active jobs with this group id, and then kill stages for them.
     val activeInGroup = activeJobs.filter(activeJob =>
-      groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
+      Option(activeJob.properties).exists(_.get(SparkContext.SPARK_JOB_GROUP_ID) == groupId))

Contributor:
Now that we've changed initialValue, won't activeJob.properties always be non-null now? I think it should now be safe to use the old code, unless I'm overlooking something.
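
For reference, a small standalone sketch (not part of this patch) of the ThreadLocal behavior being discussed: once initialValue() is overridden, the first get() on any thread that has neither set nor inherited a value returns a fresh Properties instead of null, which is also why the explicit null check in setLocalProperty could be dropped.

```scala
import java.util.Properties

object InitialValueSketch {
  // Same pattern as the SparkContext change above.
  private val localProperties = new InheritableThreadLocal[Properties] {
    override protected def childValue(parent: Properties): Properties = new Properties(parent)
    override protected def initialValue(): Properties = new Properties()
  }

  def main(args: Array[String]): Unit = {
    // The first get() on this thread triggers initialValue(), so the result is never null.
    assert(localProperties.get() != null)

    // A thread created after the parent has a value receives a copy via childValue(parent);
    // new Properties(parent) keeps the parent's entries visible as defaults.
    localProperties.get().setProperty("group", "g1")
    val child = new Thread(new Runnable {
      override def run(): Unit = assert(localProperties.get().getProperty("group") == "g1")
    })
    child.start()
    child.join()
  }
}
```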

Author:
I think you are right: with the current code, activeJob.properties will always be non-null. But the code may change in the future, and the line above makes sure there won't be an NPE even if activeJob.properties is null.
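
To illustrate that trade-off, here is a minimal self-contained sketch of the two expressions (the key string below stands in for SparkContext.SPARK_JOB_GROUP_ID): with the Option wrapper, a null properties object simply fails the filter instead of throwing.

```scala
import java.util.Properties

object NullSafeGroupMatchSketch {
  private val groupKey = "spark.jobGroup.id" // stand-in for SparkContext.SPARK_JOB_GROUP_ID

  // Old expression: calling .get on a null Properties throws a NullPointerException.
  def unsafeMatch(groupId: String, properties: Properties): Boolean =
    groupId == properties.get(groupKey)

  // Patched expression: Option(null) is None, so exists(...) is simply false.
  def safeMatch(groupId: String, properties: Properties): Boolean =
    Option(properties).exists(_.get(groupKey) == groupId)

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty(groupKey, "my-group")

    assert(safeMatch("my-group", props))             // matching group id
    assert(!safeMatch("my-group", new Properties())) // key not set
    assert(!safeMatch("my-group", null))             // null properties: no NPE, just false
    // unsafeMatch("my-group", null)                 // would throw a NullPointerException
  }
}
```

So the initialValue() override already guarantees non-null properties today, and the Option wrapper keeps cancelJobGroup safe if that invariant ever changes.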

     val jobIds = activeInGroup.map(_.jobId)
     jobIds.foreach(handleJobCancellation(_, "part of cancelled job group %s".format(groupId)))
     submitWaitingStages()
@@ -736,7 +736,7 @@ class DAGScheduler(
       allowLocal: Boolean,
       callSite: CallSite,
       listener: JobListener,
-      properties: Properties = null) {
+      properties: Properties) {
     var finalStage: ResultStage = null
     try {
       // New stage creation may throw an exception if, for example, jobs are run on a
20 changes: 19 additions & 1 deletion core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -18,16 +18,19 @@
 package org.apache.spark
 
 import java.io.File
+import java.util.concurrent.TimeUnit
 
 import com.google.common.base.Charsets._
 import com.google.common.io.Files
 
 import org.scalatest.FunSuite
 
 import org.apache.hadoop.io.BytesWritable
 
 import org.apache.spark.util.Utils
 
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
 
 class SparkContextSuite extends FunSuite with LocalSparkContext {
 
   test("Only one SparkContext may be active at a time") {
@@ -173,4 +176,19 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
       sc.stop()
     }
   }
+
+  test("Cancelling job group should not cause SparkContext to shutdown (SPARK-6414)") {
+    try {
+      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+      val future = sc.parallelize(Seq(0)).foreachAsync(_ => {Thread.sleep(1000L)})
+      sc.cancelJobGroup("nonExistGroupId")
+      Await.ready(future, Duration(2, TimeUnit.SECONDS))
+
+      // In SPARK-6414, sc.cancelJobGroup would cause a NullPointerException and cause
+      // the SparkContext to shut down, so the following assertion would fail.
+      assert(sc.parallelize(1 to 10).count() == 10L)
+    } finally {
+      sc.stop()
+    }
+  }
 }