
[SPARK-4286] Integrate external shuffle service to coarse grained Mesos mode #3861


Closed · wants to merge 15 commits
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1002,8 +1002,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
   */
  @DeveloperApi
  override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
-    assert(master.contains("yarn") || dynamicAllocationTesting,
-      "Requesting executors is currently only supported in YARN mode")
+    assert(master.contains("mesos") || master.contains("yarn") || dynamicAllocationTesting,
+      "Requesting executors is currently only supported in YARN or Mesos coarse-grained mode")
    schedulerBackend match {
      case b: CoarseGrainedSchedulerBackend =>
        b.requestExecutors(numAdditionalExecutors)
@@ -1020,8 +1020,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
   */
  @DeveloperApi
  override def killExecutors(executorIds: Seq[String]): Boolean = {
-    assert(master.contains("yarn") || dynamicAllocationTesting,
-      "Killing executors is currently only supported in YARN mode")
+    assert(master.contains("mesos") || master.contains("yarn") || dynamicAllocationTesting,
+      "Killing executors is currently only supported in YARN or Mesos mode")
    schedulerBackend match {
      case b: CoarseGrainedSchedulerBackend =>
        b.killExecutors(executorIds)
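With these assertions relaxed, dynamic allocation becomes usable on Mesos coarse-grained mode. A minimal sketch of the configuration this enables (the master URL and app name are placeholders, not from this PR):

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical setup: coarse-grained Mesos mode with the external shuffle
// service and dynamic allocation turned on.
val conf = new SparkConf()
  .setMaster("mesos://zk://host:2181/mesos")  // placeholder master URL
  .setAppName("dynamic-allocation-demo")      // placeholder app name
  .set("spark.mesos.coarse", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.enabled", "true")

val sc = new SparkContext(conf)
// Under dynamic allocation, Spark itself calls requestExecutors and
// killExecutors on the scheduler backend as load rises and falls.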
core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala
@@ -31,7 +31,6 @@ import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
  *
  * Optionally requires SASL authentication in order to read. See [[SecurityManager]].
  */
-private[worker]
 class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: SecurityManager)
Reviewer comment (Contributor): The correct thing to do here is not to remove its visibility restrictions, but to abstract this class so it is not used exclusively for standalone mode.

extends Logging {

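A minimal sketch of the abstraction the reviewer asks for above: a mode-agnostic service that both the standalone Worker and a Mesos executor backend could instantiate. The name and shape here are hypothetical, not part of this diff:

import org.apache.spark.{Logging, SecurityManager, SparkConf}

// Hypothetical mode-agnostic shuffle service: no private[worker] restriction
// and no "Standalone" in the name, so any deployment mode can share it.
private[spark] class ExternalShuffleService(
    sparkConf: SparkConf,
    securityManager: SecurityManager)
  extends Logging {

  private val enabled = sparkConf.getBoolean("spark.shuffle.service.enabled", false)

  /** Start the service only if the user enabled it; otherwise do nothing. */
  def startIfEnabled(): Unit = {
    if (enabled) {
      logInfo("Starting external shuffle service")
      // ... bind the Netty transport server here, as the standalone version does ...
    }
  }

  def stop(): Unit = {}
}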
core/src/main/scala/org/apache/spark/executor/CoarseGrainedMesosExecutorBackend.scala (new file)
@@ -0,0 +1,212 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.executor

import org.apache.spark.{SparkConf, Logging, SecurityManager}
import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver, MesosNativeLibrary}
import org.apache.spark.util.{Utils, SignalLogger}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.mesos.Protos._
import org.apache.spark.deploy.worker.StandaloneWorkerShuffleService
import scala.collection.JavaConversions._
import scala.io.Source
import java.io.{File, PrintWriter}
Reviewer comment (Contributor): We should reorder these imports as we do in other files.
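For reference, the grouping the Spark style guide prescribes (java, then scala, then third-party, then spark) would reorder the block above roughly as:

import java.io.{File, PrintWriter}

import scala.collection.JavaConversions._
import scala.io.Source

import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver, MesosNativeLibrary}
import org.apache.mesos.Protos._

import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.worker.StandaloneWorkerShuffleService
import org.apache.spark.util.{SignalLogger, Utils}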


/**
 * The coarse-grained Mesos executor backend is responsible for launching the shuffle service
 * and the CoarseGrainedExecutorBackend actor.
 * It assumes the scheduler has detected that the shuffle service is enabled and has therefore
 * launched this class instead of CoarseGrainedExecutorBackend directly.
 */
private[spark] class CoarseGrainedMesosExecutorBackend(val sparkConf: SparkConf)
extends MesosExecutor
with Logging {

private var shuffleService: StandaloneWorkerShuffleService = null
private var driver: ExecutorDriver = null
private var executorProc: Process = null
private var taskId: TaskID = null
@volatile var killed = false
Reviewer comment (Contributor): This should be private, otherwise anyone with a reference to this backend can "kill" it.
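The one-line fix the reviewer is suggesting, so only this backend can flip the flag:

// Restrict visibility; external holders of the backend cannot set this.
@volatile private var killed = false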


override def registered(
driver: ExecutorDriver,
executorInfo: ExecutorInfo,
frameworkInfo: FrameworkInfo,
slaveInfo: SlaveInfo) {
this.driver = driver
logInfo("Coarse Grained Mesos Executor '" + executorInfo.getExecutorId.getValue +
"' is registered.")

if (shuffleService == null) {
sparkConf.set("spark.shuffle.service.enabled", "true")
Reviewer comment (Contributor): We shouldn't set this config ourselves; it is meant to be set by the user. What we should do here is check whether the user set it and then start the shuffle service if so.
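A sketch of the check the reviewer describes: consult the user's setting instead of forcing it on. This is the suggested behavior, not what the PR currently does:

// Only start the shuffle service if the user enabled it themselves.
if (sparkConf.getBoolean("spark.shuffle.service.enabled", false)) {
  shuffleService = new StandaloneWorkerShuffleService(sparkConf, new SecurityManager(sparkConf))
  shuffleService.startIfEnabled()
}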

shuffleService = new StandaloneWorkerShuffleService(sparkConf, new SecurityManager(sparkConf))
shuffleService.startIfEnabled()
}
}

override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
if (executorProc != null) {
logError("Received LaunchTask while executor is already running")
val status = TaskStatus.newBuilder()
.setTaskId(taskInfo.getTaskId)
.setSlaveId(taskInfo.getSlaveId)
.setState(TaskState.TASK_FAILED)
.setMessage("Received LaunchTask while executor is already running")
.build()
d.sendStatusUpdate(status)
return
}

killed = false

logInfo("Launching task id: " + taskInfo.getTaskId.getValue)

// We launch CoarseGrainedExecutorBackend in a subprocess because it is
// designed to run in its own process. Since it's a class shared with other
// cluster managers, we preserve that behavior rather than run it in-process.
val command = "exec " + Utils.deserialize[String](taskInfo.getData().toByteArray)
Reviewer comment (Contributor): Here command is a giant string instead of a list of command line arguments. Should we do something like Utils.splitCommandString here to preserve the command properly when we pass it to the ProcessBuilder?
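A sketch of the reviewer's alternative, assuming Utils.splitCommandString tokenizes quoted arguments the way Spark's launchers do; this hands ProcessBuilder a proper argv and avoids the round-trip through bash:

// Tokenize the serialized command into argv so quoting survives.
// No "exec " prefix is needed once we stop going through a shell.
val args = Utils.splitCommandString(Utils.deserialize[String](taskInfo.getData().toByteArray))
val pb = new ProcessBuilder(args: _*)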


logInfo("Running command: " + command)

// Mesos only works on Linux platforms, so we assume bash is available if Mesos is used.
val pb = new ProcessBuilder("/bin/bash", "-c", command)

val currentEnvVars = pb.environment()
for (variable <- taskInfo.getExecutor.getCommand.getEnvironment.getVariablesList()) {
currentEnvVars.put(variable.getName, variable.getValue)
}

executorProc = pb.start()

new Thread("stderr reader for task " + taskInfo.getTaskId.getValue) {
override def run() {
for (line <- Source.fromInputStream(executorProc.getErrorStream).getLines) {
System.err.println(line)
}
}
}.start()

new Thread("stdout reader for task " + taskInfo.getTaskId.getValue) {
override def run() {
for (line <- Source.fromInputStream(executorProc.getInputStream).getLines) {
System.out.println(line)
}
}
}.start()
Reviewer comment (Contributor): Could you do these through RedirectThread in Utils.scala?
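What the two hand-rolled reader threads above would look like with Spark's RedirectThread helper, assuming its (InputStream, OutputStream, name) constructor:

import org.apache.spark.util.RedirectThread

// Pump the subprocess's stdout/stderr to ours on named daemon-style threads.
new RedirectThread(executorProc.getInputStream, System.out,
  "stdout reader for task " + taskInfo.getTaskId.getValue).start()
new RedirectThread(executorProc.getErrorStream, System.err,
  "stderr reader for task " + taskInfo.getTaskId.getValue).start()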


driver.sendStatusUpdate(TaskStatus.newBuilder()
.setState(TaskState.TASK_RUNNING)
.setTaskId(taskInfo.getTaskId)
.build)
Reviewer comment (Contributor): For readability I would prefer that you put the builder in a val first, and then call driver.sendStatusUpdate on it. Also, please use parentheses for build() here because it's not a side-effect-free getter.
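Applied to the call above, the reviewer's suggestion would read:

// Name the status first, then send it; build() keeps its parentheses.
val status = TaskStatus.newBuilder()
  .setState(TaskState.TASK_RUNNING)
  .setTaskId(taskInfo.getTaskId)
  .build()
driver.sendStatusUpdate(status)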


new Thread("process waiter for mesos executor for task " + taskInfo.getTaskId.getValue) {
override def run() {
executorProc.waitFor()
val (state, msg) = if (killed) {
(TaskState.TASK_KILLED, "")
} else if (executorProc.exitValue() == 0) {
(TaskState.TASK_FINISHED, "")
} else {
(TaskState.TASK_FAILED, "Exited with status: " + executorProc.exitValue().toString)
}
// We leave the shuffle service running after the task.
cleanup(state, msg)
}
}.start()

taskId = taskInfo.getTaskId
}

override def error(d: ExecutorDriver, message: String) {
Reviewer comment (Contributor): Methods without return types should use : Unit (see https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide).
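With the explicit return type the style guide calls for, the method below becomes:

override def error(d: ExecutorDriver, message: String): Unit = {
  logError("Error from Mesos: " + message)
}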

logError("Error from Mesos: " + message)
}

override def killTask(d: ExecutorDriver, t: TaskID) {
if (taskId == null) {
logError("Received killtask when no process is initialized")
return
}

if (!taskId.getValue.equals(t.getValue)) {
logError("Asked to kill task '" + t.getValue + "' but executor is running task '" +
taskId.getValue + "'")
return
}

assert(executorProc != null)
killed = true
// We only destroy the coarse grained executor but leave the shuffle
// service running for other tasks that might be reusing this executor.
// This is a no-op if the process has already finished.
executorProc.destroy()
}

def cleanup(state: TaskState, msg: String = ""): Unit = synchronized {
if (driver == null) {
logError("Cleaning up process but driver is not initialized")
return
}

if (executorProc == null) {
logDebug("Process is not started or already cleaned up")
return
}

assert(taskId != null)

driver.sendStatusUpdate(TaskStatus.newBuilder()
.setState(state)
.setMessage(msg)
Reviewer comment (Contributor): What happens if you set an empty message? Does it still get logged? Should we set the message only if it's not empty?
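One way to address this, sketched: hold the builder in a val and attach the message only when it is non-empty:

// Attach a message only when there is something to say.
val builder = TaskStatus.newBuilder()
  .setState(state)
  .setTaskId(taskId)
if (msg.nonEmpty) {
  builder.setMessage(msg)
}
driver.sendStatusUpdate(builder.build())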

.setTaskId(taskId)
.build)

executorProc = null
taskId = null
}

override def reregistered(d: ExecutorDriver, p2: SlaveInfo) {}

override def disconnected(d: ExecutorDriver) {}

override def frameworkMessage(d: ExecutorDriver, data: Array[Byte]) {}

override def shutdown(d: ExecutorDriver) {
if (executorProc != null) {
killTask(d, taskId)
}

if (shuffleService != null) {
shuffleService.stop()
shuffleService = null
}
}
}

private[spark] object CoarseGrainedMesosExecutorBackend extends Logging {
def main(args: Array[String]) {
SignalLogger.register(log)
SparkHadoopUtil.get.runAsSparkUser { () =>
MesosNativeLibrary.load()
val sparkConf = new SparkConf()
// Create a new Executor and start it running
val runner = new CoarseGrainedMesosExecutorBackend(sparkConf)
new MesosExecutorDriver(runner).run()
}
}
}