[SPARK-19326] Speculated task attempts do not get launched in few scenarios #18492

Closed

Changes from all commits (21 commits)
9d4886e  add speculative job for executor calculation (janewangfb, Jun 29, 2017)
d4dd56c  Merge branch 'master' into speculated_task_not_launched (janewangfb, Jun 29, 2017)
210ba70  Add Unittests (janewangfb, Jun 30, 2017)
cb28404  remove debug lines (janewangfb, Jun 30, 2017)
41170df  remove debug (janewangfb, Jun 30, 2017)
9784b4e  remove debug lines (janewangfb, Jun 30, 2017)
5331ab9  Merge branch 'master' into speculated_task_not_launched (janewangfb, Jun 30, 2017)
32d0c23  Merge branch 'master' into speculated_task_not_launched (janewangfb, Jul 6, 2017)
92c4d09  Merge branch 'master' into speculated_task_not_launched (janewangfb, Jul 13, 2017)
33f5757  Added extraExecutorNeeded event (janewangfb, Jul 13, 2017)
a1fb8d3  Merge branch 'master' into speculated_task_not_launched (janewangfb, Jul 19, 2017)
0fa5c01  address jiangxb1987's comments (janewangfb, Jul 19, 2017)
94c4db6  Merge branch 'master' into speculated_task_not_launched (janewangfb, Aug 8, 2017)
37cf760  address cloud-fan's comment (janewangfb, Aug 8, 2017)
77b4729  address cloud-fan's comment (janewangfb, Aug 8, 2017)
c7c16a6  Merge branch 'master' into speculated_task_not_launched (janewangfb, Aug 15, 2017)
fa31cd8  remove extraExecutorNeeded event (janewangfb, Aug 16, 2017)
f7cdad9  Merge branch 'master' into speculated_task_not_launched (janewangfb, Aug 16, 2017)
8b8b128  remove SparkListenerExtraExecutorNeeded (janewangfb, Aug 16, 2017)
35cf6a5  Merge branch 'master' into speculated_task_not_launched (janewangfb, Aug 17, 2017)
7a8ca2a  Address cloud-fan's comment (janewangfb, Aug 17, 2017)
@@ -139,6 +139,11 @@ public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) {
onEvent(blockUpdated);
}

@Override
public void onSpeculativeTaskSubmitted(SparkListenerSpeculativeTaskSubmitted speculativeTask) {
onEvent(speculativeTask);
}

@Override
public void onOtherEvent(SparkListenerEvent event) {
onEvent(event);
@@ -373,8 +373,14 @@ private[spark] class ExecutorAllocationManager(
// If our target has not changed, do not send a message
// to the cluster manager and reset our exponential growth
if (delta == 0) {
numExecutorsToAdd = 1
return 0
// Check if there is any speculative jobs pending
if (listener.pendingTasks == 0 && listener.pendingSpeculativeTasks > 0) {
numExecutorsTarget =
math.max(math.min(maxNumExecutorsNeeded + 1, maxNumExecutors), minNumExecutors)
Contributor:
@janewangfb Would you please explain why we add + 1 here if there are pending speculative tasks? Shouldn't the number of executors be calculated based on the number of pending tasks? Thanks!

Contributor:
Same here: maxNumExecutorsNeeded + 1 doesn't quite make sense.
@janewangfb, could you please post/update some comments here? I also wonder why we didn't take pendingSpeculativeTasks into account when calculating maxNumExecutorsNeeded().

Or @jerryshao, do you know the rationale?

Member @Ngone51 (Sep 11, 2018):
Also confused by the +1 here. And I think we have already taken pendingSpeculativeTasks into account, @advancedxy:

def totalPendingTasks(): Int = {
  pendingTasks + pendingSpeculativeTasks
}

So this check seems redundant. And the new numExecutorsTarget (after the +1) is not synced to the cluster manager.

Contributor:
@janewangfb @cloud-fan Sorry, I realize this is a very old PR, but I found it because I was confused by this logic as well. Is there a reason we are adding 1 here for speculative tasks?

Contributor:
I can't remember the details as it's too old. But looking at it again now, this looks like a mistake to me: the +1 seems to be trying to match the numExecutorsToAdd = 1 in the previous code. However, numExecutorsToAdd = 1 doesn't mean we want to allocate one more executor right now.
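To illustrate the distinction, here is a minimal sketch (not the actual Spark source; a simplified illustration assuming numExecutorsToAdd only drives the exponential ramp-up step) of why resetting it to 1 is different from requesting one more executor:

// Simplified sketch of the ramp-up step that numExecutorsToAdd controls.
// Resetting it to 1 only shrinks the size of the next batch request;
// it does not, by itself, ask the cluster manager for an extra executor.
object RampUpSketch {
  var numExecutorsToAdd: Int = 1

  def nextDelta(currentTarget: Int, maxNeeded: Int): Int = {
    val desiredTarget = math.min(currentTarget + numExecutorsToAdd, maxNeeded)
    val delta = desiredTarget - currentTarget
    // Double the step only if the full step was consumed; otherwise reset it.
    numExecutorsToAdd = if (delta == numExecutorsToAdd) numExecutorsToAdd * 2 else 1
    delta
  }
}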

} else {
numExecutorsToAdd = 1
return 0
}
}

val addRequestAcknowledged = try {
@@ -588,17 +594,22 @@ private[spark] class ExecutorAllocationManager(
* A listener that notifies the given allocation manager of when to add and remove executors.
*
* This class is intentionally conservative in its assumptions about the relative ordering
* and consistency of events returned by the listener. For simplicity, it does not account
* for speculated tasks.
* and consistency of events returned by the listener.
*/
private class ExecutorAllocationListener extends SparkListener {

private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]
// Number of tasks currently running on the cluster. Should be 0 when no stages are active.
// Number of tasks currently running on the cluster including speculative tasks.
// Should be 0 when no stages are active.
private var numRunningTasks: Int = _

// Number of speculative tasks to be scheduled in each stage
private val stageIdToNumSpeculativeTasks = new mutable.HashMap[Int, Int]
// The speculative tasks started in each stage
private val stageIdToSpeculativeTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]

// stageId to tuple (the number of task with locality preferences, a map where each pair is a
// node and the number of tasks that would like to be scheduled on that node) map,
// maintain the executor placement hints for each stage Id used by resource framework to better
@@ -637,15 +648,17 @@ private[spark] class ExecutorAllocationManager(
val stageId = stageCompleted.stageInfo.stageId
allocationManager.synchronized {
stageIdToNumTasks -= stageId
stageIdToNumSpeculativeTasks -= stageId
stageIdToTaskIndices -= stageId
stageIdToSpeculativeTaskIndices -= stageId
stageIdToExecutorPlacementHints -= stageId

// Update the executor placement hints
updateExecutorPlacementHints()

// If this is the last stage with pending tasks, mark the scheduler queue as empty
// This is needed in case the stage is aborted for any reason
if (stageIdToNumTasks.isEmpty) {
if (stageIdToNumTasks.isEmpty && stageIdToNumSpeculativeTasks.isEmpty) {
allocationManager.onSchedulerQueueEmpty()
if (numRunningTasks != 0) {
logWarning("No stages are running, but numRunningTasks != 0")
@@ -671,7 +684,12 @@ private[spark] class ExecutorAllocationManager(
}

// If this is the last pending task, mark the scheduler queue as empty
stageIdToTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += taskIndex
if (taskStart.taskInfo.speculative) {
stageIdToSpeculativeTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) +=
taskIndex
} else {
stageIdToTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += taskIndex
}
if (totalPendingTasks() == 0) {
allocationManager.onSchedulerQueueEmpty()
}
@@ -705,7 +723,11 @@ private[spark] class ExecutorAllocationManager(
if (totalPendingTasks() == 0) {
allocationManager.onSchedulerBacklogged()
}
stageIdToTaskIndices.get(stageId).foreach { _.remove(taskIndex) }
if (taskEnd.taskInfo.speculative) {
stageIdToSpeculativeTaskIndices.get(stageId).foreach {_.remove(taskIndex)}
} else {
stageIdToTaskIndices.get(stageId).foreach {_.remove(taskIndex)}
}
}
}
}
@@ -726,18 +748,39 @@ private[spark] class ExecutorAllocationManager(
allocationManager.onExecutorRemoved(executorRemoved.executorId)
}

override def onSpeculativeTaskSubmitted(speculativeTask: SparkListenerSpeculativeTaskSubmitted)
: Unit = {
val stageId = speculativeTask.stageId

allocationManager.synchronized {
stageIdToNumSpeculativeTasks(stageId) =
stageIdToNumSpeculativeTasks.getOrElse(stageId, 0) + 1
allocationManager.onSchedulerBacklogged()
}
}

/**
* An estimate of the total number of pending tasks remaining for currently running stages. Does
* not account for tasks which may have failed and been resubmitted.
*
* Note: This is not thread-safe without the caller owning the `allocationManager` lock.
*/
def totalPendingTasks(): Int = {
def pendingTasks(): Int = {
stageIdToNumTasks.map { case (stageId, numTasks) =>
numTasks - stageIdToTaskIndices.get(stageId).map(_.size).getOrElse(0)
}.sum
}

def pendingSpeculativeTasks(): Int = {
stageIdToNumSpeculativeTasks.map { case (stageId, numTasks) =>
numTasks - stageIdToSpeculativeTaskIndices.get(stageId).map(_.size).getOrElse(0)
}.sum
}

def totalPendingTasks(): Int = {
pendingTasks + pendingSpeculativeTasks
}

/**
* The number of tasks currently running across all stages.
*/
14 changes: 14 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -281,6 +281,13 @@ class DAGScheduler(
eventProcessLoop.post(TaskSetFailed(taskSet, reason, exception))
}

/**
* Called by the TaskSetManager when it decides a speculative task is needed.
*/
def speculativeTaskSubmitted(task: Task[_]): Unit = {
eventProcessLoop.post(SpeculativeTaskSubmitted(task))
}

private[scheduler]
def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = cacheLocs.synchronized {
// Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times
@@ -812,6 +819,10 @@ class DAGScheduler(
listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
}

private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_]): Unit = {
listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId))
}

private[scheduler] def handleTaskSetFailed(
taskSet: TaskSet,
reason: String,
@@ -1778,6 +1789,9 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
case BeginEvent(task, taskInfo) =>
dagScheduler.handleBeginEvent(task, taskInfo)

case SpeculativeTaskSubmitted(task) =>
dagScheduler.handleSpeculativeTaskSubmitted(task)

case GettingResultEvent(taskInfo) =>
dagScheduler.handleGetTaskResult(taskInfo)

@@ -94,3 +94,7 @@ case class TaskSetFailed(taskSet: TaskSet, reason: String, exception: Option[Thr
extends DAGSchedulerEvent

private[scheduler] case object ResubmitFailedStages extends DAGSchedulerEvent

private[scheduler]
case class SpeculativeTaskSubmitted(task: Task[_]) extends DAGSchedulerEvent

11 changes: 11 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -52,6 +52,9 @@ case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: T
@DeveloperApi
case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListenerEvent

@DeveloperApi
case class SparkListenerSpeculativeTaskSubmitted(stageId: Int) extends SparkListenerEvent

@DeveloperApi
case class SparkListenerTaskEnd(
stageId: Int,
@@ -290,6 +293,11 @@ private[spark] trait SparkListenerInterface {
*/
def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit

/**
* Called when a speculative task is submitted
*/
def onSpeculativeTaskSubmitted(speculativeTask: SparkListenerSpeculativeTaskSubmitted): Unit
Contributor:
For normal tasks, we have onTaskStart, onTaskEnd, etc., but we don't have onTaskSubmitted. Shall we make the name consistent for speculative tasks?

Contributor (author):
I would keep the name onSpeculativeTaskSubmitted, because when the event happens, it only submits a speculative task to be launched in the future; the task has not started yet.

Contributor:
Oh, I see. So we don't track the submission of normal tasks?

Contributor (author):
I grepped, and I don't think we have any events related to normal task submission.
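As a usage sketch, any SparkListener can observe the new event by overriding the callback added in this PR (the SpeculationLogger class name below is hypothetical, used only for illustration):

import org.apache.spark.scheduler.{SparkListener, SparkListenerSpeculativeTaskSubmitted}

class SpeculationLogger extends SparkListener {
  override def onSpeculativeTaskSubmitted(
      speculativeTask: SparkListenerSpeculativeTaskSubmitted): Unit = {
    // The event only carries the stage id, matching the case class in this diff.
    println(s"Speculative task submitted for stage ${speculativeTask.stageId}")
  }
}

// Registered with the SparkContext, e.g. sc.addSparkListener(new SpeculationLogger)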


/**
* Called when other events like SQL-specific events are posted.
*/
@@ -354,5 +362,8 @@ abstract class SparkListener extends SparkListenerInterface {

override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { }

override def onSpeculativeTaskSubmitted(
speculativeTask: SparkListenerSpeculativeTaskSubmitted): Unit = { }

override def onOtherEvent(event: SparkListenerEvent): Unit = { }
}
@@ -71,6 +71,8 @@ private[spark] trait SparkListenerBus
listener.onNodeUnblacklisted(nodeUnblacklisted)
case blockUpdated: SparkListenerBlockUpdated =>
listener.onBlockUpdated(blockUpdated)
case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted =>
listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted)
case _ => listener.onOtherEvent(event)
}
}
@@ -966,6 +966,7 @@ private[spark] class TaskSetManager(
"Marking task %d in stage %s (on %s) as speculatable because it ran more than %.0f ms"
.format(index, taskSet.id, info.host, threshold))
speculatableTasks += index
sched.dagScheduler.speculativeTaskSubmitted(tasks(index))
foundTasks = true
}
}
@@ -188,6 +188,40 @@ class ExecutorAllocationManagerSuite
assert(numExecutorsTarget(manager) === 10)
}

test("add executors when speculative tasks added") {
sc = createSparkContext(0, 10, 0)
val manager = sc.executorAllocationManager.get

// Verify that we're capped at number of tasks including the speculative ones in the stage
sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
Contributor:
Is this a possible case, where the first event is a speculative task submission?

Contributor (author):
That is not likely. A speculative task is only submitted after a certain percentage of the tasks in a stage have finished successfully.
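As a rough illustration, speculation is gated by configuration like the following (the keys and defaults shown here are assumptions about the Spark version of this PR, not taken from this diff), which is why a speculative submission cannot realistically be the very first event:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.speculation", "true")            // enable speculative execution
  .set("spark.speculation.quantile", "0.75")   // fraction of tasks that must finish before speculating
  .set("spark.speculation.multiplier", "1.5")  // how much slower than the median a task must run
  .set("spark.speculation.interval", "100ms")  // how often to check for speculatable tasks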

assert(numExecutorsTarget(manager) === 0)
assert(numExecutorsToAdd(manager) === 1)
assert(addExecutors(manager) === 1)
sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 2)))
Contributor:
Why is the stage submitted event posted after the speculative task submitted event?

Contributor (author):
In real life, it is possible that a job has multiple stages: one stage is still running some tasks while the next stage has already started. This test tries to mimic that.

assert(numExecutorsTarget(manager) === 1)
assert(numExecutorsToAdd(manager) === 2)
assert(addExecutors(manager) === 2)
assert(numExecutorsTarget(manager) === 3)
assert(numExecutorsToAdd(manager) === 4)
assert(addExecutors(manager) === 2)
assert(numExecutorsTarget(manager) === 5)
assert(numExecutorsToAdd(manager) === 1)

// Verify that running a task doesn't affect the target
Contributor:
Can you explain more about this test? Why do the first 3 SparkListenerSpeculativeTaskSubmitted events trigger the allocation of more executors, but this one doesn't?

Contributor (author):
It is because we use the sum of (running + pending) tasks to calculate how many executors are needed (maxNumExecutorsNeeded), so whether a task is pending or running, the number of executors needed is the same.

Contributor:
Then why does a speculative task submission add to the running/pending tasks?

Contributor (author):
A speculative task is also a task that needs an executor to execute it, so when we calculate how many executors are needed, we need to count the speculative tasks as well.
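A minimal sketch of the calculation described above (the method shape is assumed from the discussion, not copied from this PR): pending, pending speculative, and running tasks are summed before dividing by the number of task slots per executor, so moving a task from pending to running leaves the target unchanged.

// Sketch: ceiling division of all outstanding work over per-executor task slots.
def maxNumExecutorsNeeded(
    pendingTasks: Int,
    pendingSpeculativeTasks: Int,
    runningTasks: Int,
    tasksPerExecutor: Int): Int = {
  val totalTasks = pendingTasks + pendingSpeculativeTasks + runningTasks
  (totalTasks + tasksPerExecutor - 1) / tasksPerExecutor
}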

sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1")))
assert(numExecutorsTarget(manager) === 5)
assert(addExecutors(manager) === 0)
assert(numExecutorsToAdd(manager) === 1)

// Verify that running a speculative task doesn't affect the target
sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-2", true)))
assert(numExecutorsTarget(manager) === 5)
assert(addExecutors(manager) === 0)
assert(numExecutorsToAdd(manager) === 1)
}

test("cancel pending executors when no longer needed") {
sc = createSparkContext(0, 10, 0)
val manager = sc.executorAllocationManager.get
@@ -1031,10 +1065,15 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
taskLocalityPreferences = taskLocalityPreferences)
}

private def createTaskInfo(taskId: Int, taskIndex: Int, executorId: String): TaskInfo = {
new TaskInfo(taskId, taskIndex, 0, 0, executorId, "", TaskLocality.ANY, speculative = false)
private def createTaskInfo(
taskId: Int,
taskIndex: Int,
executorId: String,
speculative: Boolean = false): TaskInfo = {
new TaskInfo(taskId, taskIndex, 0, 0, executorId, "", TaskLocality.ANY, speculative)
}


/* ------------------------------------------------------- *
| Helper methods for accessing private methods and fields |
* ------------------------------------------------------- */
@@ -1061,6 +1100,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
private val _onExecutorBusy = PrivateMethod[Unit]('onExecutorBusy)
private val _localityAwareTasks = PrivateMethod[Int]('localityAwareTasks)
private val _hostToLocalTaskCount = PrivateMethod[Map[String, Int]]('hostToLocalTaskCount)
private val _onSpeculativeTaskSubmitted = PrivateMethod[Unit]('onSpeculativeTaskSubmitted)

private def numExecutorsToAdd(manager: ExecutorAllocationManager): Int = {
manager invokePrivate _numExecutorsToAdd()
@@ -1136,6 +1176,10 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
manager invokePrivate _onExecutorBusy(id)
}

private def onSpeculativeTaskSubmitted(manager: ExecutorAllocationManager, id: String) : Unit = {
manager invokePrivate _onSpeculativeTaskSubmitted(id)
}

private def localityAwareTasks(manager: ExecutorAllocationManager): Int = {
manager invokePrivate _localityAwareTasks()
}
@@ -60,6 +60,10 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
exception: Option[Throwable]): Unit = {
taskScheduler.taskSetsFailed += taskSet.id
}

override def speculativeTaskSubmitted(task: Task[_]): Unit = {
taskScheduler.speculativeTasks += task.partitionId
}
}

// Get the rack for a given host
@@ -92,6 +96,7 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex
val endedTasks = new mutable.HashMap[Long, TaskEndReason]
val finishedManagers = new ArrayBuffer[TaskSetManager]
val taskSetsFailed = new ArrayBuffer[String]
val speculativeTasks = new ArrayBuffer[Int]

val executors = new mutable.HashMap[String, String]
for ((execId, host) <- liveExecutors) {
@@ -139,6 +144,7 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex
}
}


override def getRackForHost(value: String): Option[String] = FakeRackUtil.getRackForHost(value)
}

@@ -929,6 +935,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
// > 0ms, so advance the clock by 1ms here.
clock.advance(1)
assert(manager.checkSpeculatableTasks(0))
assert(sched.speculativeTasks.toSet === Set(3))

// Offer resource to start the speculative attempt for the running task
val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)
assert(taskOption5.isDefined)
@@ -1016,6 +1024,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
// > 0ms, so advance the clock by 1ms here.
clock.advance(1)
assert(manager.checkSpeculatableTasks(0))
assert(sched.speculativeTasks.toSet === Set(3, 4))
// Offer resource to start the speculative attempt for the running task
val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)
assert(taskOption5.isDefined)