[SPARK-4286] Integrate external shuffle service to coarse grained Mesos mode #3861
@@ -0,0 +1,212 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.executor

import org.apache.spark.{SparkConf, Logging, SecurityManager}
import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver, MesosNativeLibrary}
import org.apache.spark.util.{Utils, SignalLogger}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.mesos.Protos._
import org.apache.spark.deploy.worker.StandaloneWorkerShuffleService
import scala.collection.JavaConversions._
import scala.io.Source
import java.io.{File, PrintWriter}
Review comment: we should reorder these imports as we do in other files.
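A possible ordering, following the grouping Spark uses elsewhere (java, then scala, then third-party, then org.apache.spark, alphabetized within each group); this is an illustrative sketch of the convention, not necessarily the exact order the reviewer had in mind:

import java.io.{File, PrintWriter}

import scala.collection.JavaConversions._
import scala.io.Source

import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver, MesosNativeLibrary}
import org.apache.mesos.Protos._

import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.worker.StandaloneWorkerShuffleService
import org.apache.spark.util.{SignalLogger, Utils}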
/**
 * The coarse-grained Mesos executor backend is responsible for launching the shuffle service
 * and the CoarseGrainedExecutorBackend actor.
 * This assumes the scheduler detected that the shuffle service is enabled and launched
 * this class instead of CoarseGrainedExecutorBackend directly.
 */
private[spark] class CoarseGrainedMesosExecutorBackend(val sparkConf: SparkConf)
  extends MesosExecutor
  with Logging {

  private var shuffleService: StandaloneWorkerShuffleService = null
  private var driver: ExecutorDriver = null
  private var executorProc: Process = null
  private var taskId: TaskID = null
  @volatile var killed = false
Review comment: this should be private, otherwise anyone with a reference to this backend can "kill" it.
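A minimal sketch of the suggested change (restrict the field while keeping it volatile):

  @volatile private var killed = false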
  override def registered(
      driver: ExecutorDriver,
      executorInfo: ExecutorInfo,
      frameworkInfo: FrameworkInfo,
      slaveInfo: SlaveInfo) {
    this.driver = driver
    logInfo("Coarse Grained Mesos Executor '" + executorInfo.getExecutorId.getValue +
      "' is registered.")

    if (shuffleService == null) {
      sparkConf.set("spark.shuffle.service.enabled", "true")
Review comment: We shouldn't set this config ourselves; it is meant to be set by the user. What we should do here is check whether the user set it and then start the shuffle service if so.
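A sketch of that check against the existing API, assuming startIfEnabled() simply no-ops when the flag is off (as in the standalone worker); untested:

    // Only start the shuffle service if the user turned it on; don't force the flag ourselves.
    if (shuffleService == null && sparkConf.getBoolean("spark.shuffle.service.enabled", false)) {
      shuffleService =
        new StandaloneWorkerShuffleService(sparkConf, new SecurityManager(sparkConf))
      shuffleService.startIfEnabled()
    }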
      shuffleService = new StandaloneWorkerShuffleService(sparkConf, new SecurityManager(sparkConf))
      shuffleService.startIfEnabled()
    }
  }

  override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
    if (executorProc != null) {
      logError("Received LaunchTask while executor is already running")
      val status = TaskStatus.newBuilder()
        .setTaskId(taskInfo.getTaskId)
        .setSlaveId(taskInfo.getSlaveId)
        .setState(TaskState.TASK_FAILED)
        .setMessage("Received LaunchTask while executor is already running")
        .build()
      d.sendStatusUpdate(status)
      return
    }

    killed = false

    logInfo("Launching task id: " + taskInfo.getTaskId.getValue)

    // We are launching the CoarseGrainedExecutorBackend via subprocess
    // because the backend is designed to run in its own process.
    // Since it's a shared class we are preserving the existing behavior
    // and launching it as a subprocess here.
    val command = "exec " + Utils.deserialize[String](taskInfo.getData().toByteArray)
Review comment: here
    logInfo("Running command: " + command)

    // Mesos only works on Linux platforms, so we assume bash is available if Mesos is used.
    val pb = new ProcessBuilder("/bin/bash", "-c", command)

    val currentEnvVars = pb.environment()
    for (variable <- taskInfo.getExecutor.getCommand.getEnvironment.getVariablesList()) {
      currentEnvVars.put(variable.getName, variable.getValue)
    }

    executorProc = pb.start()

    new Thread("stderr reader for task " + taskInfo.getTaskId.getValue) {
      override def run() {
        for (line <- Source.fromInputStream(executorProc.getErrorStream).getLines) {
          System.err.println(line)
        }
      }
    }.start()

    new Thread("stdout reader for task " + taskInfo.getTaskId.getValue) {
      override def run() {
        for (line <- Source.fromInputStream(executorProc.getInputStream).getLines) {
          System.out.println(line)
        }
      }
    }.start()
Review comment: Could you do these through …
    driver.sendStatusUpdate(TaskStatus.newBuilder()
      .setState(TaskState.TASK_RUNNING)
      .setTaskId(taskInfo.getTaskId)
      .build)
Review comment: For readability I would prefer that you put the builder in a …
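The comment is cut off above; if the intent is to bind the built status to a local val before sending it, a sketch would look like:

    val runningStatus = TaskStatus.newBuilder()
      .setState(TaskState.TASK_RUNNING)
      .setTaskId(taskInfo.getTaskId)
      .build()
    driver.sendStatusUpdate(runningStatus)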
    new Thread("process waiter for mesos executor for task " + taskInfo.getTaskId.getValue) {
      override def run() {
        executorProc.waitFor()
        val (state, msg) = if (killed) {
          (TaskState.TASK_KILLED, "")
        } else if (executorProc.exitValue() == 0) {
          (TaskState.TASK_FINISHED, "")
        } else {
          (TaskState.TASK_FAILED, "Exited with status: " + executorProc.exitValue().toString)
        }
        // We leave the shuffle service running after the task.
        cleanup(state, msg)
      }
    }.start()

    taskId = taskInfo.getTaskId
  }

  override def error(d: ExecutorDriver, message: String) {
Review comment: methods without return types should use …
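The comment is truncated; assuming it refers to spelling out the Unit return type explicitly, the signature would read (sketch only):

  override def error(d: ExecutorDriver, message: String): Unit = {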
    logError("Error from Mesos: " + message)
  }

  override def killTask(d: ExecutorDriver, t: TaskID) {
    if (taskId == null) {
      logError("Received killTask when no process is initialized")
      return
    }

    if (!taskId.getValue.equals(t.getValue)) {
      logError("Asked to kill task '" + t.getValue + "' but executor is running task '" +
        taskId.getValue + "'")
      return
    }

    assert(executorProc != null)
    killed = true
    // We only destroy the coarse grained executor but leave the shuffle
    // service running for other tasks that might be reusing this executor.
    // This is a no-op if the process has already finished.
    executorProc.destroy()
  }

  def cleanup(state: TaskState, msg: String = ""): Unit = synchronized {
    if (driver == null) {
      logError("Cleaning up process but driver is not initialized")
      return
    }

    if (executorProc == null) {
      logDebug("Process is not started or already cleaned up")
      return
    }

    assert(taskId != null)

    driver.sendStatusUpdate(TaskStatus.newBuilder()
      .setState(state)
      .setMessage(msg)
Review comment: what happens if you set an empty message? Does it still get logged? Should we set the message only if it's not empty?
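One way to attach the message only when it is non-empty (an illustrative sketch of the reviewer's suggestion, not the PR's code):

    val statusBuilder = TaskStatus.newBuilder()
      .setState(state)
      .setTaskId(taskId)
    if (msg.nonEmpty) {
      statusBuilder.setMessage(msg)
    }
    driver.sendStatusUpdate(statusBuilder.build())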
      .setTaskId(taskId)
      .build)

    executorProc = null
    taskId = null
  }

  override def reregistered(d: ExecutorDriver, p2: SlaveInfo) {}

  override def disconnected(d: ExecutorDriver) {}

  override def frameworkMessage(d: ExecutorDriver, data: Array[Byte]) {}

  override def shutdown(d: ExecutorDriver) {
    if (executorProc != null) {
      killTask(d, taskId)
    }

    if (shuffleService != null) {
      shuffleService.stop()
      shuffleService = null
    }
  }
}

private[spark] object CoarseGrainedMesosExecutorBackend extends Logging {
  def main(args: Array[String]) {
    SignalLogger.register(log)
    SparkHadoopUtil.get.runAsSparkUser { () =>
      MesosNativeLibrary.load()
      val sparkConf = new SparkConf()
      // Create a new Executor and start it running
      val runner = new CoarseGrainedMesosExecutorBackend(sparkConf)
      new MesosExecutorDriver(runner).run()
    }
  }
}
Review comment: The correct thing to do here is not to remove its visibility restrictions, but to abstract this class so it is not used exclusively for standalone mode.
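One hypothetical shape for that abstraction (all names below are illustrative, not from this PR): move the shuffle-service wrapper out of deploy.worker into a deploy-mode-agnostic class that both the standalone Worker and this Mesos backend construct, keeping the existing start/stop behavior.

package org.apache.spark.deploy

import org.apache.spark.{Logging, SecurityManager, SparkConf}

// Hypothetical sketch: a home for the shuffle service wrapper that is not tied to the
// standalone Worker; the method bodies would be the logic currently living in
// StandaloneWorkerShuffleService.
private[spark] class ExternalShuffleService(conf: SparkConf, securityManager: SecurityManager)
  extends Logging {

  private val enabled = conf.getBoolean("spark.shuffle.service.enabled", false)

  /** Start the service only if the user enabled it. */
  def startIfEnabled(): Unit = {
    if (enabled) {
      logInfo("Starting external shuffle service")
      // ... server setup currently in StandaloneWorkerShuffleService ...
    }
  }

  /** Shut the service down if it was started. */
  def stop(): Unit = {
    // ... shut down the underlying server ...
  }
}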