[SPARK-19326] Speculated task attempts do not get launched in few scenarios #18492
```diff
@@ -52,6 +52,9 @@ case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: T
 @DeveloperApi
 case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListenerEvent
 
+@DeveloperApi
+case class SparkListenerSpeculativeTaskSubmitted(stageId: Int) extends SparkListenerEvent
+
 @DeveloperApi
 case class SparkListenerTaskEnd(
     stageId: Int,
```
```diff
@@ -290,6 +293,11 @@ private[spark] trait SparkListenerInterface {
    */
   def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit
 
+  /**
+   * Called when a speculative task is submitted
+   */
+  def onSpeculativeTaskSubmitted(speculativeTask: SparkListenerSpeculativeTaskSubmitted): Unit
+
   /**
    * Called when other events like SQL-specific events are posted.
    */
```

Review discussion on this hunk:

> For normal task, we have …

> I would keep the name `onSpeculativeTaskSubmitted`. When the event happens, it only submits a speculative task to be launched in the future; the task has not started yet.

> Oh, I see. So we don't track the submission of normal tasks?

> I grepped, and I don't think we have events related to normal task submission.
```diff
@@ -354,5 +362,8 @@ abstract class SparkListener extends SparkListenerInterface {
 
   override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { }
 
+  override def onSpeculativeTaskSubmitted(
+      speculativeTask: SparkListenerSpeculativeTaskSubmitted): Unit = { }
+
   override def onOtherEvent(event: SparkListenerEvent): Unit = { }
 }
```
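The hunks above follow the usual listener-extension pattern: a new method on `SparkListenerInterface` plus an empty default in the `SparkListener` base class, so existing listener subclasses keep compiling and only interested listeners override the new callback. A self-contained toy sketch of the pattern (illustrative names, not the real `org.apache.spark.scheduler` classes):

```scala
// Toy model of the SparkListener extension pattern; names are
// illustrative, not the actual Spark classes.
case class SpeculativeTaskSubmitted(stageId: Int)

trait ListenerInterface {
  def onSpeculativeTaskSubmitted(event: SpeculativeTaskSubmitted): Unit
}

// The base class provides a no-op default, so subclasses opt in per event.
class BaseListener extends ListenerInterface {
  override def onSpeculativeTaskSubmitted(event: SpeculativeTaskSubmitted): Unit = { }
}

// A listener that only cares about speculative submissions.
class SpeculationCounter extends BaseListener {
  var count = 0
  override def onSpeculativeTaskSubmitted(event: SpeculativeTaskSubmitted): Unit =
    count += 1
}
```

Adding the empty default (rather than an abstract method alone) is what keeps third-party `SparkListener` implementations source-compatible when a new event type is introduced.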
```diff
@@ -188,6 +188,40 @@ class ExecutorAllocationManagerSuite
     assert(numExecutorsTarget(manager) === 10)
   }
 
+  test("add executors when speculative tasks added") {
+    sc = createSparkContext(0, 10, 0)
+    val manager = sc.executorAllocationManager.get
+
+    // Verify that we're capped at number of tasks including the speculative ones in the stage
+    sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
+    assert(numExecutorsTarget(manager) === 0)
+    assert(numExecutorsToAdd(manager) === 1)
+    assert(addExecutors(manager) === 1)
+    sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
+    sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 2)))
+    assert(numExecutorsTarget(manager) === 1)
+    assert(numExecutorsToAdd(manager) === 2)
+    assert(addExecutors(manager) === 2)
+    assert(numExecutorsTarget(manager) === 3)
+    assert(numExecutorsToAdd(manager) === 4)
+    assert(addExecutors(manager) === 2)
+    assert(numExecutorsTarget(manager) === 5)
+    assert(numExecutorsToAdd(manager) === 1)
+
+    // Verify that running a task doesn't affect the target
+    sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1")))
+    assert(numExecutorsTarget(manager) === 5)
+    assert(addExecutors(manager) === 0)
+    assert(numExecutorsToAdd(manager) === 1)
+
+    // Verify that running a speculative task doesn't affect the target
+    sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-2", true)))
+    assert(numExecutorsTarget(manager) === 5)
+    assert(addExecutors(manager) === 0)
+    assert(numExecutorsToAdd(manager) === 1)
+  }
+
   test("cancel pending executors when no longer needed") {
     sc = createSparkContext(0, 10, 0)
     val manager = sc.executorAllocationManager.get
```

Review discussion on this test:

> Is it a possible case that the first event is a speculative task submission?

> That is not likely. A speculative task is only submitted when a certain percentage of tasks have finished successfully.

> Why is the stage submitted event posted after the speculative task submitted events?

> In real life, it is possible that a job has multiple stages: one stage is still running some tasks while the next stage has already started. This test tries to mimic that.

> Can you explain more about this test? Why do the first 3 …

> It is because we use the sum of (running + pending) tasks to calculate how many executors are needed (`maxNumExecutorsNeeded`), so whether a task is pending or running, the number of executors needed is the same.

> Then why does speculative task submission add to the running/pending tasks?

> A speculative task is also a task which needs an executor to execute it, so when we calculate how many executors are needed, we need to count the speculative tasks.
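As the reviewer explains, the target is driven by running plus pending tasks (speculative included), and each fully granted `addExecutors` round doubles the batch size while the target stays capped at the number of tasks. A standalone model of that ramp-up (an assumption-laden sketch: one task per executor, illustrative names, not the real `ExecutorAllocationManager` code) reproduces the 1 → 3 → 5 target sequence asserted in the test:

```scala
// Simplified model of the exponential executor ramp-up policy.
// Assumptions: tasksPerExecutor = 1; names mirror the test's helpers
// but this is not the actual ExecutorAllocationManager implementation.
class RampUpModel {
  var numExecutorsTarget = 0
  var numExecutorsToAdd = 1

  // maxNeeded = running + pending tasks (speculative included).
  // Returns how many executors this round actually added.
  def addExecutors(maxNeeded: Int): Int = {
    val old = numExecutorsTarget
    numExecutorsTarget = math.min(numExecutorsTarget + numExecutorsToAdd, maxNeeded)
    val delta = numExecutorsTarget - old
    // Double the batch only if the whole batch was granted; otherwise reset.
    numExecutorsToAdd = if (delta == numExecutorsToAdd) numExecutorsToAdd * 2 else 1
    delta
  }
}
```

With one pending speculative task (maxNeeded = 1) the first round adds 1; once the stage brings the cap to 5 (2 stage tasks + 3 speculative), the next rounds add 2 and then 2, hitting the cap and resetting the batch size to 1, matching the test's assertions.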
```diff
@@ -1031,10 +1065,15 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
       taskLocalityPreferences = taskLocalityPreferences)
   }
 
-  private def createTaskInfo(taskId: Int, taskIndex: Int, executorId: String): TaskInfo = {
-    new TaskInfo(taskId, taskIndex, 0, 0, executorId, "", TaskLocality.ANY, speculative = false)
+  private def createTaskInfo(
+      taskId: Int,
+      taskIndex: Int,
+      executorId: String,
+      speculative: Boolean = false): TaskInfo = {
+    new TaskInfo(taskId, taskIndex, 0, 0, executorId, "", TaskLocality.ANY, speculative)
   }
 
   /* ------------------------------------------------------- *
    | Helper methods for accessing private methods and fields |
    * ------------------------------------------------------- */
```
```diff
@@ -1061,6 +1100,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
   private val _onExecutorBusy = PrivateMethod[Unit]('onExecutorBusy)
   private val _localityAwareTasks = PrivateMethod[Int]('localityAwareTasks)
   private val _hostToLocalTaskCount = PrivateMethod[Map[String, Int]]('hostToLocalTaskCount)
+  private val _onSpeculativeTaskSubmitted = PrivateMethod[Unit]('onSpeculativeTaskSubmitted)
 
   private def numExecutorsToAdd(manager: ExecutorAllocationManager): Int = {
     manager invokePrivate _numExecutorsToAdd()
```

```diff
@@ -1136,6 +1176,10 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
     manager invokePrivate _onExecutorBusy(id)
   }
 
+  private def onSpeculativeTaskSubmitted(manager: ExecutorAllocationManager, id: String): Unit = {
+    manager invokePrivate _onSpeculativeTaskSubmitted(id)
+  }
+
   private def localityAwareTasks(manager: ExecutorAllocationManager): Int = {
     manager invokePrivate _localityAwareTasks()
   }
```
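The `PrivateMethod`/`invokePrivate` helpers above come from ScalaTest's `PrivateMethodTester`, which the suite uses to poke at `ExecutorAllocationManager` internals. Under the hood the idea is plain JVM reflection; a self-contained sketch of that idea (toy class and method body, not ScalaTest's or the suite's real code):

```scala
// Reflection sketch of what PrivateMethodTester does: invoke a private
// method on an instance. The class and method body are toys for illustration.
class ToyManager {
  private def onSpeculativeTaskSubmitted(stageId: Int): Int = stageId + 1
}

object PrivateCallDemo {
  def call(m: ToyManager, stageId: Int): Int = {
    val method = classOf[ToyManager].getDeclaredMethod(
      "onSpeculativeTaskSubmitted", classOf[Int])
    method.setAccessible(true) // bypass the private modifier
    method.invoke(m, Int.box(stageId)).asInstanceOf[Int]
  }
}
```

ScalaTest wraps exactly this mechanism in the `PrivateMethod[R]('name)` / `manager invokePrivate _method(args)` syntax, so the suite can exercise private state transitions without widening the production API.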
Review discussion on the `+ 1` in the allocation logic:

> @janewangfb Would you please explain the `+ 1` here? If there are pending speculative tasks, should the number of executors be calculated based on the number of pending tasks? Thanks!

> Same here: `maxNumExecutorsNeeded + 1` doesn't quite make sense. @janewangfb could you please post/update some comments here? And I wonder why we didn't take `pendingSpeculativeTasks` into account when calculating `maxNumExecutorsNeeded()`. Or @jerryshao, do you know the rationale?

> Also confused by the `+1` here. And I think we have already taken `pendingSpeculativeTasks` into account, @advancedxy: this check seems redundant. And it doesn't sync to the cluster manager if `numExecutorsTarget` changes (after the `+1`).

> @janewangfb @cloud-fan Sorry, I realize this is a very old PR, but I found it because I was confused by this logic as well. Is there a reason we are adding 1 here with speculative tasks?

> I can't remember the details as it's too old. But when I look at it again now, this looks like a mistake to me: the `+1` seems to try to match the `numExecutorsToAdd = 1` in the previous code. However, `numExecutorsToAdd = 1` doesn't mean we want to allocate one more executor right now.

> Thanks, filed https://issues.apache.org/jira/browse/SPARK-28403
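To make the disputed behavior concrete, here is a simplified sketch of the computation the reviewers question (an assumption-laden model: illustrative names patterned on `ExecutorAllocationManager`, not the actual Spark code; per the thread, the `+ 1` was later judged a likely mistake and filed as SPARK-28403):

```scala
// Simplified model of the disputed target computation.
// Names are illustrative only.
object TargetModel {
  // Executors needed to run all pending + running tasks at once.
  // Speculative tasks are already counted here.
  def maxNumExecutorsNeeded(pendingTasks: Int, pendingSpeculativeTasks: Int,
      runningTasks: Int, tasksPerExecutor: Int): Int = {
    val total = pendingTasks + pendingSpeculativeTasks + runningTasks
    (total + tasksPerExecutor - 1) / tasksPerExecutor // ceiling division
  }

  // The questioned branch: with pending speculative tasks the target
  // became maxNeeded + 1, even though speculative tasks are already
  // reflected in maxNeeded, which is why reviewers call it redundant.
  def target(maxNeeded: Int, hasPendingSpeculative: Boolean): Int =
    if (hasPendingSpeculative) maxNeeded + 1 else maxNeeded
}
```

With 2 stage tasks, 3 pending speculative tasks, and one task per executor, `maxNumExecutorsNeeded` is already 5; the questioned branch would bump the target to 6, requesting one executor more than there are tasks to run.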