
SPARK-1569 Spark on Yarn, authentication broken by pr299
Pass the configs as Java options, since the executor needs to know before it registers whether to create its connection using authentication or not. We could look into passing only the authentication configs, but for now I just had it pass them all.

I also updated it to use a list to construct the command, making it consistent with ClientBase and avoiding any issues with spaces.

Author: Thomas Graves <tgraves@apache.org>

Closes apache#649 from tgravescs/SPARK-1569 and squashes the following commits:

0178ab8 [Thomas Graves] add akka settings
22a8735 [Thomas Graves] Change to only pass spark.auth* configs
8ccc1d4 [Thomas Graves] SPARK-1569 Spark on Yarn, authentication broken
tgravescs authored and pwendell committed May 7, 2014
1 parent 5200872 commit 4bec84b
Showing 1 changed file with 30 additions and 19 deletions.
@@ -21,7 +21,7 @@ import java.io.File
 import java.net.URI
 
 import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
+import scala.collection.mutable.{HashMap, ListBuffer}
 
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.yarn.api._
@@ -44,9 +44,9 @@ trait ExecutorRunnableUtil extends Logging {
       hostname: String,
       executorMemory: Int,
       executorCores: Int,
-      localResources: HashMap[String, LocalResource]) = {
+      localResources: HashMap[String, LocalResource]): List[String] = {
     // Extra options for the JVM
-    var JAVA_OPTS = ""
+    val JAVA_OPTS = ListBuffer[String]()
     // Set the JVM memory
     val executorMemoryString = executorMemory + "m"
     JAVA_OPTS += "-Xms" + executorMemoryString + " -Xmx" + executorMemoryString + " "
@@ -56,10 +56,21 @@
       JAVA_OPTS += opts
     }
 
-    JAVA_OPTS += " -Djava.io.tmpdir=" +
-      new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
+    JAVA_OPTS += "-Djava.io.tmpdir=" +
+      new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
     JAVA_OPTS += ClientBase.getLog4jConfiguration(localResources)
 
+    // Certain configs need to be passed here because they are needed before the Executor
+    // registers with the Scheduler and transfers the spark configs. Since the Executor backend
+    // uses Akka to connect to the scheduler, the akka settings are needed as well as the
+    // authentication settings.
+    sparkConf.getAll.
+      filter { case (k, v) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }.
+      foreach { case (k, v) => JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\"" }
+
+    sparkConf.getAkkaConf.
+      foreach { case (k, v) => JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\"" }
+
     // Commenting it out for now - so that people can refer to the properties if required. Remove
     // it once cpuset version is pushed out.
     // The context is, default gc for server class machines end up using all cores to do gc - hence
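
The filter above is the heart of the fix: the authentication (and Akka) settings must already be in the executor JVM's system properties when the executor backend opens its connection to the scheduler, because that first connection is the one that would normally deliver the SparkConf. A simplified, self-contained sketch of the same filtering and escaping, with a plain Seq of pairs standing in for sparkConf.getAll and made-up values:

object ConfigToJavaOptsSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for sparkConf.getAll; keys and values are illustrative only.
    val conf = Seq(
      "spark.authenticate"        -> "true",
      "spark.authenticate.secret" -> "not-a-real-secret",
      "spark.akka.frameSize"      -> "10",
      "spark.executor.memory"     -> "1g")  // no matching prefix, not forwarded

    // Same predicate as the patch: forward only spark.auth* and spark.akka* keys.
    val javaOpts = conf
      .filter { case (k, _) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }
      .map { case (k, v) => "-D" + k + "=" + "\\\"" + v + "\\\"" }

    // The \" escaping lets the quotes survive the shell script YARN generates
    // to launch the container, so values with spaces stay one argument.
    javaOpts.foreach(println)  // e.g. -Dspark.authenticate=\"true\"
  }
}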
@@ -85,25 +96,25 @@
     }
 */
 
-    val commands = List[String](
-      Environment.JAVA_HOME.$() + "/bin/java" +
-      " -server " +
+    val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java",
+      "-server",
       // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
       // Not killing the task leaves various aspects of the executor and (to some extent) the jvm in
       // an inconsistent state.
       // TODO: If the OOM is not recoverable by rescheduling it on different node, then do
       // 'something' to fail job ... akin to blacklisting trackers in mapred ?
-      " -XX:OnOutOfMemoryError='kill %p' " +
-      JAVA_OPTS +
-      " org.apache.spark.executor.CoarseGrainedExecutorBackend " +
-      masterAddress + " " +
-      slaveId + " " +
-      hostname + " " +
-      executorCores +
-      " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
-      " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
-
-    commands
+      "-XX:OnOutOfMemoryError='kill %p'") ++
+      JAVA_OPTS ++
+      Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
+        masterAddress.toString,
+        slaveId.toString,
+        hostname.toString,
+        executorCores.toString,
+        "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
+        "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
 
+    // TODO: it would be nicer to just make sure there are no null commands here
+    commands.map(s => if (s == null) "null" else s).toList
   }
 
   private def setupDistributedCache(
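
Assembling the command as a Seq of discrete tokens, as this hunk does, keeps each argument intact until the final join, which is what avoids the whitespace problems mentioned in the commit message. A self-contained sketch of the same shape, with placeholder values for the pieces the real method receives from YARN and its parameters:

object LaunchCommandSketch {
  def main(args: Array[String]): Unit = {
    // Placeholders; the real code gets these from the environment and arguments.
    val javaHome = "/usr/lib/jvm/java"
    val javaOpts = Seq("-Xmx1024m", "-Dspark.authenticate=\\\"true\\\"")
    val logDir   = "<LOG_DIR>"  // the token YARN expands at container launch

    val commands = Seq(javaHome + "/bin/java",
        "-server",
        "-XX:OnOutOfMemoryError='kill %p'") ++
      javaOpts ++
      Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
        "akka://spark@driver:7077", "1", "executor-host", "4",
        "1>", logDir + "/stdout",
        "2>", logDir + "/stderr")

    // Same guard as the patch: replace any null token with the literal
    // string "null" rather than letting a null into the launch context.
    val safeCommands = commands.map(s => if (s == null) "null" else s).toList
    println(safeCommands.mkString(" "))
  }
}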
