[SPARK-30359][CORE] Don't clear executorsPendingToRemove at the beginning of CoarseGrainedSchedulerBackend.reset #27017

Status: Closed · wants to merge 7 commits
CoarseGrainedSchedulerBackend.scala
@@ -95,9 +95,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

// Executors we have requested the cluster manager to kill that have not died yet; maps
// the executor ID to whether it was explicitly killed by the driver (and thus shouldn't
- // be considered an app-related failure).
+ // be considered an app-related failure). Visible for testing only.
@GuardedBy("CoarseGrainedSchedulerBackend.this")
- private val executorsPendingToRemove = new HashMap[String, Boolean]
+ private[scheduler] val executorsPendingToRemove = new HashMap[String, Boolean]

// A map to store hostname with its possible task number running on it
@GuardedBy("CoarseGrainedSchedulerBackend.this")
@@ -492,12 +492,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
/**
* Reset the state of CoarseGrainedSchedulerBackend to the initial state. Currently it will only
* be called in the yarn-client mode when AM re-registers after a failure.
+ * Visible for testing only.
* */
- protected def reset(): Unit = {
+ protected[scheduler] def reset(): Unit = {
val executors: Set[String] = synchronized {
requestedTotalExecutors = 0
numPendingExecutors = 0
- executorsPendingToRemove.clear()
executorDataMap.keys.toSet
}

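Why the ordering matters: reset() hands each known executor to the backend's executor-removal path, and that path consults executorsPendingToRemove to decide whether the loss should be blamed on the application. The sketch below is a hedged simplification of that interaction, not the actual Spark source; aside from reset, removeExecutor, and executorsPendingToRemove, the names are invented for illustration. Clearing the map before removing the executors would make every lost executor count toward task failures, including ones the driver itself asked to kill.

import scala.collection.mutable.HashMap

// Simplified stand-in for CoarseGrainedSchedulerBackend; only the pieces needed
// to show why executorsPendingToRemove must survive until removeExecutor runs.
class BackendSketch {
  // executor ID -> "was it explicitly killed by the driver?"
  private val executorsPendingToRemove = new HashMap[String, Boolean]
  // executor ID -> host (stand-in for executorDataMap)
  private val knownExecutors = new HashMap[String, String]

  def registerExecutor(execId: String, host: String): Unit =
    knownExecutors(execId) = host

  def markPendingRemoval(execId: String, killedByDriver: Boolean): Unit =
    executorsPendingToRemove(execId) = killedByDriver

  // Mirrors the fixed reset(): the map is NOT cleared up front, so the
  // per-executor removal below can still tell driver-killed executors apart.
  def reset(): Unit =
    knownExecutors.keys.toSeq.foreach(removeExecutor)

  private def removeExecutor(execId: String): Unit = {
    // remove() clears the entry for this executor and reports its value;
    // a driver-killed executor should not be charged as an app-related failure.
    val killedByDriver = executorsPendingToRemove.remove(execId).getOrElse(false)
    knownExecutors.remove(execId)
    if (killedByDriver) println(s"$execId: killed by driver, not counted as a failure")
    else println(s"$execId: loss counts toward task failures")
  }
}

object BackendSketchDemo {
  def main(args: Array[String]): Unit = {
    val b = new BackendSketch
    b.registerExecutor("0", "host-a")
    b.registerExecutor("1", "host-b")
    b.markPendingRemoval("0", killedByDriver = true)   // driver asked to kill exec 0
    b.markPendingRemoval("1", killedByDriver = false)  // exec 1 pending removal for another reason
    b.reset()  // exec 0 is not blamed on the app; exec 1 is
  }
}

The new test below exercises exactly this split: exec0 is marked as driver-killed and keeps zero failures, while exec1 is not and picks up one failure after reset().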
TaskSetManagerSuite.scala
@@ -21,18 +21,22 @@ import java.util.{Properties, Random}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
+ import scala.concurrent.duration._

import org.apache.hadoop.fs.FileAlreadyExistsException
import org.mockito.ArgumentMatchers.{any, anyBoolean, anyInt, anyString}
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.scalatest.Assertions._
+ import org.scalatest.PrivateMethodTester
+ import org.scalatest.concurrent.Eventually

import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
import org.apache.spark.resource.ResourceUtils._
import org.apache.spark.resource.TestResourceIDs._
+ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.{AccumulatorV2, ManualClock}
@@ -179,7 +183,12 @@ class LargeTask(stageId: Int) extends Task[Array[Byte]](stageId, 0, 0) {
override def preferredLocations: Seq[TaskLocation] = Seq[TaskLocation]()
}

- class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logging {
+ class TaskSetManagerSuite
+   extends SparkFunSuite
+   with LocalSparkContext
+   with PrivateMethodTester
+   with Eventually
+   with Logging {
import TaskLocality.{ANY, PROCESS_LOCAL, NO_PREF, NODE_LOCAL, RACK_LOCAL}

private val conf = new SparkConf
@@ -1894,4 +1903,58 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
    manager.handleFailedTask(offerResult.get.taskId, TaskState.FAILED, reason)
    assert(sched.taskSetsFailed.contains(taskSet.id))
  }
+
+  test("SPARK-30359: don't clean executorsPendingToRemove " +
+    "at the beginning of CoarseGrainedSchedulerBackend.reset") {
+    val conf = new SparkConf()
+      // use local-cluster mode in order to get CoarseGrainedSchedulerBackend
+      .setMaster("local-cluster[2, 1, 2048]")
+      // allow to set up at most two executors
+      .set("spark.cores.max", "2")
[Review thread on the spark.cores.max line above]
Contributor: why do we still need this config?
Author: In order to create at most two executors at the beginning, though this may not be necessary.
+      .setAppName("CoarseGrainedSchedulerBackend.reset")
+    sc = new SparkContext(conf)
+    val sched = sc.taskScheduler
+    val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
+
+    TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
+    val Seq(exec0, exec1) = backend.getExecutorIds()
+
+    val taskSet = FakeTask.createTaskSet(2)
+    val stageId = taskSet.stageId
+    val stageAttemptId = taskSet.stageAttemptId
+    sched.submitTasks(taskSet)
+    val taskSetManagers = PrivateMethod[mutable.HashMap[Int, mutable.HashMap[Int, TaskSetManager]]](
+      Symbol("taskSetsByStageIdAndAttempt"))
+    // get the TaskSetManager
+    val manager = sched.invokePrivate(taskSetManagers()).get(stageId).get(stageAttemptId)
+
+    val task0 = manager.resourceOffer(exec0, "localhost", TaskLocality.NO_PREF)
+    val task1 = manager.resourceOffer(exec1, "localhost", TaskLocality.NO_PREF)
+    assert(task0.isDefined && task1.isDefined)
+    val (taskId0, index0) = (task0.get.taskId, task0.get.index)
+    val (taskId1, index1) = (task1.get.taskId, task1.get.index)
+    // set up two running tasks
+    assert(manager.taskInfos(taskId0).running)
+    assert(manager.taskInfos(taskId0).executorId === exec0)
+    assert(manager.taskInfos(taskId1).running)
+    assert(manager.taskInfos(taskId1).executorId === exec1)
+
+    val numFailures = PrivateMethod[Array[Int]](Symbol("numFailures"))
+    // no task failures yet
+    assert(manager.invokePrivate(numFailures())(index0) === 0)
+    assert(manager.invokePrivate(numFailures())(index1) === 0)
+
+    // let exec1 count task failures but exec0 doesn't
+    backend.executorsPendingToRemove(exec0) = true
+    backend.executorsPendingToRemove(exec1) = false
+
+    backend.reset()
+
+    eventually(timeout(10.seconds), interval(100.milliseconds)) {
+      // executorsPendingToRemove should eventually be empty after reset()
+      assert(backend.executorsPendingToRemove.isEmpty)
+      assert(manager.invokePrivate(numFailures())(index0) === 0)
+      assert(manager.invokePrivate(numFailures())(index1) === 1)
+    }
+  }
}
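The test reaches private members (taskSetsByStageIdAndAttempt on the scheduler, numFailures on the TaskSetManager) through ScalaTest's PrivateMethodTester. Below is a standalone sketch of that API, using a hypothetical Counter class and assuming a ScalaTest 3.1+ layout where AnyFunSuite is available (the suite above mixes the trait into Spark's SparkFunSuite instead):

import org.scalatest.PrivateMethodTester
import org.scalatest.funsuite.AnyFunSuite

// Hypothetical class with private members we want to inspect from a test.
class Counter {
  private var total = 0
  private def bump(n: Int): Int = { total += n; total }
}

class CounterSuite extends AnyFunSuite with PrivateMethodTester {
  test("invokePrivate calls a private method by name") {
    val counter = new Counter
    // A PrivateMethod value is typed with the return type and named with a Symbol,
    // the same pattern the test above uses for taskSetsByStageIdAndAttempt and numFailures.
    val bump = PrivateMethod[Int](Symbol("bump"))
    assert((counter invokePrivate bump(3)) === 3)
    assert((counter invokePrivate bump(2)) === 5)
  }
}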
YarnSchedulerBackend.scala
@@ -198,7 +198,7 @@ private[spark] abstract class YarnSchedulerBackend(
* and re-registered itself to driver after a failure. The stale state in driver should be
* cleaned.
*/
- override protected def reset(): Unit = {
+ override protected[scheduler] def reset(): Unit = {
super.reset()
sc.executorAllocationManager.foreach(_.reset())
}
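The only change in this file widens reset() from protected to protected[scheduler], matching the change in CoarseGrainedSchedulerBackend, so that TaskSetManagerSuite (which lives in org.apache.spark.scheduler) can call backend.reset() directly. A minimal sketch of how Scala's qualified protected access behaves, with hypothetical package names:

// Qualified protected access, illustrated with hypothetical packages.
package org.example.scheduler {

  class Backend {
    // Visible to subclasses AND to any code inside the enclosing package named
    // "scheduler", which is what lets a suite in org.apache.spark.scheduler
    // call backend.reset() without the method becoming public.
    protected[scheduler] def reset(): Unit = println("backend reset")
  }

  object SamePackageCaller {
    def run(): Unit = new Backend().reset() // compiles: caller is in the qualifying package
  }
}

package org.example.other {

  object OtherPackageCaller {
    // new org.example.scheduler.Backend().reset()
    // does not compile here: this caller is neither a subclass nor inside package "scheduler"
  }
}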