
package org.apache.spark.scheduler

-import java.util.Random
+import java.util.{Properties, Random}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -30,6 +30,7 @@ import org.mockito.stubbing.Answer
import org.apache.spark._
import org.apache.spark.internal.config
import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.util.{AccumulatorV2, ManualClock}

class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
@@ -661,6 +662,67 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logging
    assert(thrown2.getMessage().contains("bigger than spark.driver.maxResultSize"))
  }

+  test("[SPARK-13931] taskSetManager should not send Resubmitted tasks after being a zombie") {
+    val conf = new SparkConf().set("spark.speculation", "true")
+    sc = new SparkContext("local", "test", conf)
+
+    val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"))
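+    // Use a no-op killTask so that the kill request issued for the other running attempt
+    // (once one copy of the task succeeds) is simply ignored.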
+    sched.initialize(new FakeSchedulerBackend() {
+      override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {}
+    })
+
+    // Keep track of the number of tasks that are resubmitted,
+    // so that the test can check that no tasks were resubmitted.
+    var resubmittedTasks = 0
+    val dagScheduler = new FakeDAGScheduler(sc, sched) {
+      override def taskEnded(
+          task: Task[_],
+          reason: TaskEndReason,
+          result: Any,
+          accumUpdates: Seq[AccumulatorV2[_, _]],
+          taskInfo: TaskInfo): Unit = {
+        super.taskEnded(task, reason, result, accumUpdates, taskInfo)
+        reason match {
+          case Resubmitted => resubmittedTasks += 1
+          case _ =>
+        }
+      }
+    }
+    sched.setDAGScheduler(dagScheduler)
+
+    val singleTask = new ShuffleMapTask(0, 0, null, new Partition {
+        override def index: Int = 0
+      }, Seq(TaskLocation("host1", "execA")), new Properties, null)
+    val taskSet = new TaskSet(Array(singleTask), 0, 0, 0, null)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
+
+    // Offer host1, which should be accepted as a PROCESS_LOCAL location
+    // by the one task in the task set
+    val task1 = manager.resourceOffer("execA", "host1", TaskLocality.PROCESS_LOCAL).get
+
+    // Mark the task as available for speculation, and then offer another resource,
+    // which should be used to launch a speculative copy of the task.
+    manager.speculatableTasks += singleTask.partitionId
+    val task2 = manager.resourceOffer("execB", "host2", TaskLocality.ANY).get
+
+    assert(manager.runningTasks === 2)
+    assert(manager.isZombie === false)
+
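+    // Stub task result whose value() ignores the serializer, since the test never needs to
+    // deserialize a real result.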
+    val directTaskResult = new DirectTaskResult[String](null, Seq()) {
+      override def value(resultSer: SerializerInstance): String = ""
+    }
+    // Complete one copy of the task, which should result in the task set manager
+    // being marked as a zombie, because at least one copy of its only task has completed.
+    manager.handleSuccessfulTask(task1.taskId, directTaskResult)
+    assert(manager.isZombie === true)
+    assert(resubmittedTasks === 0)
+    assert(manager.runningTasks === 1)
+
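+    // Losing the executor that still runs the speculative copy must not produce a Resubmitted
+    // task-end event, because the manager is already a zombie.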
+    manager.executorLost("execB", "host2", new SlaveLost())
+    assert(manager.runningTasks === 0)
+    assert(resubmittedTasks === 0)
+  }
+
  test("speculative and noPref task should be scheduled after node-local") {
    sc = new SparkContext("local", "test")
    sched = new FakeTaskScheduler(