-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-24551][K8S] Add integration tests for secrets #21652
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.spark.deploy.k8s.integrationtest | ||
|
||
import io.fabric8.kubernetes.api.model.Pod | ||
|
||
import org.apache.spark.launcher.SparkLauncher | ||
|
||
private[spark] trait BasicTestsSuite { k8sSuite: KubernetesSuite => | ||
|
||
import BasicTestsSuite._ | ||
import KubernetesSuite.k8sTestTag | ||
|
||
test("Run SparkPi with no resources", k8sTestTag) { | ||
runSparkPiAndVerifyCompletion() | ||
} | ||
|
||
test("Run SparkPi with a very long application name.", k8sTestTag) { | ||
sparkAppConf.set("spark.app.name", "long" * 40) | ||
runSparkPiAndVerifyCompletion() | ||
} | ||
|
||
test("Use SparkLauncher.NO_RESOURCE", k8sTestTag) { | ||
sparkAppConf.setJars(Seq(containerLocalSparkDistroExamplesJar)) | ||
runSparkPiAndVerifyCompletion( | ||
appResource = SparkLauncher.NO_RESOURCE) | ||
} | ||
|
||
test("Run SparkPi with a master URL without a scheme.", k8sTestTag) { | ||
val url = kubernetesTestComponents.kubernetesClient.getMasterUrl | ||
val k8sMasterUrl = if (url.getPort < 0) { | ||
s"k8s://${url.getHost}" | ||
} else { | ||
s"k8s://${url.getHost}:${url.getPort}" | ||
} | ||
sparkAppConf.set("spark.master", k8sMasterUrl) | ||
runSparkPiAndVerifyCompletion() | ||
} | ||
|
||
test("Run SparkPi with an argument.", k8sTestTag) { | ||
runSparkPiAndVerifyCompletion(appArgs = Array("5")) | ||
} | ||
|
||
test("Run SparkPi with custom labels, annotations, and environment variables.", k8sTestTag) { | ||
sparkAppConf | ||
.set("spark.kubernetes.driver.label.label1", "label1-value") | ||
.set("spark.kubernetes.driver.label.label2", "label2-value") | ||
.set("spark.kubernetes.driver.annotation.annotation1", "annotation1-value") | ||
.set("spark.kubernetes.driver.annotation.annotation2", "annotation2-value") | ||
.set("spark.kubernetes.driverEnv.ENV1", "VALUE1") | ||
.set("spark.kubernetes.driverEnv.ENV2", "VALUE2") | ||
.set("spark.kubernetes.executor.label.label1", "label1-value") | ||
.set("spark.kubernetes.executor.label.label2", "label2-value") | ||
.set("spark.kubernetes.executor.annotation.annotation1", "annotation1-value") | ||
.set("spark.kubernetes.executor.annotation.annotation2", "annotation2-value") | ||
.set("spark.executorEnv.ENV1", "VALUE1") | ||
.set("spark.executorEnv.ENV2", "VALUE2") | ||
|
||
runSparkPiAndVerifyCompletion( | ||
driverPodChecker = (driverPod: Pod) => { | ||
doBasicDriverPodCheck(driverPod) | ||
checkCustomSettings(driverPod) | ||
}, | ||
executorPodChecker = (executorPod: Pod) => { | ||
doBasicExecutorPodCheck(executorPod) | ||
checkCustomSettings(executorPod) | ||
}) | ||
} | ||
|
||
test("Run extraJVMOptions check on driver", k8sTestTag) { | ||
sparkAppConf | ||
.set("spark.driver.extraJavaOptions", "-Dspark.test.foo=spark.test.bar") | ||
runSparkJVMCheckAndVerifyCompletion( | ||
expectedJVMValue = Seq("(spark.test.foo,spark.test.bar)")) | ||
} | ||
|
||
test("Run SparkRemoteFileTest using a remote data file", k8sTestTag) { | ||
sparkAppConf | ||
.set("spark.files", REMOTE_PAGE_RANK_DATA_FILE) | ||
runSparkRemoteCheckAndVerifyCompletion(appArgs = Array(REMOTE_PAGE_RANK_FILE_NAME)) | ||
} | ||
} | ||
|
||
private[spark] object BasicTestsSuite { | ||
val SPARK_PAGE_RANK_MAIN_CLASS: String = "org.apache.spark.examples.SparkPageRank" | ||
val CONTAINER_LOCAL_FILE_DOWNLOAD_PATH = "/var/spark-data/spark-files" | ||
val CONTAINER_LOCAL_DOWNLOADED_PAGE_RANK_DATA_FILE = | ||
s"$CONTAINER_LOCAL_FILE_DOWNLOAD_PATH/pagerank_data.txt" | ||
val REMOTE_PAGE_RANK_DATA_FILE = | ||
"https://storage.googleapis.com/spark-k8s-integration-tests/files/pagerank_data.txt" | ||
val REMOTE_PAGE_RANK_FILE_NAME = "pagerank_data.txt" | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,25 +29,25 @@ import org.scalatest.time.{Minutes, Seconds, Span} | |
import scala.collection.JavaConverters._ | ||
|
||
import org.apache.spark.SparkFunSuite | ||
import org.apache.spark.deploy.k8s.integrationtest.TestConfig._ | ||
import org.apache.spark.deploy.k8s.integrationtest.backend.{IntegrationTestBackend, IntegrationTestBackendFactory} | ||
import org.apache.spark.deploy.k8s.integrationtest.config._ | ||
import org.apache.spark.launcher.SparkLauncher | ||
|
||
private[spark] class KubernetesSuite extends SparkFunSuite | ||
with BeforeAndAfterAll with BeforeAndAfter { | ||
with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite | ||
with PythonTestsSuite { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the future if we dont like this pattern we can just create autonomous test suits and scala test will pick them up. This is a starting point for separating tests and putting them in different files. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems a bit cumbersome but is advantageous should we add a flag in our PRB for triggering just base tests, + python, +r, and for the Kerberos integration tests (which are quite sizeable) + Kerberos. I like the decision to separate tests. I think it is appropriate. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Where do you think we should put the flag? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably a mvn flag i.e. —python, —r, —kerberos There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we can do that in another PR right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. SGTM |
||
|
||
import KubernetesSuite._ | ||
|
||
private var testBackend: IntegrationTestBackend = _ | ||
private var sparkHomeDir: Path = _ | ||
private var kubernetesTestComponents: KubernetesTestComponents = _ | ||
private var sparkAppConf: SparkAppConf = _ | ||
private var image: String = _ | ||
private var pyImage: String = _ | ||
private var containerLocalSparkDistroExamplesJar: String = _ | ||
private var appLocator: String = _ | ||
private var driverPodName: String = _ | ||
private val k8sTestTag = Tag("k8s") | ||
|
||
protected var kubernetesTestComponents: KubernetesTestComponents = _ | ||
protected var sparkAppConf: SparkAppConf = _ | ||
protected var containerLocalSparkDistroExamplesJar: String = _ | ||
protected var appLocator: String = _ | ||
|
||
override def beforeAll(): Unit = { | ||
// The scalatest-maven-plugin gives system properties that are referenced but not set null | ||
|
@@ -103,127 +103,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite | |
deleteDriverPod() | ||
} | ||
|
||
test("Run SparkPi with no resources", k8sTestTag) { | ||
runSparkPiAndVerifyCompletion() | ||
} | ||
|
||
test("Run SparkPi with a very long application name.", k8sTestTag) { | ||
sparkAppConf.set("spark.app.name", "long" * 40) | ||
runSparkPiAndVerifyCompletion() | ||
} | ||
|
||
test("Use SparkLauncher.NO_RESOURCE", k8sTestTag) { | ||
sparkAppConf.setJars(Seq(containerLocalSparkDistroExamplesJar)) | ||
runSparkPiAndVerifyCompletion( | ||
appResource = SparkLauncher.NO_RESOURCE) | ||
} | ||
|
||
test("Run SparkPi with a master URL without a scheme.", k8sTestTag) { | ||
val url = kubernetesTestComponents.kubernetesClient.getMasterUrl | ||
val k8sMasterUrl = if (url.getPort < 0) { | ||
s"k8s://${url.getHost}" | ||
} else { | ||
s"k8s://${url.getHost}:${url.getPort}" | ||
} | ||
sparkAppConf.set("spark.master", k8sMasterUrl) | ||
runSparkPiAndVerifyCompletion() | ||
} | ||
|
||
test("Run SparkPi with an argument.", k8sTestTag) { | ||
runSparkPiAndVerifyCompletion(appArgs = Array("5")) | ||
} | ||
|
||
test("Run SparkPi with custom labels, annotations, and environment variables.", k8sTestTag) { | ||
sparkAppConf | ||
.set("spark.kubernetes.driver.label.label1", "label1-value") | ||
.set("spark.kubernetes.driver.label.label2", "label2-value") | ||
.set("spark.kubernetes.driver.annotation.annotation1", "annotation1-value") | ||
.set("spark.kubernetes.driver.annotation.annotation2", "annotation2-value") | ||
.set("spark.kubernetes.driverEnv.ENV1", "VALUE1") | ||
.set("spark.kubernetes.driverEnv.ENV2", "VALUE2") | ||
.set("spark.kubernetes.executor.label.label1", "label1-value") | ||
.set("spark.kubernetes.executor.label.label2", "label2-value") | ||
.set("spark.kubernetes.executor.annotation.annotation1", "annotation1-value") | ||
.set("spark.kubernetes.executor.annotation.annotation2", "annotation2-value") | ||
.set("spark.executorEnv.ENV1", "VALUE1") | ||
.set("spark.executorEnv.ENV2", "VALUE2") | ||
|
||
runSparkPiAndVerifyCompletion( | ||
driverPodChecker = (driverPod: Pod) => { | ||
doBasicDriverPodCheck(driverPod) | ||
checkCustomSettings(driverPod) | ||
}, | ||
executorPodChecker = (executorPod: Pod) => { | ||
doBasicExecutorPodCheck(executorPod) | ||
checkCustomSettings(executorPod) | ||
}) | ||
} | ||
|
||
test("Run extraJVMOptions check on driver", k8sTestTag) { | ||
sparkAppConf | ||
.set("spark.driver.extraJavaOptions", "-Dspark.test.foo=spark.test.bar") | ||
runSparkJVMCheckAndVerifyCompletion( | ||
expectedJVMValue = Seq("(spark.test.foo,spark.test.bar)")) | ||
} | ||
|
||
test("Run SparkRemoteFileTest using a remote data file", k8sTestTag) { | ||
sparkAppConf | ||
.set("spark.files", REMOTE_PAGE_RANK_DATA_FILE) | ||
runSparkRemoteCheckAndVerifyCompletion( | ||
appArgs = Array(REMOTE_PAGE_RANK_FILE_NAME)) | ||
} | ||
|
||
test("Run PySpark on simple pi.py example", k8sTestTag) { | ||
sparkAppConf | ||
.set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}") | ||
runSparkApplicationAndVerifyCompletion( | ||
appResource = PYSPARK_PI, | ||
mainClass = "", | ||
expectedLogOnCompletion = Seq("Pi is roughly 3"), | ||
appArgs = Array("5"), | ||
driverPodChecker = doBasicDriverPyPodCheck, | ||
executorPodChecker = doBasicExecutorPyPodCheck, | ||
appLocator = appLocator, | ||
isJVM = false) | ||
} | ||
|
||
test("Run PySpark with Python2 to test a pyfiles example", k8sTestTag) { | ||
sparkAppConf | ||
.set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}") | ||
.set("spark.kubernetes.pyspark.pythonversion", "2") | ||
runSparkApplicationAndVerifyCompletion( | ||
appResource = PYSPARK_FILES, | ||
mainClass = "", | ||
expectedLogOnCompletion = Seq( | ||
"Python runtime version check is: True", | ||
"Python environment version check is: True"), | ||
appArgs = Array("python"), | ||
driverPodChecker = doBasicDriverPyPodCheck, | ||
executorPodChecker = doBasicExecutorPyPodCheck, | ||
appLocator = appLocator, | ||
isJVM = false, | ||
pyFiles = Some(PYSPARK_CONTAINER_TESTS)) | ||
} | ||
|
||
test("Run PySpark with Python3 to test a pyfiles example", k8sTestTag) { | ||
sparkAppConf | ||
.set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}") | ||
.set("spark.kubernetes.pyspark.pythonversion", "3") | ||
runSparkApplicationAndVerifyCompletion( | ||
appResource = PYSPARK_FILES, | ||
mainClass = "", | ||
expectedLogOnCompletion = Seq( | ||
"Python runtime version check is: True", | ||
"Python environment version check is: True"), | ||
appArgs = Array("python3"), | ||
driverPodChecker = doBasicDriverPyPodCheck, | ||
executorPodChecker = doBasicExecutorPyPodCheck, | ||
appLocator = appLocator, | ||
isJVM = false, | ||
pyFiles = Some(PYSPARK_CONTAINER_TESTS)) | ||
} | ||
|
||
private def runSparkPiAndVerifyCompletion( | ||
protected def runSparkPiAndVerifyCompletion( | ||
appResource: String = containerLocalSparkDistroExamplesJar, | ||
driverPodChecker: Pod => Unit = doBasicDriverPodCheck, | ||
executorPodChecker: Pod => Unit = doBasicExecutorPodCheck, | ||
|
@@ -241,7 +121,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite | |
isJVM) | ||
} | ||
|
||
private def runSparkRemoteCheckAndVerifyCompletion( | ||
protected def runSparkRemoteCheckAndVerifyCompletion( | ||
appResource: String = containerLocalSparkDistroExamplesJar, | ||
driverPodChecker: Pod => Unit = doBasicDriverPodCheck, | ||
executorPodChecker: Pod => Unit = doBasicExecutorPodCheck, | ||
|
@@ -258,7 +138,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite | |
true) | ||
} | ||
|
||
private def runSparkJVMCheckAndVerifyCompletion( | ||
protected def runSparkJVMCheckAndVerifyCompletion( | ||
appResource: String = containerLocalSparkDistroExamplesJar, | ||
mainClass: String = SPARK_DRIVER_MAIN_CLASS, | ||
driverPodChecker: Pod => Unit = doBasicDriverPodCheck, | ||
|
@@ -295,7 +175,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite | |
} | ||
} | ||
|
||
private def runSparkApplicationAndVerifyCompletion( | ||
protected def runSparkApplicationAndVerifyCompletion( | ||
appResource: String, | ||
mainClass: String, | ||
expectedLogOnCompletion: Seq[String], | ||
|
@@ -347,29 +227,30 @@ private[spark] class KubernetesSuite extends SparkFunSuite | |
} | ||
} | ||
|
||
private def doBasicDriverPodCheck(driverPod: Pod): Unit = { | ||
protected def doBasicDriverPodCheck(driverPod: Pod): Unit = { | ||
assert(driverPod.getMetadata.getName === driverPodName) | ||
assert(driverPod.getSpec.getContainers.get(0).getImage === image) | ||
assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver") | ||
} | ||
|
||
private def doBasicDriverPyPodCheck(driverPod: Pod): Unit = { | ||
|
||
protected def doBasicDriverPyPodCheck(driverPod: Pod): Unit = { | ||
assert(driverPod.getMetadata.getName === driverPodName) | ||
assert(driverPod.getSpec.getContainers.get(0).getImage === pyImage) | ||
assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver") | ||
} | ||
|
||
private def doBasicExecutorPodCheck(executorPod: Pod): Unit = { | ||
protected def doBasicExecutorPodCheck(executorPod: Pod): Unit = { | ||
assert(executorPod.getSpec.getContainers.get(0).getImage === image) | ||
assert(executorPod.getSpec.getContainers.get(0).getName === "executor") | ||
} | ||
|
||
private def doBasicExecutorPyPodCheck(executorPod: Pod): Unit = { | ||
protected def doBasicExecutorPyPodCheck(executorPod: Pod): Unit = { | ||
assert(executorPod.getSpec.getContainers.get(0).getImage === pyImage) | ||
assert(executorPod.getSpec.getContainers.get(0).getName === "executor") | ||
} | ||
|
||
private def checkCustomSettings(pod: Pod): Unit = { | ||
protected def checkCustomSettings(pod: Pod): Unit = { | ||
assert(pod.getMetadata.getLabels.get("label1") === "label1-value") | ||
assert(pod.getMetadata.getLabels.get("label2") === "label2-value") | ||
assert(pod.getMetadata.getAnnotations.get("annotation1") === "annotation1-value") | ||
|
@@ -399,26 +280,10 @@ private[spark] class KubernetesSuite extends SparkFunSuite | |
} | ||
|
||
private[spark] object KubernetesSuite { | ||
|
||
val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes)) | ||
val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds)) | ||
val k8sTestTag = Tag("k8s") | ||
val SPARK_PI_MAIN_CLASS: String = "org.apache.spark.examples.SparkPi" | ||
val SPARK_REMOTE_MAIN_CLASS: String = "org.apache.spark.examples.SparkRemoteFileTest" | ||
val SPARK_DRIVER_MAIN_CLASS: String = "org.apache.spark.examples.DriverSubmissionTest" | ||
val SPARK_PAGE_RANK_MAIN_CLASS: String = "org.apache.spark.examples.SparkPageRank" | ||
val CONTAINER_LOCAL_PYSPARK: String = "local:///opt/spark/examples/src/main/python/" | ||
val PYSPARK_PI: String = CONTAINER_LOCAL_PYSPARK + "pi.py" | ||
val PYSPARK_FILES: String = CONTAINER_LOCAL_PYSPARK + "pyfiles.py" | ||
val PYSPARK_CONTAINER_TESTS: String = CONTAINER_LOCAL_PYSPARK + "py_container_checks.py" | ||
|
||
val TEST_SECRET_NAME_PREFIX = "test-secret-" | ||
val TEST_SECRET_KEY = "test-key" | ||
val TEST_SECRET_VALUE = "test-data" | ||
val TEST_SECRET_MOUNT_PATH = "/etc/secrets" | ||
|
||
val REMOTE_PAGE_RANK_DATA_FILE = | ||
"https://storage.googleapis.com/spark-k8s-integration-tests/files/pagerank_data.txt" | ||
val REMOTE_PAGE_RANK_FILE_NAME = "pagerank_data.txt" | ||
|
||
case object ShuffleNotReadyException extends Exception | ||
val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes)) | ||
val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds)) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@debasishg spotted this.