Skip to content

[SPARK-25897][k8s] Hook up k8s integration tests to sbt build. #22909

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions bin/docker-image-tool.sh
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ do
if ! which minikube 1>/dev/null; then
error "Cannot find minikube."
fi
if ! minikube status 1>/dev/null; then
error "Cannot contact minikube. Make sure it's running."
fi
eval $(minikube docker-env)
;;
esac
Expand Down
61 changes: 61 additions & 0 deletions project/SparkBuild.scala
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,8 @@ object SparkBuild extends PomBuild {
// SPARK-14738 - Remove docker tests from main Spark build
// enable(DockerIntegrationTests.settings)(dockerIntegrationTests)

enable(KubernetesIntegrationTests.settings)(kubernetesIntegrationTests)

/**
* Adds the ability to run the spark shell directly from SBT without building an assembly
* jar.
Expand Down Expand Up @@ -458,6 +460,65 @@ object DockerIntegrationTests {
)
}

/**
* These settings run a hardcoded configuration of the Kubernetes integration tests using
* minikube. Docker images will have the "dev" tag, and will be overwritten every time the
* integration tests are run. The integration tests are actually bound to the "test" phase,
* so running "test" on this module will run the integration tests.
*
* There are two ways to run the tests:
* - the "tests" task builds docker images and runs the test, so it's a little slow.
* - the "run-its" task just runs the tests on a pre-built set of images.
*
* Note that this does not use the shell scripts that the maven build uses, which are more
* configurable. This is meant as a quick way for developers to run these tests against their
* local changes.
*/
object KubernetesIntegrationTests {
import BuildCommons._

val dockerBuild = TaskKey[Unit]("docker-imgs", "Build the docker images for ITs.")
val runITs = TaskKey[Unit]("run-its", "Only run ITs, skip image build.")
val imageTag = settingKey[String]("Tag to use for images built during the test.")
val namespace = settingKey[String]("Namespace where to run pods.")

// Hack: this variable is used to control whether to build docker images. It's updated by
// the tasks below in a non-obvious way, so that you get the functionality described in
// the scaladoc above.
private var shouldBuildImage = true

lazy val settings = Seq(
imageTag := "dev",
namespace := "default",
dockerBuild := {
if (shouldBuildImage) {
val dockerTool = s"$sparkHome/bin/docker-image-tool.sh"
val cmd = Seq(dockerTool, "-m", "-t", imageTag.value, "build")
val ec = Process(cmd).!
if (ec != 0) {
throw new IllegalStateException(s"Process '${cmd.mkString(" ")}' exited with $ec.")
}
}
shouldBuildImage = true
},
runITs := Def.taskDyn {
shouldBuildImage = false
Def.task {
(test in Test).value
}
}.value,
test in Test := (test in Test).dependsOn(dockerBuild).value,
javaOptions in Test ++= Seq(
"-Dspark.kubernetes.test.deployMode=minikube",
s"-Dspark.kubernetes.test.imageTag=${imageTag.value}",
s"-Dspark.kubernetes.test.namespace=${namespace.value}",
s"-Dspark.kubernetes.test.unpackSparkDir=$sparkHome"
),
// Force packaging before building images, so that the latest code is tested.
dockerBuild := dockerBuild.dependsOn(packageBin in Compile in assembly).value
)
}

/**
* Overrides to work around sbt's dependency resolution being different from Maven's.
*/
Expand Down
6 changes: 1 addition & 5 deletions resource-managers/kubernetes/integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,10 @@
<executions>
<execution>
<id>test</id>
<phase>none</phase>
<goals>
<goal>test</goal>
</goals>
<configuration>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we not need this anymore because the integration test is hidden behind the profile?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need this because I disabled the execution the proper way a few lines above.

<!-- The negative pattern below prevents integration tests such as
KubernetesSuite from running in the test phase. -->
<suffixes>(?&lt;!Suite)</suffixes>
</configuration>
</execution>
<execution>
<id>integration-test</id>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,34 +19,34 @@ package org.apache.spark.deploy.k8s.integrationtest
import java.io.File
import java.nio.file.{Path, Paths}
import java.util.UUID
import java.util.regex.Pattern

import com.google.common.io.PatternFilenameFilter
import scala.collection.JavaConverters._

import com.google.common.base.Charsets
import com.google.common.io.Files
import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, Tag}
import org.scalatest.Matchers
import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
import org.scalatest.time.{Minutes, Seconds, Span}
import scala.collection.JavaConverters._

import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.k8s.integrationtest.TestConfig._
import org.apache.spark.{SPARK_VERSION, SparkFunSuite}
import org.apache.spark.deploy.k8s.integrationtest.TestConstants._
import org.apache.spark.deploy.k8s.integrationtest.backend.{IntegrationTestBackend, IntegrationTestBackendFactory}
import org.apache.spark.internal.Logging

