Commit 8fa09a4

Hung Lin authored and Josh Rosen committed
SPARK-6414: Spark driver failed with NPE on job cancelation
Use Option for ActiveJob.properties to avoid NPE bug

Author: Hung Lin <hung.lin@gmail.com>

Closes #5124 from hunglin/SPARK-6414 and squashes the following commits:

2290b6b [Hung Lin] [SPARK-6414][core] Fix NPE in SparkContext.cancelJobGroup()

(cherry picked from commit e3202aa)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>

Conflicts:
    core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Conflicts:
    core/src/test/scala/org/apache/spark/SparkContextSuite.scala
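Why the Option wrapper matters: ActiveJob.properties can be null when a job was submitted without any thread-local properties, and calling .get on that null reference is what raised the NullPointerException. Below is a minimal standalone sketch of the before/after filter logic; ActiveJobStub and the hard-coded key are illustrative stand-ins for the scheduler's internal ActiveJob and SparkContext.SPARK_JOB_GROUP_ID, not code from this commit.

```scala
import java.util.Properties

// Hypothetical stand-in for the scheduler's ActiveJob: `properties` can be null
// when the submitting thread never set any local properties (pre-fix behaviour).
case class ActiveJobStub(jobId: Int, properties: Properties)

object CancelGroupSketch {
  // Illustrative key; mirrors the job-group key SparkContext attaches to jobs.
  val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"

  def main(args: Array[String]): Unit = {
    val jobs = Seq(ActiveJobStub(1, null), ActiveJobStub(2, new Properties()))

    // Pre-fix filter: calling .get on a null Properties throws NullPointerException.
    // jobs.filter(j => "group-1" == j.properties.get(SPARK_JOB_GROUP_ID))

    // Post-fix filter: Option(null) is None, so exists(...) is simply false and
    // jobs without properties are skipped instead of crashing the scheduler.
    val matching = jobs.filter(j =>
      Option(j.properties).exists(_.get(SPARK_JOB_GROUP_ID) == "group-1"))

    println(matching.map(_.jobId)) // List() -- no NPE, no matching jobs
  }
}
```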
1 parent a73055f commit 8fa09a4

3 files changed (+33, -13 lines changed)

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 3 deletions
@@ -400,6 +400,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   // Thread Local variable that can be used by users to pass information down the stack
   private val localProperties = new InheritableThreadLocal[Properties] {
     override protected def childValue(parent: Properties): Properties = new Properties(parent)
+    override protected def initialValue(): Properties = new Properties()
   }
 
   /**
@@ -441,9 +442,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * Spark fair scheduler pool.
    */
   def setLocalProperty(key: String, value: String) {
-    if (localProperties.get() == null) {
-      localProperties.set(new Properties())
-    }
     if (value == null) {
       localProperties.get.remove(key)
     } else {
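The SparkContext change above attacks the same null from the other side: once initialValue is overridden, localProperties.get() can no longer return null, so setLocalProperty can drop its defensive null check. A small sketch of that ThreadLocal behaviour, illustrative only and not part of the diff:

```scala
import java.util.Properties

object ThreadLocalSketch {
  // Without initialValue, get() returns null on a thread where set() was never called.
  val bare = new InheritableThreadLocal[Properties]

  // With initialValue overridden, get() always returns a (possibly empty) Properties,
  // so callers no longer need their own null check before mutating it.
  val withDefault = new InheritableThreadLocal[Properties] {
    override protected def childValue(parent: Properties): Properties = new Properties(parent)
    override protected def initialValue(): Properties = new Properties()
  }

  def main(args: Array[String]): Unit = {
    println(bare.get())        // null
    println(withDefault.get()) // {} -- safe to mutate directly
    withDefault.get().setProperty("spark.scheduler.pool", "production")
    println(withDefault.get().getProperty("spark.scheduler.pool")) // production
  }
}
```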

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 5 additions & 10 deletions
@@ -477,8 +477,7 @@ class DAGScheduler(
       callSite: CallSite,
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit,
-      properties: Properties = null): JobWaiter[U] =
-  {
+      properties: Properties): JobWaiter[U] = {
     // Check to make sure we are not launching a task on a partition that does not exist.
     val maxPartitions = rdd.partitions.length
     partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
@@ -507,8 +506,7 @@
       callSite: CallSite,
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit,
-      properties: Properties = null)
-  {
+      properties: Properties): Unit = {
     val start = System.nanoTime
     val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
     waiter.awaitResult() match {
@@ -529,9 +527,7 @@
       evaluator: ApproximateEvaluator[U, R],
       callSite: CallSite,
       timeout: Long,
-      properties: Properties = null)
-    : PartialResult[R] =
-  {
+      properties: Properties): PartialResult[R] = {
     val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     val partitions = (0 until rdd.partitions.size).toArray
@@ -678,7 +674,7 @@
         // Cancel all jobs belonging to this job group.
         // First finds all active jobs with this group id, and then kill stages for them.
         val activeInGroup = activeJobs.filter(activeJob =>
-          groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
+          Option(activeJob.properties).exists(_.get(SparkContext.SPARK_JOB_GROUP_ID) == groupId))
         val jobIds = activeInGroup.map(_.jobId)
         jobIds.foreach(handleJobCancellation(_, "part of cancelled job group %s".format(groupId)))
         submitWaitingStages()
@@ -725,8 +721,7 @@
       allowLocal: Boolean,
       callSite: CallSite,
       listener: JobListener,
-      properties: Properties = null)
-  {
+      properties: Properties) {
     var finalStage: Stage = null
     try {
       // New stage creation may throw an exception if, for example, jobs are run on a

core/src/test/scala/org/apache/spark/SparkContextSuite.scala

Lines changed: 27 additions & 0 deletions
@@ -17,10 +17,22 @@
 
 package org.apache.spark
 
+import java.io.File
+import java.util.concurrent.TimeUnit
+
+import com.google.common.base.Charsets._
+import com.google.common.io.Files
+
 import org.scalatest.FunSuite
 
 import org.apache.hadoop.io.BytesWritable
 
+import org.apache.spark.util.Utils
+import org.apache.spark.SparkContext._
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
 class SparkContextSuite extends FunSuite with LocalSparkContext {
 
   test("Only one SparkContext may be active at a time") {
@@ -72,4 +84,19 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
     val byteArray2 = converter.convert(bytesWritable)
     assert(byteArray2.length === 0)
   }
+
+  test("Cancelling job group should not cause SparkContext to shutdown (SPARK-6414)") {
+    try {
+      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+      val future = sc.parallelize(Seq(0)).foreachAsync(_ => {Thread.sleep(1000L)})
+      sc.cancelJobGroup("nonExistGroupId")
+      Await.ready(future, Duration(2, TimeUnit.SECONDS))
+
+      // In SPARK-6414, sc.cancelJobGroup will cause NullPointerException and cause
+      // SparkContext to shutdown, so the following assertion will fail.
+      assert(sc.parallelize(1 to 10).count() == 10L)
+    } finally {
+      sc.stop()
+    }
+  }
 }
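For context, a hedged user-facing sketch of the job-group pattern the regression test exercises. It is not part of this commit and assumes the Spark 1.x API (setJobGroup, cancelJobGroup, and foreachAsync via the SparkContext._ implicits); names like "etl-group" are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object JobGroupCancelExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("job-group-demo").setMaster("local[2]"))
    try {
      // Tag subsequent jobs submitted from this thread with a group id.
      sc.setJobGroup("etl-group", "long-running demo job", interruptOnCancel = true)
      val slowJob = sc.parallelize(1 to 4, 4).foreachAsync(_ => Thread.sleep(10000L))

      // Cancel every active job tagged with the group id. With the SPARK-6414 fix this
      // no longer NPEs, even when some active job has no local properties attached.
      sc.cancelJobGroup("etl-group")

      // The context itself keeps working after the cancellation.
      println(sc.parallelize(1 to 10).count()) // 10
    } finally {
      sc.stop()
    }
  }
}
```

As in the test above, the key point is that cancelling a group, even one that no running job belongs to, must not bring down the SparkContext.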
