From f206bbde3a8f64650236013d61680faba492d7a4 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 25 Mar 2020 12:38:15 -0700 Subject: [PATCH] [SPARK-31244][K8S][TEST] Use Minio instead of Ceph in K8S DepsTestsSuite ### What changes were proposed in this pull request? This PR (SPARK-31244) replaces `Ceph` with `Minio` in K8S `DepsTestsSuite`. ### Why are the changes needed? Currently, `DepsTestsSuite` is using `ceph` for S3 storage. However, the used version and all new releases are broken on new `minikube` releases. We had better use a more robust and smaller one. ``` $ minikube version minikube version: v1.8.2 $ minikube -p minikube docker-env | source $ docker run -it --rm -e NETWORK_AUTO_DETECT=4 -e RGW_FRONTEND_PORT=8000 -e SREE_PORT=5001 -e CEPH_DEMO_UID=nano -e CEPH_DAEMON=demo ceph/daemon:v4.0.3-stable-4.0-nautilus-centos-7-x86_64 /bin/sh 2020-03-25 04:26:21 /opt/ceph-container/bin/entrypoint.sh: ERROR- it looks like we have not been able to discover the network settings $ docker run -it --rm -e NETWORK_AUTO_DETECT=4 -e RGW_FRONTEND_PORT=8000 -e SREE_PORT=5001 -e CEPH_DEMO_UID=nano -e CEPH_DAEMON=demo ceph/daemon:v4.0.11-stable-4.0-nautilus-centos-7 /bin/sh 2020-03-25 04:20:30 /opt/ceph-container/bin/entrypoint.sh: ERROR- it looks like we have not been able to discover the network settings ``` Also, the image size is unnecessarily big (almost `1GB`) and growing while `minio` is `55.8MB` with the same features. ``` $ docker images | grep ceph ceph/daemon v4.0.3-stable-4.0-nautilus-centos-7-x86_64 a6a05ccdf924 6 months ago 852MB ceph/daemon v4.0.11-stable-4.0-nautilus-centos-7 87f695550d8e 12 hours ago 901MB $ docker images | grep minio minio/minio latest 95c226551ea6 5 days ago 55.8MB ``` ### Does this PR introduce any user-facing change? No. (This is a test case change) ### How was this patch tested? Pass the existing Jenkins K8s integration test job and test with the latest minikube. 
``` $ minikube version minikube version: v1.8.2 $ kubectl version --short Client Version: v1.17.4 Server Version: v1.17.4 $ NO_MANUAL=1 ./dev/make-distribution.sh --r --pip --tgz -Pkubernetes $ resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh --spark-tgz $PWD/spark-*.tgz ... KubernetesSuite: - Run SparkPi with no resources - Run SparkPi with a very long application name. - Use SparkLauncher.NO_RESOURCE - Run SparkPi with a master URL without a scheme. - Run SparkPi with an argument. - Run SparkPi with custom labels, annotations, and environment variables. - All pods have the same service account by default - Run extraJVMOptions check on driver - Run SparkRemoteFileTest using a remote data file - Run SparkPi with env and mount secrets. - Run PySpark on simple pi.py example - Run PySpark with Python2 to test a pyfiles example - Run PySpark with Python3 to test a pyfiles example - Run PySpark with memory customization - Run in client mode. - Start pod creation from template - PVs with local storage *** FAILED *** // This is irrelevant to this PR. - Launcher client dependencies // This is the fixed test case by this PR. - Test basic decommissioning - Run SparkR on simple dataframe.R example Run completed in 12 minutes, 4 seconds. ... ``` The following is the working snapshot of the `DepsTestsSuite` test. ``` $ kubectl get all -ncf9438dd8a65436686b1196a6b73000f NAME READY STATUS RESTARTS AGE pod/minio-0 1/1 Running 0 70s pod/spark-test-app-8494bddca3754390b9e59a2ef47584eb 1/1 Running 0 55s NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE service/minio-s3 NodePort 10.109.54.180 9000:30678/TCP 70s service/spark-test-app-fd916b711061c7b8-driver-svc ClusterIP None 7078/TCP,7079/TCP,4040/TCP 55s NAME READY AGE statefulset.apps/minio 1/1 70s ``` Closes #28015 from dongjoon-hyun/SPARK-31244. 
Authored-by: Dongjoon Hyun Signed-off-by: Dongjoon Hyun --- .../k8s/integrationtest/DepsTestsSuite.scala | 90 ++++++++----------- 1 file changed, 36 insertions(+), 54 deletions(-) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala index 367cff62cd493..2d90c06e36390 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala @@ -35,17 +35,16 @@ import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.Minikube private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite => import KubernetesSuite.k8sTestTag - val cName = "ceph-nano" + val cName = "minio" val svcName = s"$cName-s3" - val bucket = "spark" - - private def getCephContainer(): Container = { - val envVars = Map ( "NETWORK_AUTO_DETECT" -> "4", - "RGW_FRONTEND_PORT" -> "8000", - "SREE_PORT" -> "5001", - "CEPH_DEMO_UID" -> "nano", - "CEPH_DAEMON" -> "demo", - "DEBUG" -> "verbose" + val BUCKET = "spark" + val ACCESS_KEY = "minio" + val SECRET_KEY = "miniostorage" + + private def getMinioContainer(): Container = { + val envVars = Map ( + "MINIO_ACCESS_KEY" -> ACCESS_KEY, + "MINIO_SECRET_KEY" -> SECRET_KEY ).map( envV => new EnvVarBuilder() .withName(envV._1) @@ -59,13 +58,14 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite => ).asJava new ContainerBuilder() - .withImage("ceph/daemon:v4.0.3-stable-4.0-nautilus-centos-7-x86_64") + .withImage("minio/minio:latest") .withImagePullPolicy("Always") .withName(cName) + .withArgs("server", "/data") .withPorts(new ContainerPortBuilder() .withName(svcName) .withProtocol("TCP") - .withContainerPort(8000) + 
.withContainerPort(9000) .build() ) .withResources(new ResourceRequirementsBuilder() @@ -77,10 +77,9 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite => .build() } - // Based on https://github.com/ceph/cn - private def setupCephStorage(): Unit = { - val labels = Map("app" -> "ceph", "daemon" -> "nano").asJava - val cephService = new ServiceBuilder() + private def setupMinioStorage(): Unit = { + val labels = Map("app" -> "minio").asJava + val minioService = new ServiceBuilder() .withNewMetadata() .withName(svcName) .withLabels(labels) @@ -88,9 +87,9 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite => .withNewSpec() .withPorts(new ServicePortBuilder() .withName("https") - .withPort(8000) + .withPort(9000) .withProtocol("TCP") - .withTargetPort(new IntOrString(8000)) + .withTargetPort(new IntOrString(9000)) .build() ) .withType("NodePort") @@ -98,7 +97,7 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite => .endSpec() .build() - val cephStatefulSet = new StatefulSetBuilder() + val minioStatefulSet = new StatefulSetBuilder() .withNewMetadata() .withName(cName) .withLabels(labels) @@ -106,7 +105,7 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite => .withNewSpec() .withReplicas(1) .withNewSelector() - .withMatchLabels(Map("app" -> "ceph").asJava) + .withMatchLabels(Map("app" -> "minio").asJava) .endSelector() .withServiceName(cName) .withNewTemplate() @@ -115,7 +114,7 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite => .withLabels(labels) .endMetadata() .withNewSpec() - .withContainers(getCephContainer()) + .withContainers(getMinioContainer()) .endSpec() .endTemplate() .endSpec() @@ -124,16 +123,16 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite => kubernetesTestComponents .kubernetesClient .services() - .create(cephService) + .create(minioService) kubernetesTestComponents .kubernetesClient .apps() .statefulSets() - .create(cephStatefulSet) + 
.create(minioStatefulSet) } - private def deleteCephStorage(): Unit = { + private def deleteMinioStorage(): Unit = { kubernetesTestComponents .kubernetesClient .apps() @@ -151,47 +150,30 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite => test("Launcher client dependencies", k8sTestTag, MinikubeTag) { val fileName = Utils.createTempFile(FILE_CONTENTS, HOST_PATH) try { - setupCephStorage() - val cephUrlStr = getServiceUrl(svcName) - val cephUrl = new URL(cephUrlStr) - val cephHost = cephUrl.getHost - val cephPort = cephUrl.getPort + setupMinioStorage() + val minioUrlStr = getServiceUrl(svcName) + val minioUrl = new URL(minioUrlStr) + val minioHost = minioUrl.getHost + val minioPort = minioUrl.getPort val examplesJar = Utils.getExamplesJarAbsolutePath(sparkHomeDir) - val (accessKey, secretKey) = getCephCredentials() sparkAppConf - .set("spark.hadoop.fs.s3a.access.key", accessKey) - .set("spark.hadoop.fs.s3a.secret.key", secretKey) + .set("spark.hadoop.fs.s3a.access.key", ACCESS_KEY) + .set("spark.hadoop.fs.s3a.secret.key", SECRET_KEY) .set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") - .set("spark.hadoop.fs.s3a.endpoint", s"$cephHost:$cephPort") - .set("spark.kubernetes.file.upload.path", s"s3a://$bucket") + .set("spark.hadoop.fs.s3a.endpoint", s"$minioHost:$minioPort") + .set("spark.kubernetes.file.upload.path", s"s3a://$BUCKET") .set("spark.files", s"$HOST_PATH/$fileName") .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") .set("spark.jars.packages", "com.amazonaws:aws-java-sdk:" + "1.7.4,org.apache.hadoop:hadoop-aws:2.7.6") .set("spark.driver.extraJavaOptions", "-Divy.cache.dir=/tmp -Divy.home=/tmp") - createS3Bucket(accessKey, secretKey, cephUrlStr) + createS3Bucket(ACCESS_KEY, SECRET_KEY, minioUrlStr) runSparkRemoteCheckAndVerifyCompletion(appResource = examplesJar, appArgs = Array(fileName), timeout = Option(DEPS_TIMEOUT)) } finally { // make sure this always runs - deleteCephStorage() - } - } - - // There 
isn't a cleaner way to get the credentials - // when ceph-nano runs on k8s - private def getCephCredentials(): (String, String) = { - Eventually.eventually(TIMEOUT, INTERVAL) { - val cephPod = kubernetesTestComponents - .kubernetesClient - .pods() - .withName(s"$cName-0") - .get() - implicit val podName: String = cephPod.getMetadata.getName - implicit val components: KubernetesTestComponents = kubernetesTestComponents - val contents = Utils.executeCommand("cat", "/nano_user_details") - (extractS3Key(contents, "access_key"), extractS3Key(contents, "secret_key")) + deleteMinioStorage() } } @@ -211,10 +193,10 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite => val credentials = new BasicAWSCredentials(accessKey, secretKey) val s3client = new AmazonS3Client(credentials) s3client.setEndpoint(endPoint) - s3client.createBucket(bucket) + s3client.createBucket(BUCKET) } catch { case e: Exception => - throw new SparkException(s"Failed to create bucket $bucket.", e) + throw new SparkException(s"Failed to create bucket $BUCKET.", e) } } }