Skip to content

Commit

Permalink
Revert "[SPARK-36575][CORE] Should ignore task finished event if its …
Browse files Browse the repository at this point in the history
…task set is gone in TaskSchedulerImpl.handleSuccessfulTask"

This reverts commit bc80c84.
  • Loading branch information
Ngone51 committed Nov 10, 2021
1 parent b89f415 commit 16e2604
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -871,13 +871,7 @@ private[spark] class TaskSchedulerImpl(
taskSetManager: TaskSetManager,
tid: Long,
taskResult: DirectTaskResult[_]): Unit = synchronized {
if (taskIdToTaskSetManager.contains(tid)) {
taskSetManager.handleSuccessfulTask(tid, taskResult)
} else {
logInfo(s"Ignoring update with state finished for task (TID $tid) because its task set " +
"is gone (this is likely the result of receiving duplicate task finished status updates)" +
" or its executor has been marked as failed.")
}
taskSetManager.handleSuccessfulTask(tid, taskResult)
}

def handleFailedTask(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,9 @@
package org.apache.spark.scheduler

import java.nio.ByteBuffer
import java.util.Properties
import java.util.concurrent.{CountDownLatch, ExecutorService, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}

import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.concurrent.duration._
import scala.language.reflectiveCalls

import org.mockito.ArgumentMatchers.{any, anyInt, anyString, eq => meq}
import org.mockito.Mockito.{atLeast, atMost, never, spy, times, verify, when}
Expand All @@ -37,7 +34,7 @@ import org.apache.spark.internal.config
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, TaskResourceRequests}
import org.apache.spark.resource.ResourceUtils._
import org.apache.spark.resource.TestResourceIDs._
import org.apache.spark.util.{Clock, ManualClock, ThreadUtils}
import org.apache.spark.util.{Clock, ManualClock}

class FakeSchedulerBackend extends SchedulerBackend {
def start(): Unit = {}
Expand Down Expand Up @@ -1998,87 +1995,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
assert(!normalTSM.runningTasksSet.contains(taskId))
}

test("SPARK-36575: Should ignore task finished event if its task set is gone " +
"in TaskSchedulerImpl.handleSuccessfulTask") {
val taskScheduler = setupScheduler()

val latch = new CountDownLatch(2)
val resultGetter = new TaskResultGetter(sc.env, taskScheduler) {
override protected val getTaskResultExecutor: ExecutorService =
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable],
ThreadUtils.namedThreadFactory("task-result-getter")) {
override def execute(command: Runnable): Unit = {
super.execute(new Runnable {
override def run(): Unit = {
command.run()
latch.countDown()
}
})
}
}
def taskResultExecutor() : ExecutorService = getTaskResultExecutor
}
taskScheduler.taskResultGetter = resultGetter

val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 1),
new WorkerOffer("executor1", "host1", 1))
val task1 = new ShuffleMapTask(1, 0, null, new Partition {
override def index: Int = 0
}, Seq(TaskLocation("host0", "executor0")), new Properties, null)

val task2 = new ShuffleMapTask(1, 0, null, new Partition {
override def index: Int = 1
}, Seq(TaskLocation("host1", "executor1")), new Properties, null)

val taskSet = new TaskSet(Array(task1, task2), 0, 0, 0, null, 0)

taskScheduler.submitTasks(taskSet)
val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
assert(2 === taskDescriptions.length)

val ser = sc.env.serializer.newInstance()
val directResult = new DirectTaskResult[Int](ser.serialize(1), Seq(), Array.empty)
val resultBytes = ser.serialize(directResult)

val busyTask = new Runnable {
val lock : Object = new Object
override def run(): Unit = {
lock.synchronized {
lock.wait()
}
}
def markTaskDone: Unit = {
lock.synchronized {
lock.notify()
}
}
}
// make getTaskResultExecutor busy
resultGetter.taskResultExecutor().submit(busyTask)

// task1 finished
val tid = taskDescriptions(0).taskId
taskScheduler.statusUpdate(
tid = tid,
state = TaskState.FINISHED,
serializedData = resultBytes
)

// mark executor heartbeat timed out
taskScheduler.executorLost(taskDescriptions(0).executorId, ExecutorProcessLost("Executor " +
"heartbeat timed out"))

busyTask.markTaskDone

// Wait until all events are processed
latch.await()

val taskSetManager = taskScheduler.taskIdToTaskSetManager.get(taskDescriptions(1).taskId)
assert(taskSetManager != null)
assert(0 == taskSetManager.tasksSuccessful)
assert(!taskSetManager.successful(taskDescriptions(0).index))
}

/**
* Used by tests to simulate a task failure. This calls the failure handler explicitly, to ensure
* that all the state is updated when this method returns. Otherwise, there's no way to know when
Expand Down

0 comments on commit 16e2604

Please sign in to comment.