
[SPARK-24551][K8S] Add integration tests for secrets #21652

Closed · wants to merge 1 commit
bin/docker-image-tool.sh (2 changes: 1 addition & 1 deletion)
@@ -135,7 +135,7 @@
BASEDOCKERFILE=
PYDOCKERFILE=
NOCACHEARG=
BUILD_PARAMS=
-while getopts f:mr:t:n:b: option
+while getopts f:p:mr:t:n:b: option
Contributor Author:
@debasishg spotted this.

do
case "${option}"
in
BasicTestsSuite.scala (new file, 106 additions)
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.integrationtest

import io.fabric8.kubernetes.api.model.Pod

import org.apache.spark.launcher.SparkLauncher

private[spark] trait BasicTestsSuite { k8sSuite: KubernetesSuite =>

import BasicTestsSuite._
import KubernetesSuite.k8sTestTag

test("Run SparkPi with no resources", k8sTestTag) {
runSparkPiAndVerifyCompletion()
}

test("Run SparkPi with a very long application name.", k8sTestTag) {
sparkAppConf.set("spark.app.name", "long" * 40)
runSparkPiAndVerifyCompletion()
}

test("Use SparkLauncher.NO_RESOURCE", k8sTestTag) {
sparkAppConf.setJars(Seq(containerLocalSparkDistroExamplesJar))
runSparkPiAndVerifyCompletion(
appResource = SparkLauncher.NO_RESOURCE)
}

test("Run SparkPi with a master URL without a scheme.", k8sTestTag) {
val url = kubernetesTestComponents.kubernetesClient.getMasterUrl
val k8sMasterUrl = if (url.getPort < 0) {
s"k8s://${url.getHost}"
} else {
s"k8s://${url.getHost}:${url.getPort}"
}
sparkAppConf.set("spark.master", k8sMasterUrl)
runSparkPiAndVerifyCompletion()
}

test("Run SparkPi with an argument.", k8sTestTag) {
runSparkPiAndVerifyCompletion(appArgs = Array("5"))
}

test("Run SparkPi with custom labels, annotations, and environment variables.", k8sTestTag) {
sparkAppConf
.set("spark.kubernetes.driver.label.label1", "label1-value")
.set("spark.kubernetes.driver.label.label2", "label2-value")
.set("spark.kubernetes.driver.annotation.annotation1", "annotation1-value")
.set("spark.kubernetes.driver.annotation.annotation2", "annotation2-value")
.set("spark.kubernetes.driverEnv.ENV1", "VALUE1")
.set("spark.kubernetes.driverEnv.ENV2", "VALUE2")
.set("spark.kubernetes.executor.label.label1", "label1-value")
.set("spark.kubernetes.executor.label.label2", "label2-value")
.set("spark.kubernetes.executor.annotation.annotation1", "annotation1-value")
.set("spark.kubernetes.executor.annotation.annotation2", "annotation2-value")
.set("spark.executorEnv.ENV1", "VALUE1")
.set("spark.executorEnv.ENV2", "VALUE2")

runSparkPiAndVerifyCompletion(
driverPodChecker = (driverPod: Pod) => {
doBasicDriverPodCheck(driverPod)
checkCustomSettings(driverPod)
},
executorPodChecker = (executorPod: Pod) => {
doBasicExecutorPodCheck(executorPod)
checkCustomSettings(executorPod)
})
}

test("Run extraJVMOptions check on driver", k8sTestTag) {
sparkAppConf
.set("spark.driver.extraJavaOptions", "-Dspark.test.foo=spark.test.bar")
runSparkJVMCheckAndVerifyCompletion(
expectedJVMValue = Seq("(spark.test.foo,spark.test.bar)"))
}

test("Run SparkRemoteFileTest using a remote data file", k8sTestTag) {
sparkAppConf
.set("spark.files", REMOTE_PAGE_RANK_DATA_FILE)
runSparkRemoteCheckAndVerifyCompletion(appArgs = Array(REMOTE_PAGE_RANK_FILE_NAME))
}
}

private[spark] object BasicTestsSuite {
val SPARK_PAGE_RANK_MAIN_CLASS: String = "org.apache.spark.examples.SparkPageRank"
val CONTAINER_LOCAL_FILE_DOWNLOAD_PATH = "/var/spark-data/spark-files"
val CONTAINER_LOCAL_DOWNLOADED_PAGE_RANK_DATA_FILE =
s"$CONTAINER_LOCAL_FILE_DOWNLOAD_PATH/pagerank_data.txt"
val REMOTE_PAGE_RANK_DATA_FILE =
"https://storage.googleapis.com/spark-k8s-integration-tests/files/pagerank_data.txt"
val REMOTE_PAGE_RANK_FILE_NAME = "pagerank_data.txt"
}
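The KubernetesSuite diff below mixes a SecretsTestsSuite into the main suite alongside BasicTestsSuite, but that new file is not part of this excerpt. Purely as a hedged sketch (not the PR's actual code), a sibling trait could follow the same self-type pattern; the secret name below is hypothetical, while the spark.kubernetes.driver.secrets.* and spark.kubernetes.executor.secrets.* keys are Spark's documented secret-mounting properties:

```scala
package org.apache.spark.deploy.k8s.integrationtest

import io.fabric8.kubernetes.api.model.Pod

// Sketch only: the PR's real SecretsTestsSuite is not shown in this diff.
private[spark] trait SecretsTestsSuiteSketch { k8sSuite: KubernetesSuite =>

  import KubernetesSuite.k8sTestTag

  private val secretName = "test-secret"       // hypothetical name
  private val secretMountPath = "/etc/secrets" // mirrors TEST_SECRET_MOUNT_PATH below

  test("Run SparkPi with a mounted secret (sketch)", k8sTestTag) {
    sparkAppConf
      .set(s"spark.kubernetes.driver.secrets.$secretName", secretMountPath)
      .set(s"spark.kubernetes.executor.secrets.$secretName", secretMountPath)
    runSparkPiAndVerifyCompletion(
      driverPodChecker = (pod: Pod) => {
        doBasicDriverPodCheck(pod)
        // A real test would additionally exec into the pod and assert that
        // the file under secretMountPath holds the expected secret value.
      },
      executorPodChecker = (pod: Pod) => doBasicExecutorPodCheck(pod))
  }
}
```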
KubernetesSuite.scala

@@ -29,25 +29,25 @@
import org.scalatest.time.{Minutes, Seconds, Span}
import scala.collection.JavaConverters._

import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.k8s.integrationtest.TestConfig._
import org.apache.spark.deploy.k8s.integrationtest.backend.{IntegrationTestBackend, IntegrationTestBackendFactory}
import org.apache.spark.deploy.k8s.integrationtest.config._
import org.apache.spark.launcher.SparkLauncher

private[spark] class KubernetesSuite extends SparkFunSuite
-with BeforeAndAfterAll with BeforeAndAfter {
+with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite
+with PythonTestsSuite {
@skonto (Contributor Author), Jul 17, 2018:

In the future, if we don't like this pattern, we can just create autonomous test suites and ScalaTest will pick them up. This is a starting point for separating the tests and putting them in different files.
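As a hedged illustration of that alternative, an autonomous suite would just be a concrete class that ScalaTest's runner discovers on its own; the class name and body here are hypothetical:

```scala
package org.apache.spark.deploy.k8s.integrationtest

import org.scalatest.BeforeAndAfterAll

import org.apache.spark.SparkFunSuite

// Hypothetical standalone suite: ScalaTest discovers concrete Suite
// subclasses automatically, so no mixin into KubernetesSuite is needed.
class StandaloneBasicTestsSuite extends SparkFunSuite with BeforeAndAfterAll {

  override def beforeAll(): Unit = {
    super.beforeAll()
    // The cost of autonomy: each suite must set up (or share) its own
    // Kubernetes test backend instead of reusing KubernetesSuite's beforeAll().
  }

  test("an autonomous k8s test would go here") {
    // test body
  }
}
```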

Contributor:

This seems a bit cumbersome, but it is advantageous should we add a flag to our PRB for triggering just the base tests, +Python, +R, and, for the Kerberos integration tests (which are quite sizeable), +Kerberos. I like the decision to separate the tests; I think it is appropriate.

@skonto (Contributor Author), Jul 17, 2018:

Where do you think we should put the flag?

Contributor:

Probably an mvn flag, e.g. --python, --r, --kerberos.

Contributor Author:

I think we can do that in another PR, right?

Contributor:

SGTM
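One way such flags could hook in (a sketch under assumptions, not what was merged): reuse the ScalaTest tag mechanism already behind k8sTestTag and let a Maven property feed the scalatest-maven-plugin's tagsToInclude setting. All tag names except "k8s" are hypothetical:

```scala
import org.scalatest.Tag

// Hypothetical per-feature tags; a PRB flag could be forwarded to the
// scalatest-maven-plugin (tagsToInclude) to run only the matching suites.
object SuiteTagsSketch {
  val k8s = Tag("k8s")           // exists today as KubernetesSuite.k8sTestTag
  val python = Tag("python")     // hypothetical
  val r = Tag("r")               // hypothetical
  val kerberos = Tag("kerberos") // hypothetical
}
```

Tests tagged this way, e.g. test("pi.py example", SuiteTagsSketch.python), would then be selectable per PRB run.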


import KubernetesSuite._

private var testBackend: IntegrationTestBackend = _
private var sparkHomeDir: Path = _
-private var kubernetesTestComponents: KubernetesTestComponents = _
-private var sparkAppConf: SparkAppConf = _
private var image: String = _
private var pyImage: String = _
-private var containerLocalSparkDistroExamplesJar: String = _
-private var appLocator: String = _
private var driverPodName: String = _
-private val k8sTestTag = Tag("k8s")
+protected var kubernetesTestComponents: KubernetesTestComponents = _
+protected var sparkAppConf: SparkAppConf = _
+protected var containerLocalSparkDistroExamplesJar: String = _
+protected var appLocator: String = _

override def beforeAll(): Unit = {
// The scalatest-maven-plugin gives system properties that are referenced but not set null
@@ -103,127 +103,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite

(The tests below were removed from KubernetesSuite in this hunk; they now live in BasicTestsSuite and PythonTestsSuite.)
deleteDriverPod()
}

test("Run SparkPi with no resources", k8sTestTag) {
runSparkPiAndVerifyCompletion()
}

test("Run SparkPi with a very long application name.", k8sTestTag) {
sparkAppConf.set("spark.app.name", "long" * 40)
runSparkPiAndVerifyCompletion()
}

test("Use SparkLauncher.NO_RESOURCE", k8sTestTag) {
sparkAppConf.setJars(Seq(containerLocalSparkDistroExamplesJar))
runSparkPiAndVerifyCompletion(
appResource = SparkLauncher.NO_RESOURCE)
}

test("Run SparkPi with a master URL without a scheme.", k8sTestTag) {
val url = kubernetesTestComponents.kubernetesClient.getMasterUrl
val k8sMasterUrl = if (url.getPort < 0) {
s"k8s://${url.getHost}"
} else {
s"k8s://${url.getHost}:${url.getPort}"
}
sparkAppConf.set("spark.master", k8sMasterUrl)
runSparkPiAndVerifyCompletion()
}

test("Run SparkPi with an argument.", k8sTestTag) {
runSparkPiAndVerifyCompletion(appArgs = Array("5"))
}

test("Run SparkPi with custom labels, annotations, and environment variables.", k8sTestTag) {
sparkAppConf
.set("spark.kubernetes.driver.label.label1", "label1-value")
.set("spark.kubernetes.driver.label.label2", "label2-value")
.set("spark.kubernetes.driver.annotation.annotation1", "annotation1-value")
.set("spark.kubernetes.driver.annotation.annotation2", "annotation2-value")
.set("spark.kubernetes.driverEnv.ENV1", "VALUE1")
.set("spark.kubernetes.driverEnv.ENV2", "VALUE2")
.set("spark.kubernetes.executor.label.label1", "label1-value")
.set("spark.kubernetes.executor.label.label2", "label2-value")
.set("spark.kubernetes.executor.annotation.annotation1", "annotation1-value")
.set("spark.kubernetes.executor.annotation.annotation2", "annotation2-value")
.set("spark.executorEnv.ENV1", "VALUE1")
.set("spark.executorEnv.ENV2", "VALUE2")

runSparkPiAndVerifyCompletion(
driverPodChecker = (driverPod: Pod) => {
doBasicDriverPodCheck(driverPod)
checkCustomSettings(driverPod)
},
executorPodChecker = (executorPod: Pod) => {
doBasicExecutorPodCheck(executorPod)
checkCustomSettings(executorPod)
})
}

test("Run extraJVMOptions check on driver", k8sTestTag) {
sparkAppConf
.set("spark.driver.extraJavaOptions", "-Dspark.test.foo=spark.test.bar")
runSparkJVMCheckAndVerifyCompletion(
expectedJVMValue = Seq("(spark.test.foo,spark.test.bar)"))
}

test("Run SparkRemoteFileTest using a remote data file", k8sTestTag) {
sparkAppConf
.set("spark.files", REMOTE_PAGE_RANK_DATA_FILE)
runSparkRemoteCheckAndVerifyCompletion(
appArgs = Array(REMOTE_PAGE_RANK_FILE_NAME))
}

test("Run PySpark on simple pi.py example", k8sTestTag) {
sparkAppConf
.set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}")
runSparkApplicationAndVerifyCompletion(
appResource = PYSPARK_PI,
mainClass = "",
expectedLogOnCompletion = Seq("Pi is roughly 3"),
appArgs = Array("5"),
driverPodChecker = doBasicDriverPyPodCheck,
executorPodChecker = doBasicExecutorPyPodCheck,
appLocator = appLocator,
isJVM = false)
}

test("Run PySpark with Python2 to test a pyfiles example", k8sTestTag) {
sparkAppConf
.set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}")
.set("spark.kubernetes.pyspark.pythonversion", "2")
runSparkApplicationAndVerifyCompletion(
appResource = PYSPARK_FILES,
mainClass = "",
expectedLogOnCompletion = Seq(
"Python runtime version check is: True",
"Python environment version check is: True"),
appArgs = Array("python"),
driverPodChecker = doBasicDriverPyPodCheck,
executorPodChecker = doBasicExecutorPyPodCheck,
appLocator = appLocator,
isJVM = false,
pyFiles = Some(PYSPARK_CONTAINER_TESTS))
}

test("Run PySpark with Python3 to test a pyfiles example", k8sTestTag) {
sparkAppConf
.set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}")
.set("spark.kubernetes.pyspark.pythonversion", "3")
runSparkApplicationAndVerifyCompletion(
appResource = PYSPARK_FILES,
mainClass = "",
expectedLogOnCompletion = Seq(
"Python runtime version check is: True",
"Python environment version check is: True"),
appArgs = Array("python3"),
driverPodChecker = doBasicDriverPyPodCheck,
executorPodChecker = doBasicExecutorPyPodCheck,
appLocator = appLocator,
isJVM = false,
pyFiles = Some(PYSPARK_CONTAINER_TESTS))
}

-private def runSparkPiAndVerifyCompletion(
+protected def runSparkPiAndVerifyCompletion(
appResource: String = containerLocalSparkDistroExamplesJar,
driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
executorPodChecker: Pod => Unit = doBasicExecutorPodCheck,
@@ -241,7 +121,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite
isJVM)
}

-private def runSparkRemoteCheckAndVerifyCompletion(
+protected def runSparkRemoteCheckAndVerifyCompletion(
appResource: String = containerLocalSparkDistroExamplesJar,
driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
executorPodChecker: Pod => Unit = doBasicExecutorPodCheck,
@@ -258,7 +138,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite
true)
}

-private def runSparkJVMCheckAndVerifyCompletion(
+protected def runSparkJVMCheckAndVerifyCompletion(
appResource: String = containerLocalSparkDistroExamplesJar,
mainClass: String = SPARK_DRIVER_MAIN_CLASS,
driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
@@ -295,7 +175,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite
}
}

-private def runSparkApplicationAndVerifyCompletion(
+protected def runSparkApplicationAndVerifyCompletion(
appResource: String,
mainClass: String,
expectedLogOnCompletion: Seq[String],
@@ -347,29 +227,30 @@
}
}

-private def doBasicDriverPodCheck(driverPod: Pod): Unit = {
+protected def doBasicDriverPodCheck(driverPod: Pod): Unit = {
assert(driverPod.getMetadata.getName === driverPodName)
assert(driverPod.getSpec.getContainers.get(0).getImage === image)
assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver")
}

-private def doBasicDriverPyPodCheck(driverPod: Pod): Unit = {
+
+protected def doBasicDriverPyPodCheck(driverPod: Pod): Unit = {
assert(driverPod.getMetadata.getName === driverPodName)
assert(driverPod.getSpec.getContainers.get(0).getImage === pyImage)
assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver")
}

-private def doBasicExecutorPodCheck(executorPod: Pod): Unit = {
+protected def doBasicExecutorPodCheck(executorPod: Pod): Unit = {
assert(executorPod.getSpec.getContainers.get(0).getImage === image)
assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
}

-private def doBasicExecutorPyPodCheck(executorPod: Pod): Unit = {
+protected def doBasicExecutorPyPodCheck(executorPod: Pod): Unit = {
assert(executorPod.getSpec.getContainers.get(0).getImage === pyImage)
assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
}

-private def checkCustomSettings(pod: Pod): Unit = {
+protected def checkCustomSettings(pod: Pod): Unit = {
assert(pod.getMetadata.getLabels.get("label1") === "label1-value")
assert(pod.getMetadata.getLabels.get("label2") === "label2-value")
assert(pod.getMetadata.getAnnotations.get("annotation1") === "annotation1-value")
@@ -399,26 +280,10 @@ private[spark] class KubernetesSuite extends SparkFunSuite
}

private[spark] object KubernetesSuite {
-
-val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
-val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds))
+val k8sTestTag = Tag("k8s")
val SPARK_PI_MAIN_CLASS: String = "org.apache.spark.examples.SparkPi"
val SPARK_REMOTE_MAIN_CLASS: String = "org.apache.spark.examples.SparkRemoteFileTest"
val SPARK_DRIVER_MAIN_CLASS: String = "org.apache.spark.examples.DriverSubmissionTest"
-val SPARK_PAGE_RANK_MAIN_CLASS: String = "org.apache.spark.examples.SparkPageRank"
-val CONTAINER_LOCAL_PYSPARK: String = "local:///opt/spark/examples/src/main/python/"
-val PYSPARK_PI: String = CONTAINER_LOCAL_PYSPARK + "pi.py"
-val PYSPARK_FILES: String = CONTAINER_LOCAL_PYSPARK + "pyfiles.py"
-val PYSPARK_CONTAINER_TESTS: String = CONTAINER_LOCAL_PYSPARK + "py_container_checks.py"
-
-val TEST_SECRET_NAME_PREFIX = "test-secret-"
-val TEST_SECRET_KEY = "test-key"
-val TEST_SECRET_VALUE = "test-data"
-val TEST_SECRET_MOUNT_PATH = "/etc/secrets"
-
-val REMOTE_PAGE_RANK_DATA_FILE =
-"https://storage.googleapis.com/spark-k8s-integration-tests/files/pagerank_data.txt"
-val REMOTE_PAGE_RANK_FILE_NAME = "pagerank_data.txt"
-
-case object ShuffleNotReadyException extends Exception
+val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
+val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds))
}