Skip to content

Add role and checkpoint support for Mesos backend #60

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,16 @@ private[spark] class CoarseMesosSchedulerBackend(
setDaemon(true)
override def run() {
val scheduler = CoarseMesosSchedulerBackend.this
val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(sc.appName).build()
driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
val fwBuilder = FrameworkInfo.newBuilder().setUser("").setName(sc.appName)
val role = sc.conf.get("spark.mesos.role", null)
val checkpoint = sc.conf.get("spark.mesos.checkpoint", null)
if (role != null) {
fwBuilder.setRole(role)
}
if (checkpoint != null) {
fwBuilder.setCheckpoint(checkpoint.toBoolean)
}
driver = new MesosSchedulerDriver(scheduler, fwBuilder.build(), master)
try { {
val ret = driver.run()
logInfo("driver.run() returned with code " + ret)
Expand Down Expand Up @@ -196,8 +204,12 @@ private[spark] class CoarseMesosSchedulerBackend(

for (offer <- offers) {
val slaveId = offer.getSlaveId.toString
val mem = getResource(offer.getResourcesList, "mem")
val cpus = getResource(offer.getResourcesList, "cpus").toInt
val memResource = getResource(offer.getResourcesList, "mem")
val mem = memResource.getScalar.getValue
val memRole = memResource.getRole
val cpusResource = getResource(offer.getResourcesList, "cpus")
val cpus = cpusResource.getScalar.getValue.toInt
val cpusRole = cpusResource.getRole
if (totalCoresAcquired < maxCores && mem >= sc.executorMemory && cpus >= 1 &&
failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
!slaveIdsWithExecutors.contains(slaveId)) {
Expand All @@ -213,8 +225,8 @@ private[spark] class CoarseMesosSchedulerBackend(
.setSlaveId(offer.getSlaveId)
.setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
.setName("Task " + taskId)
.addResources(createResource("cpus", cpusToUse))
.addResources(createResource("mem", sc.executorMemory))
.addResources(createResource("cpus", cpusToUse, cpusRole))
.addResources(createResource("mem", sc.executorMemory, memRole))
.build()
d.launchTasks(
Collections.singleton(offer.getId), Collections.singletonList(task), filters)
Expand All @@ -228,20 +240,25 @@ private[spark] class CoarseMesosSchedulerBackend(
}

/** Helper function to pull out a resource from a Mesos Resources protobuf */
private def getResource(res: JList[Resource], name: String): Resource = {
  // Scan the offer's resource list and hand back the first entry whose
  // name matches (e.g. "cpus" or "mem"), preserving its role tag.
  val it = res.iterator()
  while (it.hasNext) {
    val resource = it.next()
    if (resource.getName == name) {
      return resource
    }
  }
  // No resource with the required name was present in the offer.
  throw new IllegalArgumentException("No resource called " + name + " in " + res)
}

/** Build a Mesos resource protobuf object */
private def createResource(
    resourceName: String,
    quantity: Double,
    role: String): Protos.Resource = {
  // Scalar wrapper for the requested quantity (cpus count / mem in MB).
  val scalar = Value.Scalar.newBuilder().setValue(quantity).build()
  val builder = Resource.newBuilder()
  builder.setName(resourceName)
  builder.setType(Value.Type.SCALAR)
  builder.setScalar(scalar)
  // Tag the resource with the role it was offered under so Mesos accounts
  // the usage against the correct allocation.
  builder.setRole(role)
  builder.build()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ private[spark] class MesosSchedulerBackend(
// Driver for talking to Mesos
var driver: SchedulerDriver = null

// Which slave IDs we have executors on
val slaveIdsWithExecutors = new HashSet[String]
// Executors with different resources roles can't share the same ID,
// so we save the ExecutorInfo for each slave in memory.
val slaveIdToExecutorInfo = new HashMap[String, ExecutorInfo]
val taskIdToSlaveId = new HashMap[Long, String]

// An ExecutorInfo for our tasks
Expand All @@ -70,8 +71,16 @@ private[spark] class MesosSchedulerBackend(
setDaemon(true)
override def run() {
val scheduler = MesosSchedulerBackend.this
val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(sc.appName).build()
driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
val fwBuilder = FrameworkInfo.newBuilder().setUser("").setName(sc.appName)
val role = sc.conf.get("spark.mesos.role", null)
val checkpoint = sc.conf.get("spark.mesos.checkpoint", null)
if (role != null) {
fwBuilder.setRole(role)
}
if (checkpoint != null) {
fwBuilder.setCheckpoint(checkpoint.toBoolean)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO we should have checkpointing on by default. We're close to setting checkpointing on by default in the slave configuration now, and without checkpointing a slave crash will not recover any of the tasks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any cost to this?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. There is an I/O cost to checkpointing/recovery. On startup the slave needs to read state from disk to recover its state, but this is a one-time cost. During steady state, the slave will checkpoint state for every status update (asynchronously) and task/executor launch (synchronously). The state itself is pretty tiny (serialized TaskInfo, status update, etc. protobufs) fwiw.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for chiming in Vinod :) The state and IO cost shouldn't impact perf that much and great benefits. So I suggest we turn this on by default.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that's true. Task status updates are tiny in other frameworks but not in Spark, because Spark uses status updates to send serialized results.

In my tests, enabling checkpointing caused an observable performance drop, so I suggest not enabling it by default.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, I didn't realize that we're writing out StatusUpdates as well for checkpointing. In our case yes this will incur cost for each task. Thanks for testing it out @iven, let's leave it as is then.

}
driver = new MesosSchedulerDriver(scheduler, fwBuilder.build(), master)
try {
val ret = driver.run()
logInfo("driver.run() returned with code " + ret)
Expand All @@ -85,7 +94,11 @@ private[spark] class MesosSchedulerBackend(
}
}

def createExecutorInfo(execId: String): ExecutorInfo = {
def createExecutorInfo(offer: Offer): ExecutorInfo = {
val slaveId = offer.getSlaveId.getValue
if (slaveIdToExecutorInfo.contains(slaveId)) {
return slaveIdToExecutorInfo(slaveId)
}
val executorSparkHome = sc.conf.getOption("spark.mesos.executor.home")
.orElse(sc.getSparkHome()) // Fall back to driver Spark home for backward compatibility
.getOrElse {
Expand Down Expand Up @@ -128,13 +141,16 @@ private[spark] class MesosSchedulerBackend(
.setName("mem")
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder().setValue(sc.executorMemory).build())
.setRole(getResource(offer.getResourcesList, "mem").getRole)
.build()
ExecutorInfo.newBuilder()
.setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
val executorInfo = ExecutorInfo.newBuilder()
.setExecutorId(ExecutorID.newBuilder().setValue(slaveId).build())
.setCommand(command)
.setData(ByteString.copyFrom(createExecArg()))
.addResources(memory)
.build()
slaveIdToExecutorInfo(slaveId) = executorInfo
executorInfo
}

/**
Expand Down Expand Up @@ -205,17 +221,17 @@ private[spark] class MesosSchedulerBackend(
val offerableIndices = new HashMap[String, Int]

def enoughMemory(o: Offer) = {
val mem = getResource(o.getResourcesList, "mem")
val mem = getResource(o.getResourcesList, "mem").getScalar.getValue
val slaveId = o.getSlaveId.getValue
mem >= sc.executorMemory || slaveIdsWithExecutors.contains(slaveId)
mem >= sc.executorMemory || slaveIdToExecutorInfo.contains(slaveId)
}

for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) {
offerableIndices.put(offer.getSlaveId.getValue, index)
offerableWorkers += new WorkerOffer(
offer.getSlaveId.getValue,
offer.getHostname,
getResource(offer.getResourcesList, "cpus").toInt)
getResource(offer.getResourcesList, "cpus").getScalar.getValue.toInt)
}

// Call into the TaskSchedulerImpl
Expand All @@ -228,9 +244,8 @@ private[spark] class MesosSchedulerBackend(
for (taskDesc <- taskList) {
val slaveId = taskDesc.executorId
val offerNum = offerableIndices(slaveId)
slaveIdsWithExecutors += slaveId
taskIdToSlaveId(taskDesc.taskId) = slaveId
mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId))
mesosTasks(offerNum).add(createMesosTask(taskDesc, offers(offerNum)))
}
}
}
Expand All @@ -247,26 +262,28 @@ private[spark] class MesosSchedulerBackend(
}

/** Helper function to pull out a resource from a Mesos Resources protobuf */
def getResource(res: JList[Resource], name: String): Resource = {
  // Walk the Java resource list directly; return the first resource whose
  // name matches so callers can read both its scalar value and its role.
  val it = res.iterator()
  while (it.hasNext) {
    val candidate = it.next()
    if (candidate.getName == name) {
      return candidate
    }
  }
  // Reaching this point means no resource with the required name was present.
  throw new IllegalArgumentException("No resource called " + name + " in " + res)
}

/** Turn a Spark TaskDescription into a Mesos task */
def createMesosTask(task: TaskDescription, slaveId: String): MesosTaskInfo = {
def createMesosTask(task: TaskDescription, offer: Offer): MesosTaskInfo = {
val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build()
val slaveId = offer.getSlaveId.getValue
val cpuResource = Resource.newBuilder()
.setName("cpus")
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder().setValue(scheduler.CPUS_PER_TASK).build())
.setRole(getResource(offer.getResourcesList, "cpus").getRole)
.build()
MesosTaskInfo.newBuilder()
.setTaskId(taskId)
.setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
.setExecutor(createExecutorInfo(slaveId))
.setExecutor(createExecutorInfo(offer))
.setName(task.name)
.addResources(cpuResource)
.setData(ByteString.copyFrom(task.serializedTask))
Expand All @@ -289,7 +306,7 @@ private[spark] class MesosSchedulerBackend(
synchronized {
if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) {
// We lost the executor on this slave, so remember that it's gone
slaveIdsWithExecutors -= taskIdToSlaveId(tid)
slaveIdToExecutorInfo -= taskIdToSlaveId(tid)
}
if (isFinished(status.getState)) {
taskIdToSlaveId.remove(tid)
Expand Down Expand Up @@ -328,7 +345,7 @@ private[spark] class MesosSchedulerBackend(
try {
logInfo("Mesos slave lost: " + slaveId.getValue)
synchronized {
slaveIdsWithExecutors -= slaveId.getValue
slaveIdToExecutorInfo -= slaveId.getValue
}
scheduler.executorLost(slaveId.getValue, reason)
} finally {
Expand Down
2 changes: 1 addition & 1 deletion docs/_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ SPARK_VERSION: 1.0.0-SNAPSHOT
SPARK_VERSION_SHORT: 1.0.0
SCALA_BINARY_VERSION: "2.10"
SCALA_VERSION: "2.10.4"
MESOS_VERSION: 0.18.1
MESOS_VERSION: 0.19.1
SPARK_ISSUE_TRACKER_URL: https://issues.apache.org/jira/browse/SPARK
SPARK_GITHUB_URL: https://github.com/apache/spark
18 changes: 18 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,24 @@ Apart from these, the following properties are also available, and may be useful
for the whole duration of the Spark job.
</td>
</tr>
<tr>
<td><code>spark.mesos.role</code></td>
<td>(not set)</td>
<td>
Allocation role of the framework when running on Mesos. If not set, the Mesos
default framework role will be used.
</td>
</tr>
<tr>
<td><code>spark.mesos.checkpoint</code></td>
<td>(not set)</td>
<td>
Whether to checkpoint task information to disk when running on Mesos, using the Mesos
<a href="http://mesos.apache.org/documentation/latest/slave-recovery/">slave recovery feature</a>.
If not set, the Mesos framework default will be used.
<b>Note</b>: if you use this, Spark will only accept offers from Mesos slaves with checkpointing enabled.
</td>
</tr>
<tr>
<td><code>spark.speculation</code></td>
<td>false</td>
Expand Down
12 changes: 12 additions & 0 deletions docs/running-on-mesos.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,18 @@ it does not need to be redundantly passed in as a system property.
{% endhighlight %}


# Mesos Configuration Properties

When running on Mesos, Spark supports the role and checkpointing features like other
Mesos frameworks. Just set the `spark.mesos.role` and `spark.mesos.checkpoint`
[config properties](configuration.html#spark-properties). For example:

{% highlight scala %}
conf.set("spark.mesos.role", "spark")
conf.set("spark.mesos.checkpoint", "true")
{% endhighlight %}


# Mesos Run Modes

Spark can run over Mesos in two modes: "fine-grained" (default) and "coarse-grained".
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@
<scala.version>2.10.4</scala.version>
<scala.binary.version>2.10</scala.binary.version>
<scala.macros.version>2.0.1</scala.macros.version>
<mesos.version>0.18.1</mesos.version>
<mesos.version>0.19.1</mesos.version>
<mesos.classifier>shaded-protobuf</mesos.classifier>
<akka.group>org.spark-project.akka</akka.group>
<akka.version>2.2.3-shaded-protobuf</akka.version>
Expand Down