Conversation
Test build #65830 has finished for PR 15218 at commit
Test build #65832 has finished for PR 15218 at commit
Test build #65831 has finished for PR 15218 at commit
Failed in DirectKafkaStreamSuite. It should have nothing to do with this patch.
retest please
@gatorsmile Thanks. #65832 is the latest one which does not have the same failure.
Test build #65856 has finished for PR 15218 at commit
class BalancedAssigner(conf: SparkConf) extends TaskAssigner(conf) {
It would be good to shuffle the workOffers for this class too.
Practically, this ensures that the initial heap is randomized when core counts are equal.
This also means that the Ordering below will need to handle the case of x.cores == y.cores but x != y.
BTW, I don't think we need to handle the case of x.cores == y.cores; that means they are equal, and the priority-queue algorithm decides the behavior.
Returning 0 implies equality - which is not the case here (x != y but x.cores == y.cores).
@mridulm Thanks for the comments, but I am lost here. My understanding is that, Ordering-wise, x is equal to y if x.cores == y.cores. This ordering is used by the priority queue to construct the data structure. The following is an example from the trait Ordering: PersonA will be equal to PersonB if they are the same age. Am I missing anything?
import scala.util.Sorting
case class Person(name: String, age: Int)
val people = Array(Person("bob", 30), Person("ann", 32), Person("carl", 19))
// sort by age
object AgeOrdering extends Ordering[Person] {
  def compare(a: Person, b: Person) = a.age compare b.age
}
Sorting.quickSort(people)(AgeOrdering)
You are right, my bad. I was thinking of Ordered.
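For readers following along, the distinction the reviewers settled on can be shown with a small sketch (illustrative only, not part of the patch): an Ordering whose compare returns 0 merely marks two elements as equivalent for ordering purposes, and a priority queue will still hold both distinct elements.

```scala
import scala.collection.mutable.PriorityQueue

// Illustrative only: two distinct offers with equal core counts.
case class Offer(id: String, cores: Int)

// An Ordering that returns 0 for equal core counts. Returning 0 only
// means "equivalent for ordering"; the queue still keeps both elements.
val byCores: Ordering[Offer] = Ordering.by((o: Offer) => o.cores)

val pq = PriorityQueue(Offer("exec-1", 4), Offer("exec-2", 4))(byCores)

assert(byCores.compare(Offer("exec-1", 4), Offer("exec-2", 4)) == 0)
assert(pq.size == 2)
```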
In class PackedAssigner, you add blank lines between functions. Do you want to be consistent with the style?
Test build #66328 has finished for PR 15218 at commit
@mridulm Thanks for reviewing this. I will wait a while in case there are more comments before addressing them.
Test build #66465 has finished for PR 15218 at commit
Btw, taking a step back, I am not sure this will work as you expect it to. This typically will mean PROCESS_LOCAL (almost always) and then NODE_LOCAL, which means exactly matching the executor or host (irrespective of the order we traverse the offers). The randomization of offers we do is for a specific set of purposes: to spread load when there is no locality information (not very common imo), or to spread it across the cluster when the locality information is of 'low quality', like from an InputFormat, or for shuffle when we are using heuristics that might not be optimal. But since I have not looked at this in a while, I will CC Kay. +CC @kayousterhout, please take a look in case I am missing something.
@mridulm Thanks for the comments. Your concern regarding locality is right. The patch does not change this behavior, which gives priority to locality preference. But if multiple executors satisfy the locality restriction, the policy will be applied. In our production pipeline, we do see a big gain with respect to reserved CPU resources when dynamic allocation is enabled. @kayousterhout Would you like to take a look and provide your comments?
@zhzhan I am curious why this is the case for the jobs being mentioned. If the tasks have a PROCESS_LOCAL or NODE_LOCAL locality preference, that will take precedence, and attempts to spread the load or reduce the spread across nodes as envisioned here will not work. So the target here seems to be RACK_LOCAL or ANY locality preference, which should be fairly uncommon; unless I am missing something w.r.t. the jobs being run. EDIT: I can see one case where it will help, which is when you have shuffle tasks being run where the number of partitions is large (greater than the hardcoded thresholds in the code).
@mridulm You are right. This patch is mainly for jobs that have multiple stages, which is very common in production pipelines. As you mentioned, if there is a shuffle involved, getLocationsWithLargestOutputs in MapOutputTracker typically returns None for ShuffledRowRDD and ShuffledRDD because of the threshold REDUCER_PREF_LOCS_FRACTION (20%). A ShuffledRowRDD/ShuffledRDD can easily have more than 10 partitions (even hundreds) in a real production pipeline, so the patch does help a lot with CPU reservation time.
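The threshold behavior described above can be sketched as follows (illustrative only; this is a simplification of what MapOutputTracker.getLocationsWithLargestOutputs does, and the helper name and signature here are assumptions):

```scala
// Illustrative sketch (not Spark's actual code): a reducer only gets
// preferred locations when some host already holds at least
// REDUCER_PREF_LOCS_FRACTION of its total map output.
val REDUCER_PREF_LOCS_FRACTION = 0.2

def preferredLocs(bytesByHost: Map[String, Long]): Seq[String] = {
  val total = bytesByHost.values.sum.toDouble
  bytesByHost.collect {
    case (host, bytes) if bytes / total >= REDUCER_PREF_LOCS_FRACTION => host
  }.toSeq
}
```

When no host crosses the 20% fraction, the result is empty, which corresponds to the "returns None" case discussed above.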
I am assuming @kayousterhout does not have comments on this. |
Test build #67021 has finished for PR 15218 at commit
Merged to master, thanks @zhzhan!
@mridulm Thanks for reviewing this.
class PackedAssigner(conf: SparkConf) extends TaskAssigner(conf) {
We need more documentation here to explain what this class does.
var sorted: Seq[OfferState] = _
all these variables should be private
// Release internally maintained resources. Subclass is responsible to
// release its own private resources.
def reset: Unit = {
This should have parentheses since it has a side effect.
import org.apache.spark.SparkConf

case class OfferState(workOffer: WorkerOffer, var cores: Int) {
we need documentation explaining what this class does
Also, case classes are supposed to have mostly immutable state; if you want cores to be mutable, I'd just make this a normal class.
I read more code. Shouldn't cores be coresRemaining, or coresAvailable?
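Putting the review suggestions together, the class might look like the following sketch (hypothetical; WorkerOffer and TaskDescription are stubbed as plain strings here, since only the shape is at issue):

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch combining the review suggestions: a plain class
// (the state is mutable, so a case class is a poor fit) with a clearer
// field name. WorkerOffer/TaskDescription are stubbed as String.
class OfferState(val workOffer: String, var coresAvailable: Int) {
  // Tasks assigned to this worker during the current offer round.
  val tasks = new ArrayBuffer[String](coresAvailable)
}

val state = new OfferState("executor-1", 4)
state.coresAvailable -= 1
assert(state.coresAvailable == 3)
```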
case class OfferState(workOffer: WorkerOffer, var cores: Int) {
  // Build a list of tasks to assign to each worker.
  val tasks = new ArrayBuffer[TaskDescription](cores)
Again I think you need to document what this actually does. My guess (without having looked at rest of the code) is that the index indicates some worker id, but I'm not sure and I might be wrong. We need to explain it here.
Ah ok - my guess was wrong. It would be great to actually say what this list means, e.g. is this a queue?
@mridulm @zhzhan I liked the idea here, but unfortunately I think it was merged prematurely. There is insufficient documentation, and some basic style choices don't align with the rest of Spark. I'm going to revert this. It would be good to get this in, and I think with very little work we can get it into a shape that looks a lot better.
@@ -1334,6 +1334,17 @@ Apart from these, the following properties are also available, and may be useful
Should be greater than or equal to 1. Number of allowed retries = this value - 1.
</td>
</tr>
<tr>
<td><code>spark.task.assigner</code></td>
<td>org.apache.spark.scheduler.RoundRobinAssigner</td>
Rather than asking for the full class name, I'd just have "roundrobin" and "packed" (case insensitive) as the options and internally maintain the mapping.
On the Spark SQL side, we did a similar thing for data sources. You can check the code in the function lookupDataSource.
Yea in this case I wouldn't even support external assigners. Just have strings to use the built-in ones.
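A minimal sketch of the lookup the reviewers describe (the helper name and shape are assumptions; only the assigner class names come from the patch):

```scala
// Hypothetical lookup helper: short, case-insensitive option names mapped
// to the built-in assigner class names. No external assigners supported.
def lookupAssigner(name: String): String = name.toLowerCase match {
  case "roundrobin" => "org.apache.spark.scheduler.RoundRobinAssigner"
  case "balanced"   => "org.apache.spark.scheduler.BalancedAssigner"
  case "packed"     => "org.apache.spark.scheduler.PackedAssigner"
  case other =>
    throw new IllegalArgumentException(s"Unknown task assigner: $other")
}
```

This mirrors the lookupDataSource approach mentioned above: users pass a short string, and unknown names fail fast with a clear error.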
@@ -109,6 +109,72 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
assert(!failedTaskSet)
}

test("Scheduler balance the assignment to the worker with more free cores") {
thanks a lot for creating the test cases
@@ -1334,6 +1334,17 @@ Apart from these, the following properties are also available, and may be useful
Should be greater than or equal to 1. Number of allowed retries = this value - 1.
</td>
</tr>
<tr>
<td><code>spark.task.assigner</code></td>
I'd add "scheduler" to the option, e.g. "spark.scheduler.taskAssigner"
val tasks = new ArrayBuffer[TaskDescription](cores)
}

abstract class TaskAssigner(conf: SparkConf) {
instead of taking in a generic SparkConf, I'd just take in the cpu per task for now, until we see a clear need to be more generic. This simplifies the dependencies of the class.
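What the reviewer suggests might look like the following sketch (hypothetical signature; the patch's actual TaskAssigner takes a SparkConf):

```scala
// Hypothetical sketch: depend only on the one value the assigners need,
// rather than the whole SparkConf, keeping the dependency surface small.
abstract class TaskAssigner(val cpusPerTask: Int) {
  // An offer can only host another task if it still has enough free cores.
  def hasEnoughCores(freeCores: Int): Boolean = freeCores >= cpusPerTask
}

class RoundRobinAssigner(cpusPerTask: Int) extends TaskAssigner(cpusPerTask)

val a = new RoundRobinAssigner(2)
assert(a.hasEnoughCores(3))
assert(!a.hasEnoughCores(1))
```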
@zhzhan In general it'd be great to have proper documentation on the classes. For example, it is important to document the behavior of the various assigners, and even more importantly, to document the contract for TaskAssigner. The control flow is fairly confusing right now; I'm not very smart, things that are complicated take me a long time to understand, and when I try changing them in the future, there's a very good chance I will make a mistake and mess it up. It would be great if we can simplify the control flow. If we can't, then we should document it more clearly. For example, when init/reset should be called is part of the contract, and none of this is really documented.
@rxin Thanks a lot for the detailed review. I will update the patch.
@@ -61,6 +59,21 @@ private[spark] class TaskSchedulerImpl(
val conf = sc.conf

val DEFAULT_TASK_ASSIGNER = classOf[RoundRobinAssigner].getName
lazy val taskAssigner: TaskAssigner = {
  val className = conf.get("spark.task.assigner", DEFAULT_TASK_ASSIGNER)
Like the above MAX_TASK_FAILURES, we can also add spark.task.assigner into the config object.
By default, round robin with randomness is used.
org.apache.spark.scheduler.BalancedAssigner tries to balance the task across all workers (allocating tasks to
workers with most free cores). org.apache.spark.scheduler.PackedAssigner tries to allocate tasks to workers
with the least free cores, which may help releasing the resources when dynamic allocation is enabled.
"when dynamic allocation is enabled." -> "when dynamic allocation (spark.dynamicAllocation.enabled) is enabled."
assert(4 === taskDescriptions.length)
taskDescriptions.map(_.executorId)
}
Nit: remove this empty line.
assert(!failedTaskSet)
}
Nit: remove this empty line.
@@ -408,4 +474,5 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
assert(thirdTaskDescs.size === 0)
assert(taskScheduler.getExecutorsAliveOnHost("host1") === Some(Set("executor1", "executor3")))
}
Nit: remove this empty line.
The test case design is pretty good. It covers all the scenarios.
}

class RoundRobinAssigner(conf: SparkConf) extends TaskAssigner(conf) {
  var i = 0
Any better variable name?
return Ordering[Int].compare(x.cores, y.cores)
}
}

def init(): Unit = {
This should have the override modifier.
def getNext(): OfferState

// Called by the TaskScheduler to indicate whether the current offer is accepted
// In order to decide whether the current is valid for the next offering.
"In" should be "in"
val tid = task.taskId
taskIdToTaskSetManager(tid) = taskSet
taskIdToExecutorId(tid) = execId
executorIdToTaskCount(execId) += 1
availableCpus(i) -= CPUS_PER_TASK
assert(availableCpus(i) >= 0)
current.cores = current.cores - CPUS_PER_TASK
Do you want to follow the previous style current.cores -= CPUS_PER_TASK
@wangmiao1981 Thanks for reviewing this. I will open another PR to address these comments soon.
What changes were proposed in this pull request?
Restructure the code and implement two new task assigners.
PackedAssigner: tries to allocate tasks to the executors with the fewest available cores, so that Spark can release reserved executors when dynamic allocation is enabled.
BalancedAssigner: tries to allocate tasks to the executors with the most available cores, in order to balance the workload across all executors.
By default, the original round-robin assigner is used.
We tested a pipeline, and the new PackedAssigner saves around 45% of the reserved CPU and memory with dynamic allocation enabled.
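The difference between the two new policies can be illustrated with a tiny sketch (purely illustrative; the names and shapes here are assumptions, not the patch's code):

```scala
// Purely illustrative: given per-executor free cores, which executor
// receives the next task under each policy.
case class Exec(id: String, freeCores: Int)

// Packed: fill up the busiest executor that still has room, so that
// idle executors can be released by dynamic allocation.
def packedPick(execs: Seq[Exec]): Exec =
  execs.filter(_.freeCores > 0).minBy(_.freeCores)

// Balanced: send work to the executor with the most free cores.
def balancedPick(execs: Seq[Exec]): Exec =
  execs.maxBy(_.freeCores)

val execs = Seq(Exec("e1", 1), Exec("e2", 4), Exec("e3", 0))
assert(packedPick(execs).id == "e1")
assert(balancedPick(execs).id == "e2")
```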
How was this patch tested?
Both unit tests in TaskSchedulerImplSuite and manual tests in a production pipeline.