Skip to content

Commit 8b023d4

Browse files
committed
Reasons for cancelling.
1 parent 4d8cec5 commit 8b023d4

File tree

5 files changed

+77
-11
lines changed

5 files changed

+77
-11
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2207,10 +2207,11 @@ class SparkContext(config: SparkConf) extends Logging {
22072207
* Cancel a given job if it's scheduled or running.
22082208
*
22092209
* @param jobId the job ID to cancel
2210+
* @param reason optional reason for cancellation
22102211
* @note Throws `InterruptedException` if the cancel message cannot be sent
22112212
*/
2212-
def cancelJob(jobId: Int) {
2213-
dagScheduler.cancelJob(jobId)
2213+
def cancelJob(jobId: Int, reason: String = "") {
2214+
dagScheduler.cancelJob(jobId, reason)
22142215
}
22152216

22162217
/**

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -696,9 +696,9 @@ class DAGScheduler(
696696
/**
697697
* Cancel a job that is running or waiting in the queue.
698698
*/
699-
def cancelJob(jobId: Int): Unit = {
699+
def cancelJob(jobId: Int, reason: String = ""): Unit = {
700700
logInfo("Asked to cancel job " + jobId)
701-
eventProcessLoop.post(JobCancelled(jobId))
701+
eventProcessLoop.post(JobCancelled(jobId, reason))
702702
}
703703

704704
/**
@@ -1644,8 +1644,8 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
16441644
case StageCancelled(stageId, reason) =>
16451645
dagScheduler.handleStageCancellation(stageId, reason)
16461646

1647-
case JobCancelled(jobId) =>
1648-
dagScheduler.handleJobCancellation(jobId)
1647+
case JobCancelled(jobId, reason) =>
1648+
dagScheduler.handleJobCancellation(jobId, reason)
16491649

16501650
case JobGroupCancelled(groupId) =>
16511651
dagScheduler.handleJobGroupCancelled(groupId)

core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ private[scheduler] case class MapStageSubmitted(
5555

5656
private[scheduler] case class StageCancelled(stageId: Int, reason: String) extends DAGSchedulerEvent
5757

58-
private[scheduler] case class JobCancelled(jobId: Int) extends DAGSchedulerEvent
58+
private[scheduler] case class JobCancelled(jobId: Int, reason: String) extends DAGSchedulerEvent
5959

6060
private[scheduler] case class JobGroupCancelled(groupId: String) extends DAGSchedulerEvent
6161

core/src/test/scala/org/apache/spark/SparkContextSuite.scala

Lines changed: 68 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,21 @@ import java.net.MalformedURLException
2222
import java.nio.charset.StandardCharsets
2323
import java.util.concurrent.TimeUnit
2424

25+
import scala.concurrent.duration._
2526
import scala.concurrent.Await
26-
import scala.concurrent.duration.Duration
2727

2828
import com.google.common.io.Files
2929
import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
3030
import org.apache.hadoop.mapred.TextInputFormat
3131
import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
32+
import org.scalatest.concurrent.Eventually
3233
import org.scalatest.Matchers._
3334

34-
import org.apache.spark.scheduler.SparkListener
35+
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerTaskStart}
3536
import org.apache.spark.util.Utils
3637

37-
class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
38+
39+
class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually {
3840

3941
test("Only one SparkContext may be active at a time") {
4042
// Regression test for SPARK-4180
@@ -465,4 +467,67 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
465467
assert(!sc.listenerBus.listeners.contains(sparkListener1))
466468
assert(sc.listenerBus.listeners.contains(sparkListener2))
467469
}
470+
471+
test("Cancelling stages/jobs with custom reasons.") {
472+
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
473+
val REASON = "You shall not pass"
474+
475+
val listener = new SparkListener {
476+
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
477+
if (SparkContextSuite.cancelStage) {
478+
eventually(timeout(10.seconds)) {
479+
assert(SparkContextSuite.isTaskStarted)
480+
}
481+
sc.cancelStage(taskStart.stageId, REASON)
482+
SparkContextSuite.cancelStage = false
483+
}
484+
}
485+
486+
override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
487+
if (SparkContextSuite.cancelJob) {
488+
eventually(timeout(10.seconds)) {
489+
assert(SparkContextSuite.isTaskStarted)
490+
}
491+
sc.cancelJob(jobStart.jobId, REASON)
492+
SparkContextSuite.cancelJob = false
493+
}
494+
}
495+
}
496+
sc.addSparkListener(listener)
497+
498+
for (cancelWhat <- Seq("stage", "job")) {
499+
SparkContextSuite.isTaskStarted = false
500+
SparkContextSuite.cancelStage = (cancelWhat == "stage")
501+
SparkContextSuite.cancelJob = (cancelWhat == "job")
502+
503+
val ex = intercept[SparkException] {
504+
sc.range(0, 10000L).map { x =>
505+
if (x == 10L) {
506+
org.apache.spark.SparkContextSuite.isTaskStarted = true
507+
}
508+
x
509+
}.cartesian(sc.range(0, 10L))count()
510+
}
511+
512+
ex.getCause() match {
513+
case null =>
514+
assert(ex.getMessage().contains(REASON))
515+
case cause: SparkException =>
516+
assert(cause.getMessage().contains(REASON))
517+
case cause: Throwable =>
518+
fail("Expected the cause to be SparkException, got " + cause.toString() + " instead.")
519+
}
520+
521+
eventually(timeout(20.seconds)) {
522+
assert(sc.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0)
523+
}
524+
}
525+
}
526+
527+
}
528+
529+
object SparkContextSuite {
530+
@volatile var cancelJob = false
531+
@volatile var cancelStage = false
532+
@volatile var isTaskStarted = false
468533
}

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
329329

330330
/** Sends JobCancelled to the DAG scheduler. */
331331
private def cancel(jobId: Int) {
332-
runEvent(JobCancelled(jobId))
332+
runEvent(JobCancelled(jobId, ""))
333333
}
334334

335335
test("[SPARK-3353] parent stage should have lower stage id") {

0 commit comments

Comments
 (0)