[SPARK-31244][K8S][TEST] Use Minio instead of Ceph in K8S DepsTestsSuite
### What changes were proposed in this pull request?

This PR (SPARK-31244) replaces `Ceph` with `Minio` in the K8S `DepsTestsSuite`.

### Why are the changes needed?

Currently, `DepsTestsSuite` uses `ceph` for S3 storage. However, both the version in use and all newer releases are broken on recent `minikube` releases, as the output below shows. We should switch to a smaller and more robust alternative.

```
$ minikube version
minikube version: v1.8.2

$ minikube -p minikube docker-env | source

$ docker run -it --rm -e NETWORK_AUTO_DETECT=4 -e RGW_FRONTEND_PORT=8000 -e SREE_PORT=5001 -e CEPH_DEMO_UID=nano -e CEPH_DAEMON=demo ceph/daemon:v4.0.3-stable-4.0-nautilus-centos-7-x86_64 /bin/sh
2020-03-25 04:26:21  /opt/ceph-container/bin/entrypoint.sh: ERROR- it looks like we have not been able to discover the network settings

$ docker run -it --rm -e NETWORK_AUTO_DETECT=4 -e RGW_FRONTEND_PORT=8000 -e SREE_PORT=5001 -e CEPH_DEMO_UID=nano -e CEPH_DAEMON=demo ceph/daemon:v4.0.11-stable-4.0-nautilus-centos-7 /bin/sh
2020-03-25 04:20:30  /opt/ceph-container/bin/entrypoint.sh: ERROR- it looks like we have not been able to discover the network settings
```

In addition, the `ceph` image is unnecessarily large (almost `1GB`, and growing), while the `minio` image is only `55.8MB` and offers the same S3-compatible storage that this test needs.
```
$ docker images | grep ceph
ceph/daemon v4.0.3-stable-4.0-nautilus-centos-7-x86_64 a6a05ccdf924 6 months ago 852MB
ceph/daemon v4.0.11-stable-4.0-nautilus-centos-7       87f695550d8e 12 hours ago 901MB

$ docker images | grep minio
minio/minio latest                                     95c226551ea6 5 days ago   55.8MB
```

### Does this PR introduce any user-facing change?

No. (This is a test case change)

### How was this patch tested?

Pass the existing Jenkins K8s integration test job, and test manually with the latest minikube.
```
$ minikube version
minikube version: v1.8.2

$ kubectl version --short
Client Version: v1.17.4
Server Version: v1.17.4

$ NO_MANUAL=1 ./dev/make-distribution.sh --r --pip --tgz -Pkubernetes
$ resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh --spark-tgz $PWD/spark-*.tgz
...
KubernetesSuite:
- Run SparkPi with no resources
- Run SparkPi with a very long application name.
- Use SparkLauncher.NO_RESOURCE
- Run SparkPi with a master URL without a scheme.
- Run SparkPi with an argument.
- Run SparkPi with custom labels, annotations, and environment variables.
- All pods have the same service account by default
- Run extraJVMOptions check on driver
- Run SparkRemoteFileTest using a remote data file
- Run SparkPi with env and mount secrets.
- Run PySpark on simple pi.py example
- Run PySpark with Python2 to test a pyfiles example
- Run PySpark with Python3 to test a pyfiles example
- Run PySpark with memory customization
- Run in client mode.
- Start pod creation from template
- PVs with local storage *** FAILED *** // This failure is unrelated to this PR.
- Launcher client dependencies          // This is the test case fixed by this PR.
- Test basic decommissioning
- Run SparkR on simple dataframe.R example
Run completed in 12 minutes, 4 seconds.
...
```

The following is a snapshot of the `DepsTestsSuite` test while it is running.
```
$ kubectl get all -ncf9438dd8a65436686b1196a6b73000f
NAME                                                  READY   STATUS    RESTARTS   AGE
pod/minio-0                                           1/1     Running   0          70s
pod/spark-test-app-8494bddca3754390b9e59a2ef47584eb   1/1     Running   0          55s

NAME                                                 TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                      AGE
service/minio-s3                                     NodePort    10.109.54.180   <none>        9000:30678/TCP               70s
service/spark-test-app-fd916b711061c7b8-driver-svc   ClusterIP   None            <none>        7078/TCP,7079/TCP,4040/TCP   55s

NAME                     READY   AGE
statefulset.apps/minio   1/1     70s
```
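
The `9000:30678/TCP` mapping in the snapshot is what the test turns into an S3A endpoint. Below is a minimal sketch of that step, assuming the service URL has the form `http://<node-ip>:<node-port>`. The address `192.168.64.2` is a made-up placeholder, `endpointOf` is a hypothetical helper that is not part of the suite, and the sketch uses `java.net.URI` where the suite itself uses `java.net.URL`.

```scala
import java.net.URI

object EndpointSketch {
  // Hypothetical helper: reduce a service URL to the "host:port" form
  // that the test passes to spark.hadoop.fs.s3a.endpoint.
  def endpointOf(serviceUrl: String): String = {
    val uri = URI.create(serviceUrl)
    s"${uri.getHost}:${uri.getPort}"
  }

  def main(args: Array[String]): Unit = {
    // Placeholder node address; 30678 is the NodePort from the snapshot.
    println(endpointOf("http://192.168.64.2:30678")) // prints 192.168.64.2:30678
  }
}
```

The scheme is dropped because the test disables SSL through `spark.hadoop.fs.s3a.connection.ssl.enabled` instead of encoding the protocol in the endpoint.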

Closes #28015 from dongjoon-hyun/SPARK-31244.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
dongjoon-hyun committed Mar 25, 2020
1 parent 4f274a4 commit f206bbd
Showing 1 changed file with 36 additions and 54 deletions.
@@ -35,17 +35,16 @@ import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.Minikube
 private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
   import KubernetesSuite.k8sTestTag

-  val cName = "ceph-nano"
+  val cName = "minio"
   val svcName = s"$cName-s3"
-  val bucket = "spark"
-
-  private def getCephContainer(): Container = {
-    val envVars = Map ( "NETWORK_AUTO_DETECT" -> "4",
-      "RGW_FRONTEND_PORT" -> "8000",
-      "SREE_PORT" -> "5001",
-      "CEPH_DEMO_UID" -> "nano",
-      "CEPH_DAEMON" -> "demo",
-      "DEBUG" -> "verbose"
+  val BUCKET = "spark"
+  val ACCESS_KEY = "minio"
+  val SECRET_KEY = "miniostorage"
+
+  private def getMinioContainer(): Container = {
+    val envVars = Map (
+      "MINIO_ACCESS_KEY" -> ACCESS_KEY,
+      "MINIO_SECRET_KEY" -> SECRET_KEY
     ).map( envV =>
       new EnvVarBuilder()
         .withName(envV._1)
@@ -59,13 +58,14 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
     ).asJava

     new ContainerBuilder()
-      .withImage("ceph/daemon:v4.0.3-stable-4.0-nautilus-centos-7-x86_64")
+      .withImage("minio/minio:latest")
       .withImagePullPolicy("Always")
       .withName(cName)
+      .withArgs("server", "/data")
       .withPorts(new ContainerPortBuilder()
         .withName(svcName)
         .withProtocol("TCP")
-        .withContainerPort(8000)
+        .withContainerPort(9000)
         .build()
       )
       .withResources(new ResourceRequirementsBuilder()
@@ -77,36 +77,35 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
       .build()
   }

-  // Based on https://github.com/ceph/cn
-  private def setupCephStorage(): Unit = {
-    val labels = Map("app" -> "ceph", "daemon" -> "nano").asJava
-    val cephService = new ServiceBuilder()
+  private def setupMinioStorage(): Unit = {
+    val labels = Map("app" -> "minio").asJava
+    val minioService = new ServiceBuilder()
       .withNewMetadata()
         .withName(svcName)
         .withLabels(labels)
       .endMetadata()
       .withNewSpec()
         .withPorts(new ServicePortBuilder()
           .withName("https")
-          .withPort(8000)
+          .withPort(9000)
           .withProtocol("TCP")
-          .withTargetPort(new IntOrString(8000))
+          .withTargetPort(new IntOrString(9000))
           .build()
         )
         .withType("NodePort")
         .withSelector(labels)
       .endSpec()
       .build()

-    val cephStatefulSet = new StatefulSetBuilder()
+    val minioStatefulSet = new StatefulSetBuilder()
       .withNewMetadata()
         .withName(cName)
         .withLabels(labels)
       .endMetadata()
       .withNewSpec()
         .withReplicas(1)
         .withNewSelector()
-          .withMatchLabels(Map("app" -> "ceph").asJava)
+          .withMatchLabels(Map("app" -> "minio").asJava)
         .endSelector()
         .withServiceName(cName)
         .withNewTemplate()
@@ -115,7 +114,7 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
             .withLabels(labels)
           .endMetadata()
           .withNewSpec()
-            .withContainers(getCephContainer())
+            .withContainers(getMinioContainer())
           .endSpec()
         .endTemplate()
       .endSpec()
@@ -124,16 +123,16 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
     kubernetesTestComponents
       .kubernetesClient
       .services()
-      .create(cephService)
+      .create(minioService)

     kubernetesTestComponents
       .kubernetesClient
       .apps()
       .statefulSets()
-      .create(cephStatefulSet)
+      .create(minioStatefulSet)
   }

-  private def deleteCephStorage(): Unit = {
+  private def deleteMinioStorage(): Unit = {
     kubernetesTestComponents
       .kubernetesClient
       .apps()
@@ -151,47 +150,30 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
   test("Launcher client dependencies", k8sTestTag, MinikubeTag) {
     val fileName = Utils.createTempFile(FILE_CONTENTS, HOST_PATH)
     try {
-      setupCephStorage()
-      val cephUrlStr = getServiceUrl(svcName)
-      val cephUrl = new URL(cephUrlStr)
-      val cephHost = cephUrl.getHost
-      val cephPort = cephUrl.getPort
+      setupMinioStorage()
+      val minioUrlStr = getServiceUrl(svcName)
+      val minioUrl = new URL(minioUrlStr)
+      val minioHost = minioUrl.getHost
+      val minioPort = minioUrl.getPort
       val examplesJar = Utils.getExamplesJarAbsolutePath(sparkHomeDir)
-      val (accessKey, secretKey) = getCephCredentials()
       sparkAppConf
-        .set("spark.hadoop.fs.s3a.access.key", accessKey)
-        .set("spark.hadoop.fs.s3a.secret.key", secretKey)
+        .set("spark.hadoop.fs.s3a.access.key", ACCESS_KEY)
+        .set("spark.hadoop.fs.s3a.secret.key", SECRET_KEY)
         .set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
-        .set("spark.hadoop.fs.s3a.endpoint", s"$cephHost:$cephPort")
-        .set("spark.kubernetes.file.upload.path", s"s3a://$bucket")
+        .set("spark.hadoop.fs.s3a.endpoint", s"$minioHost:$minioPort")
+        .set("spark.kubernetes.file.upload.path", s"s3a://$BUCKET")
         .set("spark.files", s"$HOST_PATH/$fileName")
         .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
         .set("spark.jars.packages", "com.amazonaws:aws-java-sdk:" +
           "1.7.4,org.apache.hadoop:hadoop-aws:2.7.6")
         .set("spark.driver.extraJavaOptions", "-Divy.cache.dir=/tmp -Divy.home=/tmp")
-      createS3Bucket(accessKey, secretKey, cephUrlStr)
+      createS3Bucket(ACCESS_KEY, SECRET_KEY, minioUrlStr)
       runSparkRemoteCheckAndVerifyCompletion(appResource = examplesJar,
         appArgs = Array(fileName),
         timeout = Option(DEPS_TIMEOUT))
     } finally {
       // make sure this always runs
-      deleteCephStorage()
-    }
-  }
-
-  // There isn't a cleaner way to get the credentials
-  // when ceph-nano runs on k8s
-  private def getCephCredentials(): (String, String) = {
-    Eventually.eventually(TIMEOUT, INTERVAL) {
-      val cephPod = kubernetesTestComponents
-        .kubernetesClient
-        .pods()
-        .withName(s"$cName-0")
-        .get()
-      implicit val podName: String = cephPod.getMetadata.getName
-      implicit val components: KubernetesTestComponents = kubernetesTestComponents
-      val contents = Utils.executeCommand("cat", "/nano_user_details")
-      (extractS3Key(contents, "access_key"), extractS3Key(contents, "secret_key"))
+      deleteMinioStorage()
     }
   }

@@ -211,10 +193,10 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
       val credentials = new BasicAWSCredentials(accessKey, secretKey)
       val s3client = new AmazonS3Client(credentials)
       s3client.setEndpoint(endPoint)
-      s3client.createBucket(bucket)
+      s3client.createBucket(BUCKET)
     } catch {
       case e: Exception =>
-        throw new SparkException(s"Failed to create bucket $bucket.", e)
+        throw new SparkException(s"Failed to create bucket $BUCKET.", e)
     }
   }
 }
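
One effect of the diff is that the credentials no longer have to be read out of the running pod: the values passed to the container as `MINIO_ACCESS_KEY` and `MINIO_SECRET_KEY` are the same constants handed to the S3A client. A minimal sketch of the resulting settings as a plain `Map`, assuming nothing beyond the keys and values visible in the diff. `S3aConfSketch` and `s3aSettings` are hypothetical names for illustration, and the host and port in `main` are placeholders.

```scala
object S3aConfSketch {
  // Constants as introduced by the diff.
  val AccessKey = "minio"
  val SecretKey = "miniostorage"
  val Bucket = "spark"

  // Hypothetical helper: collects the S3A-related settings the test applies
  // to sparkAppConf, given the Minio service host and port.
  def s3aSettings(host: String, port: Int): Map[String, String] = Map(
    "spark.hadoop.fs.s3a.access.key" -> AccessKey,
    "spark.hadoop.fs.s3a.secret.key" -> SecretKey,
    "spark.hadoop.fs.s3a.connection.ssl.enabled" -> "false",
    "spark.hadoop.fs.s3a.endpoint" -> s"$host:$port",
    "spark.hadoop.fs.s3a.impl" -> "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "spark.kubernetes.file.upload.path" -> s"s3a://$Bucket"
  )

  def main(args: Array[String]): Unit = {
    // Placeholder host and port; the real test derives them from the service URL.
    s3aSettings("192.168.64.2", 30678).toSeq.sortBy(_._1).foreach {
      case (key, value) => println(s"$key=$value")
    }
  }
}
```

Because every value is known before the pod starts, the `Eventually`-based polling in the removed `getCephCredentials` has no counterpart in the Minio version.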