private[spark] class KubernetesSuite extends SparkFunSuite
class KubernetesSuite extends SparkFunSuite
with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite
with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite
with Logging with Eventually with Matchers {

import KubernetesSuite._

private var sparkHomeDir: Path = _
private var pyImage: String = _
private var rImage: String = _
protected var sparkHomeDir: Path = _
protected var pyImage: String = _
protected var rImage: String = _

protected var image: String = _
protected var testBackend: IntegrationTestBackend = _
Expand All @@ -67,6 +67,30 @@ private[spark] class KubernetesSuite extends SparkFunSuite
private val extraExecTotalMemory =
s"${(1024 + memOverheadConstant*1024 + additionalMemory).toInt}Mi"

/**
* Build the image ref for the given image name, taking the repo and tag from the
* test configuration.
*/
private def testImageRef(name: String): String = {
val tag = sys.props.get(CONFIG_KEY_IMAGE_TAG_FILE)
.map { path =>
val tagFile = new File(path)
require(tagFile.isFile,
s"No file found for image tag at ${tagFile.getAbsolutePath}.")
Files.toString(tagFile, Charsets.UTF_8).trim
}
.orElse(sys.props.get(CONFIG_KEY_IMAGE_TAG))
.getOrElse {
throw new IllegalArgumentException(
s"One of $CONFIG_KEY_IMAGE_TAG_FILE or $CONFIG_KEY_IMAGE_TAG is required.")
}
val repo = sys.props.get(CONFIG_KEY_IMAGE_REPO)
.map { _ + "/" }
.getOrElse("")

s"$repo$name:$tag"
}

override def beforeAll(): Unit = {
super.beforeAll()
// The scalatest-maven-plugin gives system properties that are referenced but not set null
Expand All @@ -83,17 +107,16 @@ private[spark] class KubernetesSuite extends SparkFunSuite
sparkHomeDir = Paths.get(sparkDirProp)
require(sparkHomeDir.toFile.isDirectory,
s"No directory found for spark home specified at $sparkHomeDir.")
val imageTag = getTestImageTag
val imageRepo = getTestImageRepo
image = s"$imageRepo/spark:$imageTag"
pyImage = s"$imageRepo/spark-py:$imageTag"
rImage = s"$imageRepo/spark-r:$imageTag"

val sparkDistroExamplesJarFile: File = sparkHomeDir.resolve(Paths.get("examples", "jars"))
.toFile
.listFiles(new PatternFilenameFilter(Pattern.compile("^spark-examples_.*\\.jar$")))(0)
containerLocalSparkDistroExamplesJar = s"local:///opt/spark/examples/jars/" +
s"${sparkDistroExamplesJarFile.getName}"
image = testImageRef("spark")
pyImage = testImageRef("spark-py")
rImage = testImageRef("spark-r")

val scalaVersion = scala.util.Properties.versionNumberString
.split("\\.")
.take(2)
.mkString(".")
containerLocalSparkDistroExamplesJar =
s"local:///opt/spark/examples/jars/spark-examples_$scalaVersion-${SPARK_VERSION}.jar"
testBackend = IntegrationTestBackendFactory.getTestBackend
testBackend.initialize()
kubernetesTestComponents = new KubernetesTestComponents(testBackend.getKubernetesClient)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ object ProcessUtils extends Logging {
* executeProcess is used to run a command and return the output if it
* completes within timeout seconds.
*/
def executeProcess(fullCommand: Array[String], timeout: Long, dumpErrors: Boolean = false): Seq[String] = {
def executeProcess(
fullCommand: Array[String],
timeout: Long,
dumpErrors: Boolean = false): Seq[String] = {
val pb = new ProcessBuilder().command(fullCommand: _*)
pb.redirectErrorStream(true)
val proc = pb.start()
Expand All @@ -41,7 +44,8 @@ object ProcessUtils extends Logging {
assert(proc.waitFor(timeout, TimeUnit.SECONDS),
s"Timed out while executing ${fullCommand.mkString(" ")}")
assert(proc.exitValue == 0,
s"Failed to execute ${fullCommand.mkString(" ")}${if (dumpErrors) "\n" + outputLines.mkString("\n")}")
s"Failed to execute ${fullCommand.mkString(" ")}" +
s"${if (dumpErrors) "\n" + outputLines.mkString("\n")}")
outputLines
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,14 @@
*/
package org.apache.spark.deploy.k8s.integrationtest

import org.apache.spark.deploy.k8s.integrationtest.TestConfig.{getTestImageRepo, getTestImageTag}

private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite =>

import PythonTestsSuite._
import KubernetesSuite.k8sTestTag

private val pySparkDockerImage =
s"${getTestImageRepo}/spark-py:${getTestImageTag}"
test("Run PySpark on simple pi.py example", k8sTestTag) {
sparkAppConf
.set("spark.kubernetes.container.image", pySparkDockerImage)
.set("spark.kubernetes.container.image", pyImage)
runSparkApplicationAndVerifyCompletion(
appResource = PYSPARK_PI,
mainClass = "",
Expand All @@ -41,7 +37,7 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite =>

test("Run PySpark with Python2 to test a pyfiles example", k8sTestTag) {
sparkAppConf
.set("spark.kubernetes.container.image", pySparkDockerImage)
.set("spark.kubernetes.container.image", pyImage)
.set("spark.kubernetes.pyspark.pythonVersion", "2")
runSparkApplicationAndVerifyCompletion(
appResource = PYSPARK_FILES,
Expand All @@ -59,7 +55,7 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite =>

test("Run PySpark with Python3 to test a pyfiles example", k8sTestTag) {
sparkAppConf
.set("spark.kubernetes.container.image", pySparkDockerImage)
.set("spark.kubernetes.container.image", pyImage)
.set("spark.kubernetes.pyspark.pythonVersion", "3")
runSparkApplicationAndVerifyCompletion(
appResource = PYSPARK_FILES,
Expand All @@ -77,7 +73,7 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite =>

test("Run PySpark with memory customization", k8sTestTag) {
sparkAppConf
.set("spark.kubernetes.container.image", pySparkDockerImage)
.set("spark.kubernetes.container.image", pyImage)
.set("spark.kubernetes.pyspark.pythonVersion", "3")
.set("spark.kubernetes.memoryOverheadFactor", s"$memOverheadConstant")
.set("spark.executor.pyspark.memory", s"${additionalMemory}m")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,13 @@
*/
package org.apache.spark.deploy.k8s.integrationtest

import org.apache.spark.deploy.k8s.integrationtest.TestConfig.{getTestImageRepo, getTestImageTag}

private[spark] trait RTestsSuite { k8sSuite: KubernetesSuite =>

import RTestsSuite._
import KubernetesSuite.k8sTestTag

test("Run SparkR on simple dataframe.R example", k8sTestTag) {
sparkAppConf
.set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-r:${getTestImageTag}")
sparkAppConf.set("spark.kubernetes.container.image", rImage)
runSparkApplicationAndVerifyCompletion(
appResource = SPARK_R_DATAFRAME_TEST,
mainClass = "",
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ object TestConstants {
val CONFIG_KEY_KUBE_MASTER_URL = "spark.kubernetes.test.master"
val CONFIG_KEY_KUBE_NAMESPACE = "spark.kubernetes.test.namespace"
val CONFIG_KEY_KUBE_SVC_ACCOUNT = "spark.kubernetes.test.serviceAccountName"
val CONFIG_KEY_IMAGE_TAG = "spark.kubernetes.test.imageTagF"
val CONFIG_KEY_IMAGE_TAG = "spark.kubernetes.test.imageTag"
val CONFIG_KEY_IMAGE_TAG_FILE = "spark.kubernetes.test.imageTagFile"
val CONFIG_KEY_IMAGE_REPO = "spark.kubernetes.test.imageRepo"
val CONFIG_KEY_UNPACK_DIR = "spark.kubernetes.test.unpackSparkDir"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.deploy.k8s.integrationtest.backend

import io.fabric8.kubernetes.client.DefaultKubernetesClient

import org.apache.spark.deploy.k8s.integrationtest.TestConstants._
import org.apache.spark.deploy.k8s.integrationtest.backend.cloud.KubeConfigBackend
import org.apache.spark.deploy.k8s.integrationtest.backend.docker.DockerForDesktopBackend
Expand All @@ -35,7 +36,8 @@ private[spark] object IntegrationTestBackendFactory {
.getOrElse(BACKEND_MINIKUBE)
deployMode match {
case BACKEND_MINIKUBE => MinikubeTestBackend
case BACKEND_CLOUD => new KubeConfigBackend(System.getProperty(CONFIG_KEY_KUBE_CONFIG_CONTEXT))
case BACKEND_CLOUD =>
new KubeConfigBackend(System.getProperty(CONFIG_KEY_KUBE_CONFIG_CONTEXT))
case BACKEND_DOCKER_FOR_DESKTOP => DockerForDesktopBackend
case _ => throw new IllegalArgumentException("Invalid " +
CONFIG_KEY_DEPLOY_MODE + ": " + deployMode)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ package org.apache.spark.deploy.k8s.integrationtest.backend.cloud

import java.nio.file.Paths

import io.fabric8.kubernetes.client.utils.Utils
import io.fabric8.kubernetes.client.{Config, DefaultKubernetesClient}
import io.fabric8.kubernetes.client.utils.Utils
import org.apache.commons.lang3.StringUtils

import org.apache.spark.deploy.k8s.integrationtest.TestConstants
import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackend
import org.apache.spark.internal.Logging
Expand All @@ -38,7 +39,7 @@ private[spark] class KubeConfigBackend(var context: String)
// Auto-configure K8S client from K8S config file
if (Utils.getSystemPropertyOrEnvVar(Config.KUBERNETES_KUBECONFIG_FILE, null: String) == null) {
// Fabric 8 client will automatically assume a default location in this case
logWarning(s"No explicit KUBECONFIG specified, will assume .kube/config under your home directory")
logWarning("No explicit KUBECONFIG specified, will assume $HOME/.kube/config")
}
val config = Config.autoConfigure(context)

Expand Down