
Commit f7b5820

mccheah authored and ash211 committed

Use a list of environment variables for JVM options. (apache#444)

* Use a list of environment variables for JVM options.
* Fix merge conflicts.

1 parent 58cebd1 commit f7b5820

File tree

8 files changed: +152 −32 lines

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala

Lines changed: 1 addition & 0 deletions

@@ -69,6 +69,7 @@ package object constants {
   private[spark] val ENV_MOUNTED_FILES_DIR = "SPARK_MOUNTED_FILES_DIR"
   private[spark] val ENV_PYSPARK_FILES = "PYSPARK_FILES"
   private[spark] val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY"
+  private[spark] val ENV_JAVA_OPT_PREFIX = "SPARK_JAVA_OPT_"
   private[spark] val ENV_MOUNTED_FILES_FROM_SECRET_DIR = "SPARK_MOUNTED_FILES_FROM_SECRET_DIR"

   // Bootstrapping dependencies with the init-container

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala

Lines changed: 16 additions & 12 deletions

@@ -18,9 +18,10 @@ package org.apache.spark.deploy.kubernetes.submit

 import java.util.{Collections, UUID}

-import io.fabric8.kubernetes.api.model.{ContainerBuilder, OwnerReferenceBuilder, PodBuilder}
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVar, EnvVarBuilder, OwnerReferenceBuilder, PodBuilder}
 import io.fabric8.kubernetes.client.KubernetesClient
 import scala.collection.mutable
+import scala.collection.JavaConverters._

 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.kubernetes.config._

@@ -92,18 +93,21 @@ private[spark] class Client(
       currentDriverSpec = nextStep.configureDriver(currentDriverSpec)
     }
     val resolvedDriverJavaOpts = currentDriverSpec
-        .driverSparkConf
-        // We don't need this anymore since we just set the JVM options on the environment
-        .remove(org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
-        .getAll
-        .map {
-          case (confKey, confValue) => s"-D$confKey=$confValue"
-        }.mkString(" ") + driverJavaOptions.map(" " + _).getOrElse("")
+      .driverSparkConf
+      // We don't need this anymore since we just set the JVM options on the environment
+      .remove(org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
+      .getAll
+      .map {
+        case (confKey, confValue) => s"-D$confKey=$confValue"
+      } ++ driverJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty)
+    val driverJavaOptsEnvs: Seq[EnvVar] = resolvedDriverJavaOpts.zipWithIndex.map {
+      case (option, index) => new EnvVarBuilder()
+        .withName(s"$ENV_JAVA_OPT_PREFIX$index")
+        .withValue(option)
+        .build()
+    }
     val resolvedDriverContainer = new ContainerBuilder(currentDriverSpec.driverContainer)
-      .addNewEnv()
-        .withName(ENV_DRIVER_JAVA_OPTS)
-        .withValue(resolvedDriverJavaOpts)
-        .endEnv()
+      .addAllToEnv(driverJavaOptsEnvs.asJava)
       .build()
     val resolvedDriverPod = new PodBuilder(currentDriverSpec.driverPod)
       .editSpec()
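
The shape of the change: the old code joined every option into a single ENV_DRIVER_JAVA_OPTS string, which the container shell later re-split on whitespace, breaking any option whose value contains spaces. Now each resolved option becomes its own indexed environment variable, so no value is ever re-tokenized. A minimal sketch of the scheme (illustrative option values; assumes only the fabric8 kubernetes-model builders already imported above):

import io.fabric8.kubernetes.api.model.{EnvVar, EnvVarBuilder}

// Two resolved options, one of which contains a space.
val opts = Seq("-Dspark.logConf=true", "-Dmy.opt=two words")

val envs: Seq[EnvVar] = opts.zipWithIndex.map { case (option, index) =>
  new EnvVarBuilder()
    .withName(s"SPARK_JAVA_OPT_$index") // ENV_JAVA_OPT_PREFIX + index
    .withValue(option)                  // stored verbatim, spaces included
    .build()
}
// envs(0): SPARK_JAVA_OPT_0 = -Dspark.logConf=true
// envs(1): SPARK_JAVA_OPT_1 = -Dmy.opt=two words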

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientSuite.scala

Lines changed: 18 additions & 11 deletions

@@ -135,7 +135,7 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
       .set("spark.logConf", "true")
       .set(
         org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS,
-        "-XX:+|-HeapDumpOnOutOfMemoryError")
+        "-XX:+HeapDumpOnOutOfMemoryError -XX:+PrintGCDetails")
     val submissionClient = new Client(
       submissionSteps,
       sparkConf,

@@ -147,15 +147,22 @@
     val createdPod = createdPodArgumentCaptor.getValue
     val driverContainer = Iterables.getOnlyElement(createdPod.getSpec.getContainers)
     assert(driverContainer.getName === SecondTestConfigurationStep.containerName)
-    val driverJvmOptsEnv = Iterables.getOnlyElement(driverContainer.getEnv)
-    assert(driverJvmOptsEnv.getName === ENV_DRIVER_JAVA_OPTS)
-    val driverJvmOpts = driverJvmOptsEnv.getValue.split(" ").toSet
-    assert(driverJvmOpts.contains("-Dspark.logConf=true"))
-    assert(driverJvmOpts.contains(
+    val driverJvmOptsEnvs = driverContainer.getEnv.asScala.filter { env =>
+      env.getName.startsWith(ENV_JAVA_OPT_PREFIX)
+    }.sortBy(_.getName)
+    assert(driverJvmOptsEnvs.size === 4)
+
+    val expectedJvmOptsValues = Seq(
+      "-Dspark.logConf=true",
       s"-D${SecondTestConfigurationStep.sparkConfKey}=" +
-      SecondTestConfigurationStep.sparkConfValue))
-    assert(driverJvmOpts.contains(
-      "-XX:+|-HeapDumpOnOutOfMemoryError"))
+        s"${SecondTestConfigurationStep.sparkConfValue}",
+      s"-XX:+HeapDumpOnOutOfMemoryError",
+      s"-XX:+PrintGCDetails")
+    driverJvmOptsEnvs.zip(expectedJvmOptsValues).zipWithIndex.foreach {
+      case ((resolvedEnv, expectedJvmOpt), index) =>
+        assert(resolvedEnv.getName === s"$ENV_JAVA_OPT_PREFIX$index")
+        assert(resolvedEnv.getValue === expectedJvmOpt)
+    }
   }

   test("Waiting for app completion should stall on the watcher") {

@@ -211,8 +218,8 @@ private object SecondTestConfigurationStep extends DriverConfigurationStep {
   override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
     val modifiedPod = new PodBuilder(driverSpec.driverPod)
       .editMetadata()
-      .addToAnnotations(annotationKey, annotationValue)
-      .endMetadata()
+        .addToAnnotations(annotationKey, annotationValue)
+        .endMetadata()
       .build()
     val resolvedSparkConf = driverSpec.driverSparkConf.clone().set(sparkConfKey, sparkConfValue)
     val modifiedContainer = new ContainerBuilder(driverSpec.driverContainer)
resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver-py/Dockerfile

Lines changed: 5 additions & 5 deletions

@@ -39,11 +39,11 @@ ENV PYSPARK_DRIVER_PYTHON python
 ENV PYTHONPATH ${SPARK_HOME}/python/:${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:${PYTHONPATH}

 CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
+    env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \
+    readarray -t SPARK_DRIVER_JAVA_OPTS < /tmp/java_opts.txt && \
     if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
     if ! [ -z ${SPARK_SUBMIT_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_SUBMIT_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
     if ! [ -z ${SPARK_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
-    if ! [ -z ${SPARK_MOUNTED_FILES_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
-    if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \
-    ${JAVA_HOME}/bin/java $SPARK_DRIVER_JAVA_OPTS -cp $SPARK_CLASSPATH \
-      -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY \
-      $SPARK_DRIVER_CLASS $PYSPARK_PRIMARY $PYSPARK_FILES $SPARK_DRIVER_ARGS
+    if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
+    if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \
+    ${JAVA_HOME}/bin/java "${SPARK_DRIVER_JAVA_OPTS[@]}" -cp $SPARK_CLASSPATH -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY $SPARK_DRIVER_CLASS $PYSPARK_PRIMARY $PYSPARK_FILES $SPARK_DRIVER_ARGS
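
At container start, the CMD above re-assembles the options: env lists the environment, grep keeps the SPARK_JAVA_OPT_ entries, sed strips each name= prefix, and readarray -t loads one option per line into a bash array, so the quoted "${SPARK_DRIVER_JAVA_OPTS[@]}" expansion hands java each option as exactly one argument. A rough Scala analogue of that harvesting step (illustrative only, not part of this commit; it sorts by index for determinism, whereas the shell pipeline relies on env's output order):

// Collect SPARK_JAVA_OPT_<n> values from the process environment.
val driverJavaOpts: Seq[String] = sys.env.toSeq
  .collect { case (name, value) if name.startsWith("SPARK_JAVA_OPT_") =>
    (name.stripPrefix("SPARK_JAVA_OPT_").toInt, value)
  }
  .sortBy { case (index, _) => index }
  .map { case (_, value) => value }
// Each element, embedded spaces and all, stays a single JVM argument.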

resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile

Lines changed: 4 additions & 2 deletions

@@ -24,9 +24,11 @@ FROM spark-base
 COPY examples /opt/spark/examples

 CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
+    env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \
+    readarray -t SPARK_DRIVER_JAVA_OPTS < /tmp/java_opts.txt && \
     if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
     if ! [ -z ${SPARK_SUBMIT_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_SUBMIT_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
     if ! [ -z ${SPARK_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
-    if ! [ -z ${SPARK_MOUNTED_FILES_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
+    if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
     if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \
-    ${JAVA_HOME}/bin/java $SPARK_DRIVER_JAVA_OPTS -cp $SPARK_CLASSPATH -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY $SPARK_DRIVER_CLASS $SPARK_DRIVER_ARGS
+    ${JAVA_HOME}/bin/java "${SPARK_DRIVER_JAVA_OPTS[@]}" -cp $SPARK_CLASSPATH -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY $SPARK_DRIVER_CLASS $SPARK_DRIVER_ARGS

resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/spark-base/Dockerfile

Lines changed: 2 additions & 0 deletions

@@ -25,6 +25,8 @@ RUN apk upgrade --no-cache && \
     mkdir -p /opt/spark && \
     mkdir -p /opt/spark/work-dir \
     touch /opt/spark/RELEASE && \
+    rm /bin/sh && \
+    ln -sv /bin/bash /bin/sh && \
    chgrp root /etc/passwd && chmod ug+rw /etc/passwd

 COPY jars /opt/spark/jars

(The /bin/sh-to-bash symlink is what makes the driver CMDs above work: readarray and the "${...[@]}" array expansion are bash features that the Alpine base image's default BusyBox shell does not provide.)
resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/JavaOptionsTest.scala

Lines changed: 68 additions & 0 deletions
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.integrationtest.jobs
+
+import java.io.{File, FileInputStream}
+import java.util.Properties
+
+import com.google.common.collect.Maps
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.Utils
+
+private[spark] object JavaOptionsTest {
+
+  def main(args: Array[String]): Unit = {
+    // scalastyle:off println
+    if (args.length != 1) {
+      println(s"Invalid arguments: ${args.mkString(",")}." +
+        s"Usage: JavaOptionsTest <driver-java-options-list-file>")
+      System.exit(1)
+    }
+    val expectedDriverJavaOptions = loadPropertiesFromFile(args(0))
+    val nonMatchingDriverOptions = expectedDriverJavaOptions.filter {
+      case (optKey, optValue) => System.getProperty(optKey) != optValue
+    }
+    if (nonMatchingDriverOptions.nonEmpty) {
+      println(s"The driver's JVM options did not match. Expected $expectedDriverJavaOptions." +
+        s" But these options did not match: $nonMatchingDriverOptions.")
+      val sysProps = Maps.fromProperties(System.getProperties).asScala
+      println("System properties are:")
+      for (prop <- sysProps) {
+        println(s"Key: ${prop._1}, Value: ${prop._2}")
+      }
+      System.exit(1)
+    }
+
+    // TODO support spark.executor.extraJavaOptions and test here.
+    println(s"All expected JVM options were present on the driver and executors.")
+    // scalastyle:on println
+  }
+
+  private def loadPropertiesFromFile(filePath: String): Map[String, String] = {
+    val file = new File(filePath)
+    if (!file.isFile) {
+      throw new IllegalArgumentException(s"File not found at $filePath or is not a file.")
+    }
+    val properties = new Properties()
+    Utils.tryWithResource(new FileInputStream(file)) { is =>
+      properties.load(is)
+    }
+    Maps.fromProperties(properties).asScala.toMap
+  }
+}

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala

Lines changed: 38 additions & 2 deletions

@@ -16,9 +16,9 @@
  */
 package org.apache.spark.deploy.kubernetes.integrationtest

-import java.io.File
+import java.io.{File, FileOutputStream}
 import java.nio.file.Paths
-import java.util.UUID
+import java.util.{Properties, UUID}

 import com.google.common.base.Charsets
 import com.google.common.io.Files

@@ -229,6 +229,26 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
       Seq.empty[String])
   }

+  test("Setting JVM options on the driver and executors with spaces.") {
+    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
+    launchStagingServer(SSLOptions(), None)
+    val driverJvmOptionsFile = storeJvmOptionsInTempFile(
+      Map("simpleDriverConf" -> "simpleDriverConfValue",
+        "driverconfwithspaces" -> "driver conf with spaces value"),
+      "driver-jvm-options.properties",
+      "JVM options that should be set on the driver.")
+    sparkConf.set(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
+      "-DsimpleDriverConf=simpleDriverConfValue" +
+        " -Ddriverconfwithspaces='driver conf with spaces value'")
+    sparkConf.set("spark.files", driverJvmOptionsFile.getAbsolutePath)
+    runSparkApplicationAndVerifyCompletion(
+      JavaMainAppResource(SUBMITTER_LOCAL_MAIN_APP_RESOURCE),
+      JAVA_OPTIONS_MAIN_CLASS,
+      Seq(s"All expected JVM options were present on the driver and executors."),
+      Array(driverJvmOptionsFile.getName),
+      Seq.empty[String])
+  }
+
   test("Submit small local files without the resource staging server.") {
     assume(testBackend.name == MINIKUBE_TEST_BACKEND)
     sparkConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH))

@@ -360,6 +380,20 @@
       }
     }
   }
+
+  private def storeJvmOptionsInTempFile(
+      options: Map[String, String],
+      propertiesFileName: String,
+      comments: String): File = {
+    val tempDir = Utils.createTempDir()
+    val propertiesFile = new File(tempDir, propertiesFileName)
+    val properties = new Properties()
+    options.foreach { case (propKey, propValue) => properties.setProperty(propKey, propValue) }
+    Utils.tryWithResource(new FileOutputStream(propertiesFile)) { os =>
+      properties.store(os, comments)
+    }
+    propertiesFile
+  }
 }

 private[spark] object KubernetesSuite {

@@ -389,6 +423,8 @@
     ".integrationtest.jobs.FileExistenceTest"
   val GROUP_BY_MAIN_CLASS = "org.apache.spark.deploy.kubernetes" +
     ".integrationtest.jobs.GroupByTest"
+  val JAVA_OPTIONS_MAIN_CLASS = "org.apache.spark.deploy.kubernetes" +
+    ".integrationtest.jobs.JavaOptionsTest"
   val TEST_EXISTENCE_FILE_CONTENTS = "contents"

   case object ShuffleNotReadyException extends Exception
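
The spaced option in this test survives end to end because Client (above) feeds spark.driver.extraJavaOptions through Utils.splitCommandString, which tokenizes the string the way a shell would and strips the quotes. A hedged sketch of the parse this test relies on (assumes org.apache.spark.util.Utils on the classpath):

import org.apache.spark.util.Utils

val raw = "-DsimpleDriverConf=simpleDriverConfValue" +
  " -Ddriverconfwithspaces='driver conf with spaces value'"

// The single quotes group the spaced value into one token and are removed:
assert(Utils.splitCommandString(raw) == Seq(
  "-DsimpleDriverConf=simpleDriverConfValue",
  "-Ddriverconfwithspaces=driver conf with spaces value"))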
