Skip to content

Commit

Permalink
Refactor FairSchedulableBuilder:
Browse files Browse the repository at this point in the history
1. Configuration can be read from classpath if not set explicitly.
2. Add missing close handler.
  • Loading branch information
jerryshao committed Sep 22, 2013
1 parent a2ea069 commit 5850f59
Showing 1 changed file with 53 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.scheduler.cluster

import java.io.{File, FileInputStream, FileOutputStream, FileNotFoundException}
import java.io.{File, FileInputStream, FileOutputStream, FileNotFoundException, InputStream}
import java.util.Properties

import scala.xml.XML
Expand Down Expand Up @@ -51,7 +51,8 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
private[spark] class FairSchedulableBuilder(val rootPool: Pool)
extends SchedulableBuilder with Logging {

val schedulerAllocFile = System.getProperty("spark.scheduler.allocation.file")
val schedulerAllocFile = Option(System.getProperty("spark.scheduler.allocation.file"))
val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml"
val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool"
val DEFAULT_POOL_NAME = "default"
val MINIMUM_SHARES_PROPERTY = "minShare"
Expand All @@ -64,48 +65,26 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool)
val DEFAULT_WEIGHT = 1

override def buildPools() {
if (schedulerAllocFile != null) {
val file = new File(schedulerAllocFile)
if (file.exists()) {
val xml = XML.loadFile(file)
for (poolNode <- (xml \\ POOLS_PROPERTY)) {

val poolName = (poolNode \ POOL_NAME_PROPERTY).text
var schedulingMode = DEFAULT_SCHEDULING_MODE
var minShare = DEFAULT_MINIMUM_SHARE
var weight = DEFAULT_WEIGHT

val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
if (xmlSchedulingMode != "") {
try {
schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
} catch {
case e: Exception => logInfo("Error xml schedulingMode, using default schedulingMode")
}
}

val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
if (xmlMinShare != "") {
minShare = xmlMinShare.toInt
}

val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
if (xmlWeight != "") {
weight = xmlWeight.toInt
}

val pool = new Pool(poolName, schedulingMode, minShare, weight)
rootPool.addSchedulable(pool)
logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
poolName, schedulingMode, minShare, weight))
var is: Option[InputStream] = None
try {
is = Option {
schedulerAllocFile map { f =>
new FileInputStream(f)
} getOrElse {
getClass.getClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
}
} else {
throw new java.io.FileNotFoundException(
"Fair scheduler allocation file not found: " + schedulerAllocFile)
}

is foreach { i => buildFairSchedulerPool(i) }
} finally {
is.foreach(_.close)
}

// finally create "default" pool
buildDefaultPool()
}

private def buildDefaultPool() {
if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
Expand All @@ -115,6 +94,41 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool)
}
}

private def buildFairSchedulerPool(is: InputStream) {
val xml = XML.load(is)
for (poolNode <- (xml \\ POOLS_PROPERTY)) {

val poolName = (poolNode \ POOL_NAME_PROPERTY).text
var schedulingMode = DEFAULT_SCHEDULING_MODE
var minShare = DEFAULT_MINIMUM_SHARE
var weight = DEFAULT_WEIGHT

val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
if (xmlSchedulingMode != "") {
try {
schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
} catch {
case e: Exception => logInfo("Error xml schedulingMode, using default schedulingMode")
}
}

val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
if (xmlMinShare != "") {
minShare = xmlMinShare.toInt
}

val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
if (xmlWeight != "") {
weight = xmlWeight.toInt
}

val pool = new Pool(poolName, schedulingMode, minShare, weight)
rootPool.addSchedulable(pool)
logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
poolName, schedulingMode, minShare, weight))
}
}

override def addTaskSetManager(manager: Schedulable, properties: Properties) {
var poolName = DEFAULT_POOL_NAME
var parentPool = rootPool.getSchedulableByName(poolName)
Expand Down

0 comments on commit 5850f59

Please sign in to comment.