Skip to content

[SPARK-8728] Add configuration for limiting the maximum number of active stages in a fair scheduling queue #7119

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/Pool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,16 @@ private[spark] class Pool(
val poolName: String,
val schedulingMode: SchedulingMode,
initMinShare: Int,
initWeight: Int)
initWeight: Int,
initMaxRunning: Int)
extends Schedulable
with Logging {

val schedulableQueue = new ConcurrentLinkedQueue[Schedulable]
val schedulableNameToSchedulable = new ConcurrentHashMap[String, Schedulable]
var weight = initWeight
var minShare = initMinShare
val maxRunning = initMaxRunning
var runningTasks = 0
var priority = 0

Expand Down Expand Up @@ -97,8 +99,14 @@ private[spark] class Pool(

override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
// Selects which entries of the given queue are eligible for scheduling.
// A positive `maxRunning` keeps only the first `maxRunning` entries in queue order.
// Zero means "no limit". Negative values are treated as "no limit" as well: calling
// `take` with a negative count yields an empty collection, which would silently stop
// this pool from scheduling anything.
def targetSchedulables = { (queue: ConcurrentLinkedQueue[Schedulable]) =>
  if (maxRunning > 0) {
    queue.take(maxRunning).toSeq
  } else {
    queue.toSeq
  }
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only used once, so it can be a val:

val targetSchedulables =
  if (maxRunning != 0) {
    schedulableQueue.take(maxRunning).toSeq
  } else {
    schedulableQueue.toSeq
  }
val sortedSchedulableQueue = targetSchedulables.sortWith(...)

val sortedSchedulableQueue =
schedulableQueue.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
targetSchedulables(schedulableQueue).sortWith(taskSetSchedulingAlgorithm.comparator)
for (schedulable <- sortedSchedulableQueue) {
sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ private[spark] trait Schedulable {
def schedulingMode: SchedulingMode
def weight: Int
def minShare: Int
def maxRunning: Int
def runningTasks: Int
def priority: Int
def stageId: Int
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,13 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
val MINIMUM_SHARES_PROPERTY = "minShare"
val SCHEDULING_MODE_PROPERTY = "schedulingMode"
val WEIGHT_PROPERTY = "weight"
val MAXIMUM_RUNNING_PROPERTY = "maxRunning"
val POOL_NAME_PROPERTY = "@name"
val POOLS_PROPERTY = "pool"
val DEFAULT_SCHEDULING_MODE = SchedulingMode.FIFO
val DEFAULT_MINIMUM_SHARE = 0
val DEFAULT_WEIGHT = 1
val DEFAULT_MAXIMUM_RUNNING = 0

override def buildPools() {
var is: Option[InputStream] = None
Expand All @@ -89,10 +91,10 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
private def buildDefaultPool() {
if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT, DEFAULT_MAXIMUM_RUNNING)
rootPool.addSchedulable(pool)
logInfo("Created default pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
logInfo("Created default pool %s, schedulingMode: %s, minShare: %d, weight: %d, maxRunning: %d".format(
DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT, DEFAULT_MAXIMUM_RUNNING))
}
}

Expand All @@ -104,6 +106,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
var schedulingMode = DEFAULT_SCHEDULING_MODE
var minShare = DEFAULT_MINIMUM_SHARE
var weight = DEFAULT_WEIGHT
var maxRunning = DEFAULT_MAXIMUM_RUNNING

val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
if (xmlSchedulingMode != "") {
Expand All @@ -125,7 +128,12 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
weight = xmlWeight.toInt
}

val pool = new Pool(poolName, schedulingMode, minShare, weight)
val xmlMaxRunning = (poolNode \ MAXIMUM_RUNNING_PROPERTY).text
if (xmlMaxRunning != "") {
maxRunning = xmlMaxRunning.toInt
}

val pool = new Pool(poolName, schedulingMode, minShare, weight, maxRunning)
rootPool.addSchedulable(pool)
logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
poolName, schedulingMode, minShare, weight))
Expand All @@ -142,7 +150,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
// we will create a new pool that user has configured in app
// instead of being defined in xml file
parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT, DEFAULT_MAXIMUM_RUNNING)
rootPool.addSchedulable(parentPool)
logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ private[spark] class TaskSchedulerImpl(
def initialize(backend: SchedulerBackend) {
this.backend = backend
// temporarily set rootPool name to empty
rootPool = new Pool("", schedulingMode, 0, 0)
rootPool = new Pool("", schedulingMode, 0, 0, 0)
schedulableBuilder = {
schedulingMode match {
case SchedulingMode.FIFO =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ private[spark] class TaskSetManager(

var weight = 1
var minShare = 0
val maxRunning = 0
var priority = taskSet.priority
var stageId = taskSet.stageId
var name = "TaskSet_" + taskSet.stageId.toString
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ private[ui] class PoolTable(pools: Seq[Schedulable], parent: StagesTab) {
<thead>
<th>Pool Name</th>
<th>Minimum Share</th>
<th>Maximum Running Stages</th>
<th>Pool Weight</th>
<th>Active Stages</th>
<th>Running Tasks</th>
Expand All @@ -65,6 +66,7 @@ private[ui] class PoolTable(pools: Seq[Schedulable], parent: StagesTab) {
<a href={href}>{p.name}</a>
</td>
<td>{p.minShare}</td>
<td>{p.maxRunning}</td>
<td>{p.weight}</td>
<td>{activeStages}</td>
<td>{p.runningTasks}</td>
Expand Down
51 changes: 42 additions & 9 deletions core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,18 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
assert(nextTaskSetToSchedule.get.stageId === expectedStageId)
}

/**
 * Asserts that the sorted task set queue of `rootPool` contains no task set that still
 * has tasks left to launch.
 *
 * A task set counts as having work left while its running tasks plus its successful
 * tasks are fewer than its total number of tasks.
 */
def verifyNoRemainedTask(rootPool: Pool): Unit = {
  val pendingTaskSet = rootPool.getSortedTaskSetQueue.find { manager =>
    manager.runningTasks + manager.tasksSuccessful < manager.numTasks
  }
  assert(pendingTaskSet.isEmpty)
}

test("FIFO Scheduler Test") {
sc = new SparkContext("local", "TaskSchedulerImplSuite")
val taskScheduler = new TaskSchedulerImpl(sc)

val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0)
val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0, 0)
val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
schedulableBuilder.buildPools()

Expand Down Expand Up @@ -78,7 +85,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
sc = new SparkContext("local", "TaskSchedulerImplSuite", conf)
val taskScheduler = new TaskSchedulerImpl(sc)

val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0, 0)
val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
schedulableBuilder.buildPools()

Expand Down Expand Up @@ -137,19 +144,19 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
sc = new SparkContext("local", "TaskSchedulerImplSuite")
val taskScheduler = new TaskSchedulerImpl(sc)

val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
val pool0 = new Pool("0", SchedulingMode.FAIR, 3, 1)
val pool1 = new Pool("1", SchedulingMode.FAIR, 4, 1)
val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0, 0)
val pool0 = new Pool("0", SchedulingMode.FAIR, 3, 1, 0)
val pool1 = new Pool("1", SchedulingMode.FAIR, 4, 1, 0)
rootPool.addSchedulable(pool0)
rootPool.addSchedulable(pool1)

val pool00 = new Pool("00", SchedulingMode.FAIR, 2, 2)
val pool01 = new Pool("01", SchedulingMode.FAIR, 1, 1)
val pool00 = new Pool("00", SchedulingMode.FAIR, 2, 2, 0)
val pool01 = new Pool("01", SchedulingMode.FAIR, 1, 1, 0)
pool0.addSchedulable(pool00)
pool0.addSchedulable(pool01)

val pool10 = new Pool("10", SchedulingMode.FAIR, 2, 2)
val pool11 = new Pool("11", SchedulingMode.FAIR, 2, 1)
val pool10 = new Pool("10", SchedulingMode.FAIR, 2, 2, 0)
val pool11 = new Pool("11", SchedulingMode.FAIR, 2, 1, 0)
pool1.addSchedulable(pool10)
pool1.addSchedulable(pool11)

Expand Down Expand Up @@ -178,4 +185,30 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
scheduleTaskAndVerifyId(2, rootPool, 6)
scheduleTaskAndVerifyId(3, rootPool, 2)
}

test("Fair Scheduler maxRunning Test") {
sc = new SparkContext("local", "TaskSchedulerImplSuite")
val taskScheduler = new TaskSchedulerImpl(sc)

val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0, 3)

val taskSetManager0 = createTaskSetManager(0, 2, taskScheduler)
val taskSetManager1 = createTaskSetManager(1, 2, taskScheduler)
val taskSetManager2 = createTaskSetManager(2, 2, taskScheduler)
val taskSetManager3 = createTaskSetManager(3, 2, taskScheduler)

rootPool.addSchedulable(taskSetManager0)
rootPool.addSchedulable(taskSetManager1)
rootPool.addSchedulable(taskSetManager2)
rootPool.addSchedulable(taskSetManager3)

scheduleTaskAndVerifyId(0, rootPool, 0)
scheduleTaskAndVerifyId(1, rootPool, 1)
scheduleTaskAndVerifyId(2, rootPool, 2)
scheduleTaskAndVerifyId(3, rootPool, 0)
scheduleTaskAndVerifyId(4, rootPool, 1)
scheduleTaskAndVerifyId(5, rootPool, 2)

verifyNoRemainedTask(rootPool)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please rename this to noTasksRemain, refactor that method slightly to produce the Boolean nextTaskSetToSchedule.isEmpty, then change this line to assert(noTasksRemain).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better yet, make it noTasksRemainIn(pool: Pool).

}
}