Skip to content

Commit d5a8dbf

Browse files
committed
Merge pull request apache#928 from jerryshao/fairscheduler-refactor
Refactor FairSchedulableBuilder (cherry picked from commit 834686b) Signed-off-by: Reynold Xin <rxin@apache.org>
1 parent df0a40c commit d5a8dbf

File tree

1 file changed

+56
-43
lines changed

1 file changed

+56
-43
lines changed

core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala

Lines changed: 56 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,12 @@
1717

1818
package org.apache.spark.scheduler.cluster
1919

20-
import java.io.{File, FileInputStream, FileOutputStream, FileNotFoundException}
21-
import java.util.Properties
22-
23-
import scala.xml.XML
20+
import java.io.{FileInputStream, InputStream}
21+
import java.util.{NoSuchElementException, Properties}
2422

2523
import org.apache.spark.Logging
26-
import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
2724

25+
import scala.xml.XML
2826

2927
/**
3028
* An interface to build Schedulable tree
@@ -51,7 +49,8 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
5149
private[spark] class FairSchedulableBuilder(val rootPool: Pool)
5250
extends SchedulableBuilder with Logging {
5351

54-
val schedulerAllocFile = System.getProperty("spark.scheduler.allocation.file")
52+
val schedulerAllocFile = Option(System.getProperty("spark.scheduler.allocation.file"))
53+
val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml"
5554
val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool"
5655
val DEFAULT_POOL_NAME = "default"
5756
val MINIMUM_SHARES_PROPERTY = "minShare"
@@ -64,48 +63,26 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool)
6463
val DEFAULT_WEIGHT = 1
6564

6665
override def buildPools() {
67-
if (schedulerAllocFile != null) {
68-
val file = new File(schedulerAllocFile)
69-
if (file.exists()) {
70-
val xml = XML.loadFile(file)
71-
for (poolNode <- (xml \\ POOLS_PROPERTY)) {
72-
73-
val poolName = (poolNode \ POOL_NAME_PROPERTY).text
74-
var schedulingMode = DEFAULT_SCHEDULING_MODE
75-
var minShare = DEFAULT_MINIMUM_SHARE
76-
var weight = DEFAULT_WEIGHT
77-
78-
val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
79-
if (xmlSchedulingMode != "") {
80-
try {
81-
schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
82-
} catch {
83-
case e: Exception => logInfo("Error xml schedulingMode, using default schedulingMode")
84-
}
85-
}
86-
87-
val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
88-
if (xmlMinShare != "") {
89-
minShare = xmlMinShare.toInt
90-
}
91-
92-
val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
93-
if (xmlWeight != "") {
94-
weight = xmlWeight.toInt
95-
}
96-
97-
val pool = new Pool(poolName, schedulingMode, minShare, weight)
98-
rootPool.addSchedulable(pool)
99-
logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
100-
poolName, schedulingMode, minShare, weight))
66+
var is: Option[InputStream] = None
67+
try {
68+
is = Option {
69+
schedulerAllocFile.map { f =>
70+
new FileInputStream(f)
71+
}.getOrElse {
72+
getClass.getClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
10173
}
102-
} else {
103-
throw new java.io.FileNotFoundException(
104-
"Fair scheduler allocation file not found: " + schedulerAllocFile)
10574
}
75+
76+
is.foreach { i => buildFairSchedulerPool(i) }
77+
} finally {
78+
is.foreach(_.close())
10679
}
10780

10881
// finally create "default" pool
82+
buildDefaultPool()
83+
}
84+
85+
private def buildDefaultPool() {
10986
if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
11087
val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
11188
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
@@ -115,6 +92,42 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool)
11592
}
11693
}
11794

95+
private def buildFairSchedulerPool(is: InputStream) {
96+
val xml = XML.load(is)
97+
for (poolNode <- (xml \\ POOLS_PROPERTY)) {
98+
99+
val poolName = (poolNode \ POOL_NAME_PROPERTY).text
100+
var schedulingMode = DEFAULT_SCHEDULING_MODE
101+
var minShare = DEFAULT_MINIMUM_SHARE
102+
var weight = DEFAULT_WEIGHT
103+
104+
val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
105+
if (xmlSchedulingMode != "") {
106+
try {
107+
schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
108+
} catch {
109+
case e: NoSuchElementException =>
110+
logWarning("Error xml schedulingMode, using default schedulingMode")
111+
}
112+
}
113+
114+
val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
115+
if (xmlMinShare != "") {
116+
minShare = xmlMinShare.toInt
117+
}
118+
119+
val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
120+
if (xmlWeight != "") {
121+
weight = xmlWeight.toInt
122+
}
123+
124+
val pool = new Pool(poolName, schedulingMode, minShare, weight)
125+
rootPool.addSchedulable(pool)
126+
logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
127+
poolName, schedulingMode, minShare, weight))
128+
}
129+
}
130+
118131
override def addTaskSetManager(manager: Schedulable, properties: Properties) {
119132
var poolName = DEFAULT_POOL_NAME
120133
var parentPool = rootPool.getSchedulableByName(poolName)

0 commit comments

Comments
 (0)