Refactoring and test improvements, Mesos coarse- & fine-grained scheduler #2

Closed · wants to merge 7 commits
@@ -19,7 +19,7 @@ package org.apache.spark

 /**
  * A client that communicates with the cluster manager to request or kill executors.
- * This is currently supported only in YARN mode.
+ * This is currently supported only in YARN and Mesos coarse-grained mode.
  */
 private[spark] trait ExecutorAllocationClient {
@@ -96,7 +96,8 @@ private[spark] class ExecutorAllocationManager(

   // TODO: The default value of 1 for spark.executor.cores works right now because dynamic
   // allocation is only supported for YARN and the default number of cores per executor in YARN is
-  // 1, but it might need to be attained differently for different cluster managers
+  // 1, but it might need to be attained differently for different cluster managers.
+  // For Mesos, see SPARK-6350, which adds a new parameter for setting this value.
   private val tasksPerExecutor =
     conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1)
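A worked example of the division above (values hypothetical): with spark.executor.cores = 4 and spark.task.cpus = 2, each executor can run 4 / 2 = 2 tasks at once.

    // Mirrors the computation above with hypothetical values.
    val executorCores = 4                             // spark.executor.cores
    val taskCpus = 2                                  // spark.task.cpus
    val tasksPerExecutor = executorCores / taskCpus   // 2 concurrent tasks per executor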

core/src/main/scala/org/apache/spark/SparkContext.scala (40 changes: 23 additions, 17 deletions)
@@ -56,7 +56,7 @@ import org.apache.spark.rpc.RpcAddress
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend,
   SparkDeploySchedulerBackend, SimrSchedulerBackend}
-import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
+import org.apache.spark.scheduler.cluster.mesos.{CoarseGrainedMesosSchedulerBackend, FineGrainedMesosSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalBackend
 import org.apache.spark.storage._
 import org.apache.spark.ui.{SparkUI, ConsoleProgressBar}
@@ -415,7 +415,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
   private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] =
     if (dynamicAllocationEnabled) {
       assert(supportDynamicAllocation,
-        "Dynamic allocation of executors is currently only supported in YARN mode")
+        "Dynamic allocation of executors is currently only supported in YARN " +
+        "and Mesos coarse-grained modes")
       Some(new ExecutorAllocationManager(this, listenerBus, conf))
     } else {
       None
@@ -1143,10 +1144,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient

   /**
    * Return whether dynamically adjusting the amount of resources allocated to
-   * this application is supported. This is currently only available for YARN.
+   * this application is supported. This is currently only available for YARN
+   * and Mesos coarse-grained modes.
    */
   private[spark] def supportDynamicAllocation =
-    master.contains("yarn") || dynamicAllocationTesting
+    master.contains("yarn") || master.contains("mesos") || dynamicAllocationTesting

   /**
    * :: DeveloperApi ::
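For illustration (master URLs hypothetical), the supportDynamicAllocation predicate above reduces to substring checks on the master string:

    // How the check above classifies some hypothetical master URLs.
    Seq("yarn-cluster", "mesos://zk://10.0.0.1:2181/mesos", "spark://10.0.0.1:7077", "local[4]")
      .foreach { m =>
        val supported = m.contains("yarn") || m.contains("mesos")
        println(s"$m -> $supported")  // true, true, false, false
      }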
@@ -1160,11 +1162,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
   /**
    * Express a preference to the cluster manager for a given total number of executors.
    * This can result in canceling pending requests or filing additional requests.
-   * This is currently only supported in YARN mode. Return whether the request is received.
+   * This is currently only supported in YARN and Mesos modes.
+   * Return whether the request is received.
    */
   private[spark] override def requestTotalExecutors(numExecutors: Int): Boolean = {
-    assert(master.contains("yarn") || dynamicAllocationTesting,
-      "Requesting executors is currently only supported in YARN mode")
+    assert(supportDynamicAllocation,
+      "Requesting executors is currently only supported in YARN and Mesos modes")
     schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
         b.requestTotalExecutors(numExecutors)
@@ -1177,12 +1180,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
   /**
    * :: DeveloperApi ::
    * Request an additional number of executors from the cluster manager.
-   * This is currently only supported in YARN mode. Return whether the request is received.
+   * This is currently only supported in YARN and Mesos modes.
+   * Return whether the request is received.
    */
   @DeveloperApi
   override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
     assert(supportDynamicAllocation,
-      "Requesting executors is currently only supported in YARN mode")
+      "Requesting executors is currently only supported in YARN and Mesos modes")
     schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
         b.requestExecutors(numAdditionalExecutors)
@@ -1195,12 +1199,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
   /**
    * :: DeveloperApi ::
    * Request that the cluster manager kill the specified executors.
-   * This is currently only supported in YARN mode. Return whether the request is received.
+   * This is currently only supported in YARN and Mesos modes.
+   * Return whether the request is received.
    */
   @DeveloperApi
   override def killExecutors(executorIds: Seq[String]): Boolean = {
     assert(supportDynamicAllocation,
-      "Killing executors is currently only supported in YARN mode")
+      "Killing executors is currently only supported in YARN and Mesos modes")
     schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
         b.killExecutors(executorIds)
@@ -1213,7 +1218,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
   /**
    * :: DeveloperApi ::
    * Request that cluster manager the kill the specified executor.
-   * This is currently only supported in Yarn mode. Return whether the request is received.
+   * This is currently only supported in Yarn and Mesos modes.
+   * Return whether the request is received.
    */
   @DeveloperApi
   override def killExecutor(executorId: String): Boolean = super.killExecutor(executorId)
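A hedged usage sketch of the developer APIs above (assumes an existing SparkContext sc on a supported master; the executor ID is hypothetical):

    // Ask the cluster manager for two more executors, then request one be killed.
    val requested: Boolean = sc.requestExecutors(2)            // request received?
    val killed: Boolean = sc.killExecutors(Seq("executor-1"))  // hypothetical executor ID
    println(s"requested: $requested, killed: $killed")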
@@ -1403,17 +1409,17 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
   def stop() {
     // Use the stopping variable to ensure no contention for the stop scenario.
     // Still track the stopped variable for use elsewhere in the code.

     if (!stopped.compareAndSet(false, true)) {
       logInfo("SparkContext already stopped.")
       return
     }

     postApplicationEnd()
     ui.foreach(_.stop())
     env.metricsSystem.report()
     metadataCleaner.cancel()
     cleaner.foreach(_.stop())
     executorAllocationManager.foreach(_.stop())
     dagScheduler.stop()
     dagScheduler = null
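The compareAndSet guard above makes stop() idempotent under concurrent callers; a self-contained sketch of the same pattern (class and messages are illustrative):

    import java.util.concurrent.atomic.AtomicBoolean

    class Service {
      private val stopped = new AtomicBoolean(false)

      def stop(): Unit = {
        // Only the first caller atomically flips false -> true; later calls return early.
        if (!stopped.compareAndSet(false, true)) {
          println("already stopped")
          return
        }
        println("releasing resources exactly once")
      }
    }

    val s = new Service
    s.stop()  // prints: releasing resources exactly once
    s.stop()  // prints: already stopped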
@@ -2267,9 +2273,9 @@ object SparkContext extends Logging {
       val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", false)
       val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
       val backend = if (coarseGrained) {
-        new CoarseMesosSchedulerBackend(scheduler, sc, url)
+        new CoarseGrainedMesosSchedulerBackend(scheduler, sc, url)
       } else {
-        new MesosSchedulerBackend(scheduler, sc, url)
+        new FineGrainedMesosSchedulerBackend(scheduler, sc, url)
       }
       scheduler.initialize(backend)
       (backend, scheduler)
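A minimal sketch of the dispatch above (URL hypothetical; backend names as renamed in this PR): spark.mesos.coarse defaults to false, so fine-grained scheduling remains the default mode.

    // Which backend the code above would construct, by flag value.
    def backendName(coarseGrained: Boolean): String =
      if (coarseGrained) "CoarseGrainedMesosSchedulerBackend"
      else "FineGrainedMesosSchedulerBackend"

    assert(backendName(coarseGrained = false) == "FineGrainedMesosSchedulerBackend")
    assert("mesos://10.0.0.1:5050".stripPrefix("mesos://") == "10.0.0.1:5050")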
@@ -77,7 +77,7 @@ private[r] class RBackendHandler(server: RBackend)
     val reply = bos.toByteArray
     ctx.write(reply)
   }

   override def channelReadComplete(ctx: ChannelHandlerContext): Unit = {
     ctx.flush()
   }
@@ -97,6 +97,7 @@ private[r] class RBackendHandler(server: RBackend)
       dos: DataOutputStream): Unit = {
     var obj: Object = null
     try {
+      import scala.language.existentials
       val cls = if (isStatic) {
         Class.forName(objId)
       } else {
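The added import enables (and silences the -feature warning for) existential types: Class.forName returns Class[_], so the type inferred for cls here is existential. A minimal sketch of the same situation (class name and toggle hypothetical):

    import scala.language.existentials

    // Class.forName returns Class[_]; mixing it with classOf[String] (a Class[String])
    // makes the inferred type of cls the existential Class[_].
    val useForName = true  // hypothetical toggle
    val cls = if (useForName) Class.forName("java.lang.String") else classOf[String]
    println(cls.getName)  // java.lang.String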
@@ -284,7 +284,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
   }

   // Called by subclasses when notified of a lost worker
-  def removeExecutor(executorId: String, reason: String) {
+  def removeExecutor(executorId: String, reason: String): Unit = {
     try {
       driverEndpoint.askWithReply[Boolean](RemoveExecutor(executorId, reason))
     } catch {
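The signature change above swaps Scala's procedure syntax for an explicit result type; behavior is unchanged, but the explicit form is the recommended style (procedure syntax was later deprecated):

    // Procedure syntax: the ': Unit =' is implicit and easy to misread.
    def removeOld(id: String) { println(s"removing $id") }

    // Explicit result type: the preferred, equivalent form.
    def removeNew(id: String): Unit = { println(s"removing $id") }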