
[SPARK-17637][Scheduler]Packed scheduling for Spark tasks across executors #15218


Closed
wants to merge 8 commits into apache:master from zhzhan:packed-scheduler

Conversation

zhzhan
Contributor

@zhzhan zhzhan commented Sep 23, 2016

What changes were proposed in this pull request?

Restructure the code and implement two new task assigners.

PackedAssigner: tries to allocate tasks to the executors with the fewest available cores, so that Spark can release reserved executors when dynamic allocation is enabled.

BalancedAssigner: tries to allocate tasks to the executors with the most available cores, in order to balance the workload across all executors.

By default, the original round-robin assigner is used.

We tested a pipeline, and the new PackedAssigner saves around 45% of reserved CPU and memory with dynamic allocation enabled.
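
To make the two policies concrete, here is a minimal sketch of the ordering each assigner applies to the executors' free cores; the Offer class and method names below are illustrative stand-ins, not the exact code in this patch:

```scala
object AssignerSketch {
  // Illustrative stand-in; the patch's OfferState/WorkerOffer carry more detail.
  class Offer(val executorId: String, var coresAvailable: Int)

  // PackedAssigner idea: visit executors with the fewest free cores first, so
  // lightly loaded executors drain and can be released by dynamic allocation.
  def packedOrder(offers: Seq[Offer]): Seq[Offer] =
    offers.filter(_.coresAvailable > 0).sortBy(_.coresAvailable)

  // BalancedAssigner idea: visit executors with the most free cores first,
  // spreading tasks evenly across the cluster.
  def balancedOrder(offers: Seq[Offer]): Seq[Offer] =
    offers.filter(_.coresAvailable > 0).sortBy(o => -o.coresAvailable)
}
```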

How was this patch tested?

Both unit tests in TaskSchedulerImplSuite and manual tests in a production pipeline.

@SparkQA

SparkQA commented Sep 23, 2016

Test build #65830 has finished for PR 15218 at commit c3ebf9c.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zhzhan zhzhan changed the title [Spark-17637][Scheduler]Packed scheduling for Spark tasks across executors [SPARK-17637][Scheduler]Packed scheduling for Spark tasks across executors Sep 23, 2016
@SparkQA

SparkQA commented Sep 23, 2016

Test build #65832 has finished for PR 15218 at commit d5f76ae.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 23, 2016

Test build #65831 has finished for PR 15218 at commit ffe9800.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zhzhan
Contributor Author

zhzhan commented Sep 23, 2016

Failed in DirectKafkaStreamSuite. It should have nothing to do with the patch.

@zhzhan
Contributor Author

zhzhan commented Sep 23, 2016

retest please

@gatorsmile
Member

gatorsmile commented Sep 23, 2016

@zhzhan
Contributor Author

zhzhan commented Sep 23, 2016

@gatorsmile Thanks. #65832 is the latest one which does not have the same failure.

@SparkQA

SparkQA commented Sep 24, 2016

Test build #65856 has finished for PR 15218 at commit f71f1c0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}
}

class BalancedAssigner(conf: SparkConf) extends TaskAssigner(conf) {
Contributor

It would be good to shuffle the workOffers for this class too.
Practically, this ensures that the initial heap will be randomized when cores are the same.

This will also mean that the Ordering below will need to handle the case of x.cores == y.cores but x != y.
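
A hedged sketch of this suggestion (the OfferState shape here is assumed, not the patch's exact class): shuffle the offers before seeding the max-heap so that executors with equal free cores are not always drawn in the same order.

```scala
import scala.collection.mutable
import scala.util.Random

object BalancedHeapSketch {
  // Assumed shape of the offer holder; field names are illustrative.
  class OfferState(val executorId: String, var coresAvailable: Int)

  def buildHeap(offers: Seq[OfferState]): mutable.PriorityQueue[OfferState] = {
    // Shuffle first so executors with equal free cores are drawn in random order.
    val shuffled = Random.shuffle(offers)
    // Max-heap on free cores: the executor with the most free cores is offered first.
    val heap = mutable.PriorityQueue.empty[OfferState](Ordering.by(_.coresAvailable))
    heap ++= shuffled
    heap
  }
}
```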

Contributor Author

BTW, I don't think we need to handle the case of x.cores == y.cores; it means they are equal, and the behavior then depends on the priority queue's algorithm.

Contributor

Returning 0 implies equality - which is not the case here (x != y but x.cores == y.cores).

Contributor Author

@mridulm Thanks for the comments. But I am lost here. My understanding is that, Ordering-wise, x is equal to y if x.cores == y.cores. This ordering is used by the priority queue to construct the data structure. The following is an example from the Ordering trait: PersonA will be equal to PersonB if they are the same age. Am I missing anything?

```scala
import scala.util.Sorting

case class Person(name: String, age: Int)
val people = Array(Person("bob", 30), Person("ann", 32), Person("carl", 19))

// sort by age
object AgeOrdering extends Ordering[Person] {
  def compare(a: Person, b: Person) = a.age compare b.age
}
Sorting.quickSort(people)(AgeOrdering)
```

Contributor

You are right, my bad. I was thinking of Ordered

Contributor

In class PackedAssigner, you added spacing between functions. Do you want to be consistent with the style?

@SparkQA

SparkQA commented Oct 4, 2016

Test build #66328 has finished for PR 15218 at commit c7a0ce2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zhzhan
Contributor Author

zhzhan commented Oct 4, 2016

@mridulm Thanks for reviewing this. I will wait for a while in case there are more comments before addressing them.

@SparkQA

SparkQA commented Oct 7, 2016

Test build #66465 has finished for PR 15218 at commit ed8dd69.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mridulm
Contributor

mridulm commented Oct 7, 2016

Btw, taking a step back, I am not sure this will work as you expect it to.
Other than a few tasksets - those without locality information - the schedule is going to be highly biased towards the locality information supplied.

This will typically mean PROCESS_LOCAL (almost always) and then NODE_LOCAL - which means exactly matching the executor or host (irrespective of the order we traverse the offers).

The randomization of offers we do is for a specific set of purposes - spread the load if there is no locality information (not very common imo), or spread it across the cluster when the locality information is of lower quality - like from an InputFormat, or for shuffle when we are using heuristics that might not be optimal.

But since I have not looked at this in a while, I will CC Kay. +CC @kayousterhout please do take a look in case I am missing something.

@zhzhan
Contributor Author

zhzhan commented Oct 7, 2016

@mridulm Thanks for the comments. Your concern regarding locality is right. The patch does not change this behavior, which gives priority to the locality preference. But if multiple executors satisfy the locality restriction, the policy is applied among them. In our production pipeline, we do see a big gain with respect to reserved CPU resources when dynamic allocation is enabled.

@kayousterhout Would you like to take a look and provide your comments?

@mridulm
Contributor

mridulm commented Oct 9, 2016

@zhzhan I am curious why this is the case for the jobs being mentioned.
This PR should have an impact if the locality preference of the taskset being run is fairly suboptimal to begin with, no?

If the tasks have a PROCESS_LOCAL or NODE_LOCAL locality preference - that will take precedence, and attempts to spread the load or reduce the spread across nodes as envisioned here will not work.

So the target here seems to be the RACK_LOCAL or ANY locality preference - which should be fairly uncommon; unless I am missing something here w.r.t. the jobs being run.

EDIT: I can see one case where it will help, which is when you have shuffle tasks being run where the number of partitions is large (greater than the hardcoded thresholds in the code).
In this case, we end up without a locality preference; and if none of the RDDs run after the shuffle RDD in the shuffle task declare a locality preference, then you end up with no locality preference.
Is that the case you are observing? IIRC, if there are more than 1k map tasks or reduce tasks, then this behavior might be observed.

@zhzhan
Contributor Author

zhzhan commented Oct 9, 2016

@mridulm You are right. This patch is mainly for jobs that have multiple stages, which is very common in production pipelines. As you mentioned, if a shuffle is involved, getLocationsWithLargestOutputs in MapOutputTracker typically returns None for ShuffledRowRDD and ShuffledRDD because of the threshold REDUCER_PREF_LOCS_FRACTION (20%).

A ShuffledRowRDD/ShuffledRDD can easily have more than 10 partitions (even hundreds) in a real production pipeline, so the patch does help a lot with CPU reservation time.
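
For readers following along, a hedged sketch of the threshold logic being described; this is only the shape of the decision, not the real MapOutputTracker code:

```scala
object ReducerLocalitySketch {
  // The 20% threshold mentioned above; the real constant lives in MapOutputTracker.
  val REDUCER_PREF_LOCS_FRACTION = 0.2

  // bytesByLocation: map-output bytes feeding one reducer, keyed by executor/host.
  def preferredLocations(bytesByLocation: Map[String, Long]): Option[Seq[String]] = {
    val total = bytesByLocation.values.sum.toDouble
    if (total == 0) return None
    // Report a preference only if some location holds at least 20% of the output;
    // otherwise the reducer has no locality preference, and the task assigner
    // (round robin, packed, or balanced) decides where it runs.
    val prefs = bytesByLocation.collect {
      case (loc, bytes) if bytes / total >= REDUCER_PREF_LOCS_FRACTION => loc
    }.toSeq
    if (prefs.nonEmpty) Some(prefs) else None
  }
}
```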

@mridulm
Contributor

mridulm commented Oct 15, 2016

I am assuming @kayousterhout does not have comments on this.
Can you please fix the conflict, @zhzhan? I will merge it to master after that.

@SparkQA

SparkQA commented Oct 15, 2016

Test build #67021 has finished for PR 15218 at commit 98a9747.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@asfgit asfgit closed this in ed14633 Oct 16, 2016
@mridulm
Contributor

mridulm commented Oct 16, 2016

Merged to master, thanks @zhzhan !

@zhzhan
Contributor Author

zhzhan commented Oct 16, 2016

@mridulm Thanks for reviewing this.

@rxin
Contributor

rxin commented Oct 16, 2016

@zhzhan and @mridulm all the classes need to be private[scheduler], shouldn't they?

}
}

class PackedAssigner(conf: SparkConf) extends TaskAssigner(conf) {
Contributor

We need more documentation here to explain what this class does.


class PackedAssigner(conf: SparkConf) extends TaskAssigner(conf) {

var sorted: Seq[OfferState] = _
Contributor

all these variables should be private


// Release internally maintained resources. Subclass is responsible to
// release its own private resources.
def reset: Unit = {
Contributor

this should have parentheses since it has a side effect
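
A tiny illustration of the Scala convention being pointed out (not the patch's actual method body):

```scala
class Counter {
  private var n = 0
  // Side-effecting method: keep the parentheses to signal the mutation.
  def reset(): Unit = { n = 0 }
  // Pure, parameterless accessor: parentheses can be omitted.
  def value: Int = n
}
```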


import org.apache.spark.SparkConf

case class OfferState(workOffer: WorkerOffer, var cores: Int) {
Contributor

we need documentation explaining what this class does

Contributor

Also case classes are supposed to have mostly immutable state -- if you want cores to be mutable, I'd just make this a normal class.

Contributor

I read more code. Shouldn't cores be coresRemaining, or coresAvailable?
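
Combining the two suggestions above, a hedged sketch of what the class could look like; the constructor shape and the coresAvailable name are suggestions, not the merged code:

```scala
import scala.collection.mutable.ArrayBuffer

// A plain class instead of a case class, since coresAvailable is mutable state.
// WorkerOffer and TaskDescription are the scheduler's existing types.
class OfferState(val workOffer: WorkerOffer, var coresAvailable: Int) {
  // Tasks chosen for this worker offer during the current scheduling round.
  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
}
```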


case class OfferState(workOffer: WorkerOffer, var cores: Int) {
// Build a list of tasks to assign to each worker.
val tasks = new ArrayBuffer[TaskDescription](cores)
Contributor

Again I think you need to document what this actually does. My guess (without having looked at the rest of the code) is that the index indicates some worker id, but I'm not sure and I might be wrong. We need to explain it here.

Contributor

@rxin rxin Oct 16, 2016

Ah ok - my guess was wrong. It would be great to actually say what this list means, e.g. is this a queue?

@rxin
Contributor

rxin commented Oct 16, 2016

@mridulm @zhzhan I liked the idea here, but unfortunately I think it was merged prematurely. There is insufficient documentation, and some basic style that doesn't align with the rest of Spark. I'm going to revert this. It would be good to get this in, and I think with very little work we can get it to a shape that looks a lot better.

@@ -1334,6 +1334,17 @@ Apart from these, the following properties are also available, and may be useful
Should be greater than or equal to 1. Number of allowed retries = this value - 1.
</td>
</tr>
<tr>
<td><code>spark.task.assigner</code></td>
<td>org.apache.spark.scheduler.RoundRobinAssigner</td>
Contributor

rather than asking for the full class name, I'd just have "roundrobin" and "packed" (case insensitive) as the options and internally maintain the mapping.

Member

On the Spark SQL side, we did a similar thing for data sources. You can check the code in the function lookupDataSource.

Contributor

Yea in this case I wouldn't even support external assigners. Just have strings to use the built-in ones.
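
A possible shape for that string-keyed lookup, loosely modeled on the lookupDataSource idea mentioned above; the object name, config key, and fallback behavior here are assumptions:

```scala
import java.util.Locale
import org.apache.spark.SparkConf

// Map short, case-insensitive option values to the built-in assigners.
object TaskAssignerRegistry {
  def create(conf: SparkConf): TaskAssigner = {
    conf.get("spark.scheduler.taskAssigner", "roundrobin").toLowerCase(Locale.ROOT) match {
      case "roundrobin" => new RoundRobinAssigner(conf)
      case "packed"     => new PackedAssigner(conf)
      case "balanced"   => new BalancedAssigner(conf)
      // Unknown values fall back to the default instead of failing the app.
      case _            => new RoundRobinAssigner(conf)
    }
  }
}
```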

@@ -109,6 +109,72 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
assert(!failedTaskSet)
}

test("Scheduler balance the assignment to the worker with more free cores") {
Contributor

thanks a lot for creating the test cases

@@ -1334,6 +1334,17 @@ Apart from these, the following properties are also available, and may be useful
Should be greater than or equal to 1. Number of allowed retries = this value - 1.
</td>
</tr>
<tr>
<td><code>spark.task.assigner</code></td>
Contributor

I'd add "scheduler" to the option, e.g. "spark.scheduler.taskAssigner"

val tasks = new ArrayBuffer[TaskDescription](cores)
}

abstract class TaskAssigner(conf: SparkConf) {
Contributor

instead of taking in a generic SparkConf, I'd just take in the CPUs per task for now, until we see a clear need to be more generic. This simplifies the dependencies of the class.
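
A minimal sketch of that narrower constructor; the signature is only the suggestion being made here, not what the patch ships:

```scala
// Depend only on what the assigner actually needs: how many CPUs each task takes.
abstract class TaskAssigner(cpusPerTask: Int) {
  // An offer can take another task only if it still has enough free cores left.
  protected def hasEnoughCores(coresAvailable: Int): Boolean =
    coresAvailable >= cpusPerTask
}

class RoundRobinAssigner(cpusPerTask: Int) extends TaskAssigner(cpusPerTask)
```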

@rxin
Contributor

rxin commented Oct 16, 2016

@zhzhan in general it'd be great to have proper documentation on the classes. For example, it is important to document the behavior of the various assigners, and even more importantly, to document the contract for TaskAssigner. The control flow is fairly confusing right now -- I'm not very smart, and things that are complicated take me a long time to understand, and when I try changing them in the future, there's a very good chance I will make a mistake and mess it up. It would be great if we could simplify the control flow. If we can't, then we should document it more clearly. For example, when init/reset should be called is part of the contract, and none of this is really documented.
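
To illustrate the kind of contract documentation being asked for, a hedged sketch of a documented TaskAssigner lifecycle; the method names echo those in this PR (init, getNext, accept, reset), but hasNext, the doc text, and the exact signatures are assumptions:

```scala
import org.apache.spark.SparkConf

/**
 * Strategy for choosing which worker offer receives the next task.
 *
 * Expected call sequence per scheduling round (the contract):
 *   1. init(offers): called once with the round's offers.
 *   2. hasNext / getNext(): the scheduler repeatedly asks for the next candidate offer.
 *   3. accept(assigned): after each getNext(), the scheduler reports whether a task
 *      was actually launched on that offer, so the assigner can update its state.
 *   4. reset(): called at the end of the round to release internal state.
 */
abstract class TaskAssigner(conf: SparkConf) {
  /** Prepare internal state for a new round of offers. */
  def init(offers: Seq[OfferState]): Unit
  /** Whether another offer is worth trying in this round. */
  def hasNext: Boolean
  /** The next candidate offer, chosen according to the concrete policy. */
  def getNext(): OfferState
  /** Feedback from the scheduler: was a task launched on the last offer returned? */
  def accept(assigned: Boolean): Unit
  /** Release internal state at the end of the round. */
  def reset(): Unit
}
```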

@zhzhan
Contributor Author

zhzhan commented Oct 16, 2016

@rxin Thanks a lot for the detailed review. I will update the patch.

@@ -61,6 +59,21 @@ private[spark] class TaskSchedulerImpl(

val conf = sc.conf

val DEFAULT_TASK_ASSIGNER = classOf[RoundRobinAssigner].getName
lazy val taskAssigner: TaskAssigner = {
val className = conf.get("spark.task.assigner", DEFAULT_TASK_ASSIGNER)
Member

Like MAX_TASK_FAILURES above, we can also add spark.task.assigner to the internal config object.
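
A hedged sketch of such an entry, following the ConfigBuilder pattern used elsewhere in Spark; the doc string and default shown here are assumptions:

```scala
import org.apache.spark.internal.config.ConfigBuilder

// Sketch of an entry inside the internal `config` object, next to MAX_TASK_FAILURES.
private[spark] val TASK_ASSIGNER = ConfigBuilder("spark.task.assigner")
  .doc("Class name of the TaskAssigner used to assign tasks to worker offers.")
  .stringConf
  .createWithDefault("org.apache.spark.scheduler.RoundRobinAssigner")
```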

By default, round robin with randomness is used.
org.apache.spark.scheduler.BalancedAssigner tries to balance the task across all workers (allocating tasks to
workers with most free cores). org.apache.spark.scheduler.PackedAssigner tries to allocate tasks to workers
with the least free cores, which may help releasing the resources when dynamic allocation is enabled.
Member

when dynamic allocation is enabled. ->
when dynamic allocation (spark.dynamicAllocation.enabled) is enabled.

assert(4 === taskDescriptions.length)
taskDescriptions.map(_.executorId)
}

Member

Nit: remove this empty line.

assert(!failedTaskSet)
}


Member

Nit: remove this empty line.

@@ -408,4 +474,5 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
assert(thirdTaskDescs.size === 0)
assert(taskScheduler.getExecutorsAliveOnHost("host1") === Some(Set("executor1", "executor3")))
}

Member

Nit: remove this empty line.

@gatorsmile
Member

The test case design is pretty good. It covers all the scenarios.

  • Could you add a check for the negative case? That is, when users do not provide a valid TaskAssigner name, we fall back to the default round-robin one (see the sketch after this list).
  • For the existing unchanged test cases in TaskSchedulerImplSuite.scala, please add a check to verify that the default one is picked.
  • If possible, please change one of the existing test cases in TaskSchedulerImplSuite.scala to ensure that users are allowed to specify round robin as the task assigner.
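
A hedged sketch of the negative-case check from the first bullet; the setupSchedulerWithConf helper and the taskAssigner accessor are assumed for illustration, not taken from the actual suite:

```scala
test("fall back to round robin when an invalid task assigner is configured") {
  val conf = new SparkConf()
    .set("spark.task.assigner", "org.apache.spark.scheduler.DoesNotExist")
  // setupSchedulerWithConf is a hypothetical helper; the real suite wires the
  // scheduler up differently.
  val taskScheduler = setupSchedulerWithConf(conf)
  // An unknown class name should not fail the application; the scheduler should
  // fall back to the default RoundRobinAssigner.
  assert(taskScheduler.taskAssigner.isInstanceOf[RoundRobinAssigner])
}
```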

}

class RoundRobinAssigner(conf: SparkConf) extends TaskAssigner(conf) {
var i = 0
Member

Any better variable name?

return Ordering[Int].compare(x.cores, y.cores)
}
}
def init(): Unit = {
Member

override

def getNext(): OfferState

// Called by the TaskScheduler to indicate whether the current offer is accepted
// In order to decide whether the current is valid for the next offering.
Contributor

"In" should be "in"

val tid = task.taskId
taskIdToTaskSetManager(tid) = taskSet
taskIdToExecutorId(tid) = execId
executorIdToTaskCount(execId) += 1
availableCpus(i) -= CPUS_PER_TASK
assert(availableCpus(i) >= 0)
current.cores = current.cores - CPUS_PER_TASK
Contributor

Do you want to follow the previous style, current.cores -= CPUS_PER_TASK?

@zhzhan
Contributor Author

zhzhan commented Oct 19, 2016

@wangmiao1981 Thanks for reviewing this. I will open another PR addressing these comments soon.

@zhzhan zhzhan deleted the packed-scheduler branch October 19, 2016 02:44
robert3005 pushed a commit to palantir/spark that referenced this pull request Nov 1, 2016
…cutors

Author: Zhan Zhang <zhanzhang@fb.com>

Closes apache#15218 from zhzhan/packed-scheduler.
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
…cutors

Author: Zhan Zhang <zhanzhang@fb.com>

Closes apache#15218 from zhzhan/packed-scheduler.