
[SPARK-23472][CORE] Add defaultJavaOptions for driver and executor. #24804

Closed · wants to merge 8 commits
Changes from all commits
@@ -24,6 +24,7 @@ import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf}
import org.apache.spark.deploy.{Command, DeployMessages, DriverDescription}
import org.apache.spark.deploy.ClientArguments._
import org.apache.spark.internal.config
+import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.util.Utils

@@ -135,6 +136,7 @@ private[rest] class StandaloneSubmitRequestServlet(
val sparkProperties = request.sparkProperties
val driverMemory = sparkProperties.get(config.DRIVER_MEMORY.key)
val driverCores = sparkProperties.get(config.DRIVER_CORES.key)
+val driverDefaultJavaOptions = sparkProperties.get(SparkLauncher.DRIVER_DEFAULT_JAVA_OPTIONS)
val driverExtraJavaOptions = sparkProperties.get(config.DRIVER_JAVA_OPTIONS.key)
val driverExtraClassPath = sparkProperties.get(config.DRIVER_CLASS_PATH.key)
val driverExtraLibraryPath = sparkProperties.get(config.DRIVER_LIBRARY_PATH.key)
@@ -160,9 +162,11 @@ private[rest] class StandaloneSubmitRequestServlet(
.set("spark.master", updatedMasters)
val extraClassPath = driverExtraClassPath.toSeq.flatMap(_.split(File.pathSeparator))
val extraLibraryPath = driverExtraLibraryPath.toSeq.flatMap(_.split(File.pathSeparator))
+val defaultJavaOpts = driverDefaultJavaOptions.map(Utils.splitCommandString)
+.getOrElse(Seq.empty)
val extraJavaOpts = driverExtraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty)
val sparkJavaOpts = Utils.sparkJavaOpts(conf)
-val javaOpts = sparkJavaOpts ++ extraJavaOpts
+val javaOpts = sparkJavaOpts ++ defaultJavaOpts ++ extraJavaOpts
val command = new Command(
"org.apache.spark.deploy.worker.DriverWrapper",
Seq("{{WORKER_URL}}", "{{USER_JAR}}", mainClass) ++ appArgs, // args to the DriverWrapper
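For orientation, here is a minimal sketch of the assembly this hunk performs, with hypothetical option strings standing in for what sparkProperties would return (Utils.splitCommandString is Spark's internal, quote-aware splitter, so this compiles only inside the Spark code base):

import org.apache.spark.util.Utils

// Hypothetical values for spark.driver.defaultJavaOptions and
// spark.driver.extraJavaOptions:
val driverDefaultJavaOptions = Some("-XX:+UseG1GC -verbose:gc")
val driverExtraJavaOptions = Some("-Dapp.env=prod")

// Same shape as the servlet code above: split each string, defaults first.
val defaultJavaOpts = driverDefaultJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty)
val extraJavaOpts = driverExtraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty)
val javaOpts = defaultJavaOpts ++ extraJavaOpts
// javaOpts == Seq("-XX:+UseG1GC", "-verbose:gc", "-Dapp.env=prod").
// Placing defaults first lets a user-supplied flag override them, since the
// JVM typically honors the last occurrence of a repeated flag.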
@@ -127,8 +127,9 @@ private[spark] class TypedConfigBuilder[T](

/** Creates a [[ConfigEntry]] that does not have a default value. */
def createOptional: OptionalConfigEntry[T] = {
-val entry = new OptionalConfigEntry[T](parent.key, parent._alternatives, converter,
-stringConverter, parent._doc, parent._public)
+val entry = new OptionalConfigEntry[T](parent.key, parent._prependedKey,
+parent._prependSeparator, parent._alternatives, converter, stringConverter, parent._doc,
+parent._public)
parent._onCreate.foreach(_(entry))
entry
}
@@ -141,17 +142,19 @@ private[spark] class TypedConfigBuilder[T](
createWithDefaultString(default.asInstanceOf[String])
} else {
val transformedDefault = converter(stringConverter(default))
-val entry = new ConfigEntryWithDefault[T](parent.key, parent._alternatives,
-transformedDefault, converter, stringConverter, parent._doc, parent._public)
+val entry = new ConfigEntryWithDefault[T](parent.key, parent._prependedKey,
+parent._prependSeparator, parent._alternatives, transformedDefault, converter,
+stringConverter, parent._doc, parent._public)
parent._onCreate.foreach(_(entry))
entry
}
}

/** Creates a [[ConfigEntry]] with a function to determine the default value */
def createWithDefaultFunction(defaultFunc: () => T): ConfigEntry[T] = {
-val entry = new ConfigEntryWithDefaultFunction[T](parent.key, parent._alternatives, defaultFunc,
-converter, stringConverter, parent._doc, parent._public)
+val entry = new ConfigEntryWithDefaultFunction[T](parent.key, parent._prependedKey,
+parent._prependSeparator, parent._alternatives, defaultFunc, converter, stringConverter,
+parent._doc, parent._public)
parent._onCreate.foreach(_ (entry))
entry
}
@@ -161,8 +164,9 @@ private[spark] class TypedConfigBuilder[T](
* [[String]] and must be a valid value for the entry.
*/
def createWithDefaultString(default: String): ConfigEntry[T] = {
-val entry = new ConfigEntryWithDefaultString[T](parent.key, parent._alternatives, default,
-converter, stringConverter, parent._doc, parent._public)
+val entry = new ConfigEntryWithDefaultString[T](parent.key, parent._prependedKey,
+parent._prependSeparator, parent._alternatives, default, converter, stringConverter,
+parent._doc, parent._public)
parent._onCreate.foreach(_(entry))
entry
}
@@ -178,6 +182,8 @@ private[spark] case class ConfigBuilder(key: String) {

import ConfigHelpers._

+private[config] var _prependedKey: Option[String] = None
+private[config] var _prependSeparator: String = ""
private[config] var _public = true
private[config] var _doc = ""
private[config] var _onCreate: Option[ConfigEntry[_] => Unit] = None
@@ -202,24 +208,34 @@ private[spark] case class ConfigBuilder(key: String) {
this
}

+def withPrepended(key: String, separator: String = " "): ConfigBuilder = {
+_prependedKey = Option(key)
+_prependSeparator = separator
+this
+}

def withAlternative(key: String): ConfigBuilder = {
_alternatives = _alternatives :+ key
this
}

def intConf: TypedConfigBuilder[Int] = {
+checkPrependConfig
new TypedConfigBuilder(this, toNumber(_, _.toInt, key, "int"))
}

def longConf: TypedConfigBuilder[Long] = {
+checkPrependConfig
new TypedConfigBuilder(this, toNumber(_, _.toLong, key, "long"))
}

def doubleConf: TypedConfigBuilder[Double] = {
+checkPrependConfig
new TypedConfigBuilder(this, toNumber(_, _.toDouble, key, "double"))
}

def booleanConf: TypedConfigBuilder[Boolean] = {
+checkPrependConfig
new TypedConfigBuilder(this, toBoolean(_, key))
}

@@ -228,20 +244,30 @@ private[spark] case class ConfigBuilder(key: String) {
}

def timeConf(unit: TimeUnit): TypedConfigBuilder[Long] = {
+checkPrependConfig
new TypedConfigBuilder(this, timeFromString(_, unit), timeToString(_, unit))
}

def bytesConf(unit: ByteUnit): TypedConfigBuilder[Long] = {
+checkPrependConfig
new TypedConfigBuilder(this, byteFromString(_, unit), byteToString(_, unit))
}

def fallbackConf[T](fallback: ConfigEntry[T]): ConfigEntry[T] = {
-val entry = new FallbackConfigEntry(key, _alternatives, _doc, _public, fallback)
+val entry = new FallbackConfigEntry(key, _prependedKey, _prependSeparator, _alternatives, _doc,
+_public, fallback)
_onCreate.foreach(_(entry))
entry
}

def regexConf: TypedConfigBuilder[Regex] = {
+checkPrependConfig
new TypedConfigBuilder(this, regexFromString(_, this.key), _.toString)
}

+private def checkPrependConfig = {
+if (_prependedKey.isDefined) {
+throw new IllegalArgumentException(s"$key type must be string if prepend used")
+}
+}
}
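Taken together, these builder changes let a config entry declare a second key whose value is merged in front of its own. A short sketch of the opt-in API, mirroring the DRIVER_JAVA_OPTIONS definition later in this diff (assuming the builder internals shown above):

val driverJavaOptions =
  ConfigBuilder("spark.driver.extraJavaOptions")
    .withPrepended("spark.driver.defaultJavaOptions") // separator defaults to " "
    .stringConf
    .createOptional

// Prepending is only supported for string-typed entries; any other type trips
// checkPrependConfig:
//   ConfigBuilder("spark.some.key").withPrepended("spark.other.key").intConf
//   => IllegalArgumentException: "spark.some.key type must be string if prepend used"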
@@ -28,6 +28,8 @@ package org.apache.spark.internal.config
* value declared as a string.
*
* @param key the key for the configuration
+* @param prependedKey the key for the configuration which will be prepended
+* @param prependSeparator the separator which is used for prepending
* @param valueConverter how to convert a string to the value. It should throw an exception if the
* string does not have the required format.
* @param stringConverter how to convert a value to a string that the user can use it as a valid
@@ -41,6 +43,8 @@ package org.apache.spark.internal.config
*/
private[spark] abstract class ConfigEntry[T] (
val key: String,
+val prependedKey: Option[String],
+val prependSeparator: String,
val alternatives: List[String],
val valueConverter: String => T,
val stringConverter: T => String,
@@ -54,7 +58,15 @@ private[spark] abstract class ConfigEntry[T] (
def defaultValueString: String

protected def readString(reader: ConfigReader): Option[String] = {
-alternatives.foldLeft(reader.get(key))((res, nextKey) => res.orElse(reader.get(nextKey)))
+val values = Seq(
+prependedKey.flatMap(reader.get(_)),
+alternatives.foldLeft(reader.get(key))((res, nextKey) => res.orElse(reader.get(nextKey)))
+).flatten
+if (values.nonEmpty) {
+Some(values.mkString(prependSeparator))
+} else {
+None
+}
}

def readFrom(reader: ConfigReader): T
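To make the merge semantics of the new readString concrete, here is a self-contained sketch in which a plain Map stands in for ConfigReader and the alternatives chain is omitted (mergedValue is a hypothetical helper, not Spark API):

// Prepended value first, then the main key, joined with the separator.
def mergedValue(
    settings: Map[String, String],
    key: String,
    prependedKey: Option[String],
    prependSeparator: String = " "): Option[String] = {
  val values = Seq(prependedKey.flatMap(settings.get), settings.get(key)).flatten
  if (values.nonEmpty) Some(values.mkString(prependSeparator)) else None
}

val settings = Map(
  "spark.driver.defaultJavaOptions" -> "-XX:+UseG1GC",
  "spark.driver.extraJavaOptions" -> "-Dapp.env=prod")

mergedValue(settings, "spark.driver.extraJavaOptions", Some("spark.driver.defaultJavaOptions"))
// => Some("-XX:+UseG1GC -Dapp.env=prod")
// With only one of the two keys set, that value is returned unchanged; with
// neither set, None.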
@@ -68,13 +80,24 @@

private class ConfigEntryWithDefault[T] (
key: String,
+prependedKey: Option[String],
+prependSeparator: String,
alternatives: List[String],
_defaultValue: T,
valueConverter: String => T,
stringConverter: T => String,
doc: String,
isPublic: Boolean)
-extends ConfigEntry(key, alternatives, valueConverter, stringConverter, doc, isPublic) {
+extends ConfigEntry(
+key,
+prependedKey,
+prependSeparator,
+alternatives,
+valueConverter,
+stringConverter,
+doc,
+isPublic
+) {

override def defaultValue: Option[T] = Some(_defaultValue)

@@ -86,14 +109,25 @@ private class ConfigEntryWithDefault[T] (
}

private class ConfigEntryWithDefaultFunction[T] (
-key: String,
-alternatives: List[String],
-_defaultFunction: () => T,
-valueConverter: String => T,
-stringConverter: T => String,
-doc: String,
-isPublic: Boolean)
-extends ConfigEntry(key, alternatives, valueConverter, stringConverter, doc, isPublic) {
+key: String,
+prependedKey: Option[String],
+prependSeparator: String,
+alternatives: List[String],
+_defaultFunction: () => T,
+valueConverter: String => T,
+stringConverter: T => String,
+doc: String,
+isPublic: Boolean)
+extends ConfigEntry(
+key,
+prependedKey,
+prependSeparator,
+alternatives,
+valueConverter,
+stringConverter,
+doc,
+isPublic
+) {

override def defaultValue: Option[T] = Some(_defaultFunction())

@@ -106,13 +140,24 @@ private class ConfigEntryWithDefaultFunction[T] (

private class ConfigEntryWithDefaultString[T] (
key: String,
+prependedKey: Option[String],
+prependSeparator: String,
alternatives: List[String],
_defaultValue: String,
valueConverter: String => T,
stringConverter: T => String,
doc: String,
isPublic: Boolean)
-extends ConfigEntry(key, alternatives, valueConverter, stringConverter, doc, isPublic) {
+extends ConfigEntry(
+key,
+prependedKey,
+prependSeparator,
+alternatives,
+valueConverter,
+stringConverter,
+doc,
+isPublic
+) {

override def defaultValue: Option[T] = Some(valueConverter(_defaultValue))

@@ -130,14 +175,23 @@ private class ConfigEntryWithDefaultString[T] (
*/
private[spark] class OptionalConfigEntry[T](
key: String,
+prependedKey: Option[String],
+prependSeparator: String,
alternatives: List[String],
val rawValueConverter: String => T,
val rawStringConverter: T => String,
doc: String,
isPublic: Boolean)
-extends ConfigEntry[Option[T]](key, alternatives,
+extends ConfigEntry[Option[T]](
+key,
+prependedKey,
+prependSeparator,
+alternatives,
s => Some(rawValueConverter(s)),
-v => v.map(rawStringConverter).orNull, doc, isPublic) {
+v => v.map(rawStringConverter).orNull,
+doc,
+isPublic
+) {

override def defaultValueString: String = ConfigEntry.UNDEFINED

@@ -151,12 +205,22 @@ private[spark] class OptionalConfigEntry[T](
*/
private[spark] class FallbackConfigEntry[T] (
key: String,
+prependedKey: Option[String],
+prependSeparator: String,
alternatives: List[String],
doc: String,
isPublic: Boolean,
val fallback: ConfigEntry[T])
-extends ConfigEntry[T](key, alternatives,
-fallback.valueConverter, fallback.stringConverter, doc, isPublic) {
+extends ConfigEntry[T](
+key,
+prependedKey,
+prependSeparator,
+alternatives,
+fallback.valueConverter,
+fallback.stringConverter,
+doc,
+isPublic
+) {

override def defaultValueString: String = s"<value of ${fallback.key}>"

@@ -52,7 +52,10 @@ package object config {
ConfigBuilder(SparkLauncher.DRIVER_EXTRA_CLASSPATH).stringConf.createOptional

private[spark] val DRIVER_JAVA_OPTIONS =
-ConfigBuilder(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS).stringConf.createOptional
+ConfigBuilder(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS)
+.withPrepended(SparkLauncher.DRIVER_DEFAULT_JAVA_OPTIONS)
+.stringConf
+.createOptional

private[spark] val DRIVER_LIBRARY_PATH =
ConfigBuilder(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH).stringConf.createOptional
@@ -178,7 +181,10 @@ package object config {
ConfigBuilder("spark.executor.heartbeat.maxFailures").internal().intConf.createWithDefault(60)

private[spark] val EXECUTOR_JAVA_OPTIONS =
-ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS).stringConf.createOptional
+ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS)
+.withPrepended(SparkLauncher.EXECUTOR_DEFAULT_JAVA_OPTIONS)
+.stringConf
+.createOptional

private[spark] val EXECUTOR_LIBRARY_PATH =
ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_LIBRARY_PATH).stringConf.createOptional
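End to end, the two definitions above mean that reading either entry through a SparkConf yields the merged string. A hedged usage sketch (these entries are private[spark], so this only compiles inside Spark, for example in a test):

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS

val conf = new SparkConf()
  .set("spark.driver.defaultJavaOptions", "-XX:+UseG1GC -XX:MaxGCPauseMillis=200")
  .set("spark.driver.extraJavaOptions", "-Dapp.env=prod")

// Resolution goes through ConfigEntry.readString, defaults first:
conf.get(DRIVER_JAVA_OPTIONS)
// => Some("-XX:+UseG1GC -XX:MaxGCPauseMillis=200 -Dapp.env=prod")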