[SPARK-6629] cancelJobGroup() may not work for jobs whose job groups are inherited from parent threads #5288

Closed
15 changes: 11 additions & 4 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -28,6 +28,8 @@ import scala.language.existentials
import scala.language.postfixOps
import scala.util.control.NonFatal

+import org.apache.commons.lang3.SerializationUtils

import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.executor.TaskMetrics
@@ -509,7 +511,8 @@ class DAGScheduler(
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
eventProcessLoop.post(JobSubmitted(
-      jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties))
+      jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter,
+      SerializationUtils.clone(properties)))
Contributor Author commented:
We need to use SerializationUtils.clone() in order to perform a deep clone, one that also captures properties inherited from parent threads; see 5a55261 for more discussion of this issue.
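
A minimal illustration of why a shallow copy is not enough (not part of this PR; it assumes inherited local properties are carried on the java.util.Properties defaults chain, as in SparkContext's InheritableThreadLocal):

    import java.util.Properties
    import org.apache.commons.lang3.SerializationUtils

    val parent = new Properties()
    parent.setProperty("spark.jobGroup.id", "groupA")
    val child = new Properties(parent)  // inherited entries live in the defaults chain, not the table

    // Properties.clone() copies the hash table but shares the defaults reference,
    // so later mutations in the parent thread leak into the "copy":
    val shallow = child.clone().asInstanceOf[Properties]
    parent.setProperty("spark.jobGroup.id", "groupB")
    shallow.getProperty("spark.jobGroup.id")  // "groupB" -- not an isolated snapshot

    // SerializationUtils.clone serializes the whole object graph, including the
    // (non-transient) defaults chain, yielding a true snapshot:
    val deep = SerializationUtils.clone(child)
    parent.setProperty("spark.jobGroup.id", "groupC")
    deep.getProperty("spark.jobGroup.id")  // still "groupB"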

waiter
}

@@ -546,7 +549,8 @@
val partitions = (0 until rdd.partitions.size).toArray
val jobId = nextJobId.getAndIncrement()
eventProcessLoop.post(JobSubmitted(
-      jobId, rdd, func2, partitions, allowLocal = false, callSite, listener, properties))
+      jobId, rdd, func2, partitions, allowLocal = false, callSite, listener,
+      SerializationUtils.clone(properties)))
listener.awaitResult() // Will throw an exception if the job fails
}

@@ -686,8 +690,11 @@
private[scheduler] def handleJobGroupCancelled(groupId: String) {
// Cancel all jobs belonging to this job group.
// First find all active jobs with this group id, and then kill the stages for them.
-    val activeInGroup = activeJobs.filter(activeJob =>
-      Option(activeJob.properties).exists(_.get(SparkContext.SPARK_JOB_GROUP_ID) == groupId))
+    val activeInGroup = activeJobs.filter { activeJob =>
+      Option(activeJob.properties).exists {
+        _.getProperty(SparkContext.SPARK_JOB_GROUP_ID) == groupId
+      }
+    }
val jobIds = activeInGroup.map(_.jobId)
jobIds.foreach(handleJobCancellation(_, "part of cancelled job group %s".format(groupId)))
submitWaitingStages()
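
The lookup change above is the other half of the fix: Hashtable.get only consults a Properties object's own table, while Properties.getProperty also walks the defaults chain where inherited values live. A quick illustration (using the literal value of SparkContext.SPARK_JOB_GROUP_ID; not code from this PR):

    import java.util.Properties

    val parent = new Properties()
    parent.setProperty("spark.jobGroup.id", "groupA")
    val inherited = new Properties(parent)  // how a child thread's properties reference the parent's

    inherited.get("spark.jobGroup.id")          // null: get ignores the defaults chain
    inherited.getProperty("spark.jobGroup.id")  // "groupA": getProperty consults defaults
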
35 changes: 35 additions & 0 deletions core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -141,6 +141,41 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
assert(jobB.get() === 100)
}

test("inherited job group (SPARK-6629)") {
sc = new SparkContext("local[2]", "test")

// Add a listener to release the semaphore once any tasks are launched.
val sem = new Semaphore(0)
sc.addSparkListener(new SparkListener {
override def onTaskStart(taskStart: SparkListenerTaskStart) {
sem.release()
}
})

sc.setJobGroup("jobA", "this is a job to be cancelled")
@volatile var exception: Exception = null
val jobA = new Thread() {
// The job group should be inherited by this thread
override def run(): Unit = {
exception = intercept[SparkException] {
sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()
}
}
}
jobA.start()

// Block until both tasks of job A have started, then cancel job A.
sem.acquire(2)
sc.cancelJobGroup("jobA")
jobA.join(10000)
assert(!jobA.isAlive)
assert(exception.getMessage contains "cancel")

// Once A is cancelled, job B should finish fairly quickly.
val jobB = sc.parallelize(1 to 100, 2).countAsync()
assert(jobB.get() === 100)
}

test("job group with interruption") {
sc = new SparkContext("local[2]", "test")

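The new test above depends on job-group inheritance across threads. That inheritance comes from SparkContext keeping its local properties in an InheritableThreadLocal; a rough sketch of the mechanism (the exact childValue implementation is an assumption for illustration; see 5a55261 for the version this PR builds on):

    import java.util.Properties

    // Child threads see the parent thread's properties because childValue links
    // (rather than copies) them via the Properties defaults chain:
    val localProperties = new InheritableThreadLocal[Properties] {
      override protected def childValue(parent: Properties): Properties = new Properties(parent)
      override protected def initialValue(): Properties = new Properties()
    }
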
46 changes: 45 additions & 1 deletion core/src/test/scala/org/apache/spark/ThreadingSuite.scala
@@ -17,10 +17,11 @@

package org.apache.spark

-import java.util.concurrent.Semaphore
+import java.util.concurrent.{TimeUnit, Semaphore}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger

+import org.apache.spark.scheduler._
import org.scalatest.FunSuite

/**
@@ -189,4 +190,47 @@ class ThreadingSuite extends FunSuite with LocalSparkContext {
assert(sc.getLocalProperty("test") === "parent")
assert(sc.getLocalProperty("Foo") === null)
}

test("mutations to local properties should not affect submitted jobs (SPARK-6629)") {
val jobStarted = new Semaphore(0)
val jobEnded = new Semaphore(0)
@volatile var jobResult: JobResult = null

sc = new SparkContext("local", "test")
sc.setJobGroup("originalJobGroupId", "description")
sc.addSparkListener(new SparkListener {
override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
jobStarted.release()
}
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
jobResult = jobEnd.jobResult
jobEnded.release()
}
})

// Create a new thread which will inherit the current thread's properties
val thread = new Thread() {
override def run(): Unit = {
assert(sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) === "originalJobGroupId")
// Sleeps for a total of 10 seconds, but allows cancellation to interrupt the task
try {
sc.parallelize(1 to 100).foreach { x =>
Thread.sleep(100)
}
} catch {
case s: SparkException => // ignored so that we don't print noise in test logs
}
}
}
thread.start()
// Wait for the job to start, then mutate the original properties. The running job
// inherited them, but should have taken a defensive snapshot at submission time:
jobStarted.tryAcquire(10, TimeUnit.SECONDS)
sc.setJobGroup("modifiedJobGroupId", "description")
// Canceling the original job group should cancel the running job. In other words,
// mutating the properties object must not affect the properties of running jobs.
sc.cancelJobGroup("originalJobGroupId")
jobEnded.tryAcquire(10, TimeUnit.SECONDS)
assert(jobResult.isInstanceOf[JobFailed])
}
}
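
To summarize the failure mode this suite guards against, here is a hypothetical pre-patch sequence (an illustration, not output from this PR):

    sc.setJobGroup("originalJobGroupId", "description")
    // A child thread submits a long-running job; pre-patch, the job holds a live
    // reference to (not a snapshot of) this thread's Properties.
    sc.setJobGroup("modifiedJobGroupId", "description")  // mutates what the running job sees
    sc.cancelJobGroup("originalJobGroupId")  // matches no active job; nothing is cancelled
    // With SerializationUtils.clone() at submission time, the job retains
    // "originalJobGroupId" and the cancellation above succeeds.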