
Commit 58e2b3f

Hung Lin authored and Josh Rosen committed
SPARK-6414: Spark driver failed with NPE on job cancellation
Use Option for ActiveJob.properties to avoid NPE bug

Author: Hung Lin <hung.lin@gmail.com>

Closes #5124 from hunglin/SPARK-6414 and squashes the following commits:

2290b6b [Hung Lin] [SPARK-6414][core] Fix NPE in SparkContext.cancelJobGroup()

(cherry picked from commit e3202aa)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>

Conflicts:
    core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
1 parent a6664dc commit 58e2b3f
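
To summarize the fix for readers skimming the diffs below: `ActiveJob.properties` could legitimately be `null`, and the job-group cancellation path in `DAGScheduler` dereferenced it unconditionally. Wrapping the reference in `Option(...)` turns a null into `None`, so the group comparison simply yields `false` instead of throwing. Below is a minimal, self-contained sketch of that pattern; `OptionGuardSketch` is a made-up name, and the hard-coded key string mirrors what `SparkContext.SPARK_JOB_GROUP_ID` refers to but is only illustrative here.

```scala
import java.util.Properties

// Minimal sketch of the null-guard pattern used by the fix. Not part of the
// patch; the key string is hard-coded for brevity.
object OptionGuardSketch {
  def matchesGroup(properties: Properties, groupId: String): Boolean =
    // Option(null) is None, so exists(...) returns false instead of throwing.
    Option(properties).exists(_.get("spark.jobGroup.id") == groupId)

  def main(args: Array[String]): Unit = {
    println(matchesGroup(null, "g1")) // false, no NullPointerException

    val p = new Properties()
    p.setProperty("spark.jobGroup.id", "g1")
    println(matchesGroup(p, "g1"))    // true
  }
}
```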

File tree

3 files changed: +25 / -14 lines

core/src/main/scala/org/apache/spark/SparkContext.scala
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
core/src/test/scala/org/apache/spark/SparkContextSuite.scala

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 3 deletions
```diff
@@ -433,6 +433,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
   // Thread Local variable that can be used by users to pass information down the stack
   private val localProperties = new InheritableThreadLocal[Properties] {
     override protected def childValue(parent: Properties): Properties = new Properties(parent)
+    override protected def initialValue(): Properties = new Properties()
   }
 
   /**
@@ -474,9 +475,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
    * Spark fair scheduler pool.
    */
   def setLocalProperty(key: String, value: String) {
-    if (localProperties.get() == null) {
-      localProperties.set(new Properties())
-    }
     if (value == null) {
       localProperties.get.remove(key)
     } else {
```
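
The `SparkContext` half of the change replaces the lazy null check in `setLocalProperty` with an `initialValue()` override, so `localProperties.get()` can no longer return `null` on any thread. A small standalone sketch of the `InheritableThreadLocal` behaviour this relies on (`ThreadLocalSketch` is a made-up name):

```scala
import java.util.Properties

// Sketch: with initialValue() overridden, get() never observes null, whether
// the value is read on the creating thread or inherited by a child thread.
object ThreadLocalSketch {
  private val localProperties = new InheritableThreadLocal[Properties] {
    override protected def childValue(parent: Properties): Properties = new Properties(parent)
    override protected def initialValue(): Properties = new Properties()
  }

  def main(args: Array[String]): Unit = {
    localProperties.get().setProperty("k", "v") // no null check needed first
    val child = new Thread(new Runnable {
      // childValue(parent) ran when this thread was constructed, so the
      // parent's entries are visible here as defaults.
      def run(): Unit = println(localProperties.get().getProperty("k")) // prints v
    })
    child.start()
    child.join()
  }
}
```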

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 5 additions & 10 deletions
```diff
@@ -474,8 +474,7 @@ class DAGScheduler(
       callSite: CallSite,
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit,
-      properties: Properties = null): JobWaiter[U] =
-  {
+      properties: Properties): JobWaiter[U] = {
     // Check to make sure we are not launching a task on a partition that does not exist.
     val maxPartitions = rdd.partitions.length
     partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
@@ -504,8 +503,7 @@
       callSite: CallSite,
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit,
-      properties: Properties = null)
-  {
+      properties: Properties): Unit = {
     val start = System.nanoTime
     val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
     waiter.awaitResult() match {
@@ -526,9 +524,7 @@
       evaluator: ApproximateEvaluator[U, R],
       callSite: CallSite,
       timeout: Long,
-      properties: Properties = null)
-    : PartialResult[R] =
-  {
+      properties: Properties): PartialResult[R] = {
     val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     val partitions = (0 until rdd.partitions.size).toArray
@@ -675,7 +671,7 @@
     // Cancel all jobs belonging to this job group.
     // First finds all active jobs with this group id, and then kill stages for them.
     val activeInGroup = activeJobs.filter(activeJob =>
-      groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
+      Option(activeJob.properties).exists(_.get(SparkContext.SPARK_JOB_GROUP_ID) == groupId))
     val jobIds = activeInGroup.map(_.jobId)
     jobIds.foreach(handleJobCancellation(_, "part of cancelled job group %s".format(groupId)))
     submitWaitingStages()
@@ -722,8 +718,7 @@
       allowLocal: Boolean,
       callSite: CallSite,
       listener: JobListener,
-      properties: Properties = null)
-  {
+      properties: Properties) {
     var finalStage: Stage = null
     try {
       // New stage creation may throw an exception if, for example, jobs are run on a
```
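
Note that alongside the `Option` guard, every `properties: Properties = null` default parameter is removed, so internal callers must now pass the argument explicitly. A hedged sketch of why dropping a null default helps (`ExplicitArgsSketch` and `submit` are made-up names, not the `DAGScheduler` API):

```scala
import java.util.Properties

// Sketch: removing a "= null" default turns a silently-null argument into a
// compile-time obligation at every call site.
object ExplicitArgsSketch {
  // Before: def submit(jobId: Int, properties: Properties = null) -- callers
  // could omit the argument and let a null flow into the scheduler.
  def submit(jobId: Int, properties: Properties): Unit = {
    // Defensive read that still tolerates a caller passing null explicitly:
    val group = Option(properties).map(_.getProperty("spark.jobGroup.id")).orNull
    println(s"job $jobId in group $group")
  }

  def main(args: Array[String]): Unit = {
    submit(1, new Properties()) // explicit choice; "submit(1)" no longer compiles
  }
}
```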

core/src/test/scala/org/apache/spark/SparkContextSuite.scala

Lines changed: 19 additions & 1 deletion
```diff
@@ -18,16 +18,19 @@
 package org.apache.spark
 
 import java.io.File
+import java.util.concurrent.TimeUnit
 
 import com.google.common.base.Charsets._
 import com.google.common.io.Files
 
 import org.scalatest.FunSuite
 
 import org.apache.hadoop.io.BytesWritable
-
 import org.apache.spark.util.Utils
 
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
 class SparkContextSuite extends FunSuite with LocalSparkContext {
 
   test("Only one SparkContext may be active at a time") {
@@ -172,4 +175,19 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
       sc.stop()
     }
   }
+
+  test("Cancelling job group should not cause SparkContext to shutdown (SPARK-6414)") {
+    try {
+      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+      val future = sc.parallelize(Seq(0)).foreachAsync(_ => {Thread.sleep(1000L)})
+      sc.cancelJobGroup("nonExistGroupId")
+      Await.ready(future, Duration(2, TimeUnit.SECONDS))
+
+      // In SPARK-6414, sc.cancelJobGroup will cause NullPointerException and cause
+      // SparkContext to shutdown, so the following assertion will fail.
+      assert(sc.parallelize(1 to 10).count() == 10L)
+    } finally {
+      sc.stop()
+    }
+  }
 }
```
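
The regression test deliberately cancels a group that was never set, which is exactly the case that used to crash the driver. For contrast, roughly what the intended `setJobGroup`/`cancelJobGroup` pairing looks like from user code (a sketch with made-up group id, description, and timings, not part of the patch):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch of ordinary job-group usage: jobs submitted after setJobGroup carry
// the group id in the submitting thread's local properties, so
// cancelJobGroup can find them by that id.
object JobGroupUsageSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local"))
    try {
      sc.setJobGroup("nightly-etl", "slow nightly job", interruptOnCancel = true)
      sc.parallelize(1 to 100).foreachAsync(_ => Thread.sleep(100L))
      sc.cancelJobGroup("nightly-etl") // cancels the async job tagged above
    } finally {
      sc.stop()
    }
  }
}
```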
