Commit 1bb52c9

test: expand the spark connect test to write to S3 (#567)
* feat: add MinIO user secrets, deployment, and client configuration for Spark Connect
* connect test is now passing
* add s3 comments
* cleanup
* switch log level back to INFO
* fix typo
Parent: 0fbe335 · Commit: 1bb52c9

File tree

6 files changed: +173 -9 lines changed
@@ -0,0 +1,21 @@
+---
+apiVersion: v1
+kind: Secret
+metadata:
+  name: minio-users
+type: Opaque
+stringData:
+  username1: |
+    username=spark
+    password=sparkspark
+    disabled=false
+    policies=readwrite,consoleAdmin,diagnostics
+    setPolicies=false
+---
+apiVersion: v1
+kind: Secret
+metadata:
+  name: s3-credentials
+stringData:
+  accessKey: spark
+  secretKey: sparkspark
@@ -0,0 +1,11 @@
+---
+apiVersion: kuttl.dev/v1beta1
+kind: TestAssert
+timeout: 900
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: minio
+status:
+  readyReplicas: 1
@@ -0,0 +1,11 @@
+---
+apiVersion: kuttl.dev/v1beta1
+kind: TestStep
+commands:
+  - script: >-
+      helm install minio
+      --namespace $NAMESPACE
+      --version 14.6.16
+      -f helm-bitnami-minio-values.yaml
+      --repo https://charts.bitnami.com/bitnami minio
+timeout: 240

tests/templates/kuttl/spark-connect/10-deploy-spark-connect.yaml.j2 (+17 -8)
@@ -36,15 +36,29 @@ spec:
   vectorAggregatorConfigMapName: vector-aggregator-discovery
 {% endif %}
   args:
-    - --packages org.apache.iceberg:iceberg-spark-runtime-{{ ".".join(test_scenario['values']['spark-connect'].split('.')[:2]) }}_2.12:1.8.1
+    # These are unfortunately required to make the S3A connector work with MinIO.
+    # I had expected the clients to be able to set these, but that is not the case.
+    - --conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
+    - --conf spark.hadoop.fs.s3a.path.style.access=true
+    - --conf spark.hadoop.fs.s3a.endpoint=http://minio:9000
+    - --conf spark.hadoop.fs.s3a.region=us-east-1
   server:
     podOverrides:
       spec:
         containers:
          - name: spark
            env:
-             - name: DEMO_GREETING
-               value: "Hello from the overlords"
+             - name: AWS_ACCESS_KEY_ID
+               valueFrom:
+                 secretKeyRef:
+                   name: s3-credentials
+                   key: accessKey
+             - name: AWS_SECRET_ACCESS_KEY
+               valueFrom:
+                 secretKeyRef:
+                   name: s3-credentials
+                   key: secretKey
+
     jvmArgumentOverrides:
       add:
        - -Dmy.custom.jvm.arg=customValue
@@ -59,11 +73,6 @@ spec:
     configOverrides:
       spark-defaults.conf:
         spark.jars.ivy: /tmp/ivy2
-        spark.sql.extensions: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
-        spark.sql.catalog.local: org.apache.iceberg.spark.SparkCatalog
-        spark.sql.catalog.local.type: hadoop
-        spark.sql.catalog.local.warehouse: /tmp/warehouse
-        spark.sql.defaultCatalog: local
   executor:
     configOverrides:
      spark-defaults.conf:
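For context, the four --conf flags above are ordinary Hadoop S3A options. In a plain, non-Connect PySpark session they could be applied client-side at build time; a minimal sketch, assuming a directly reachable MinIO endpoint and the hadoop-aws jars on the classpath. Over Spark Connect the server JVM owns the Hadoop configuration, which is why this commit moves the settings into the server's args instead:

from pyspark.sql import SparkSession

# Sketch only: the same S3A settings applied at session build time in a
# regular (non-Connect) PySpark session. Over Spark Connect these would
# have no effect, per the comment in the diff above.
spark = (
    SparkSession.builder.appName("s3a-local-sketch")
    .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
    )
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
    .config("spark.hadoop.fs.s3a.region", "us-east-1")
    .getOrCreate()
)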

tests/templates/kuttl/spark-connect/20-run-connect-client.yaml.j2 (+64 -1)
@@ -1,4 +1,49 @@
 ---
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: spark-connect-client
+data:
+  example.py: |-
+    import sys
+
+    from pyspark.sql import SparkSession
+    import pyspark.sql.functions as fn
+
+    if __name__ == "__main__":
+        remote: str = sys.argv[1]
+
+        print(f"Connecting to Spark Connect server at {remote}")
+
+        # Adding s3a configuration properties here has no effect, unfortunately.
+        # They need to be set in the SparkConnectServer.
+        spark = (
+            SparkSession.builder.appName("SimpleSparkConnectApp")
+            .remote(remote)
+            .getOrCreate()
+        )
+
+        logFile = "/stackable/spark/README.md"
+
+        print(f"Reading log file: {logFile}")
+        logData = spark.read.text(logFile).cache()
+
+        print("Counting words in log file")
+        wc = (
+            logData.select(
+                fn.explode(fn.split(logData["value"], r"\s+"))
+                .alias("words"))
+            .groupBy("words").count()
+        )
+
+        wc.show()
+
+        dest = "s3a://mybucket/wordcount"
+        print(f"Writing word count to S3 {dest}")
+        wc.write.mode("overwrite").parquet(dest)
+
+        spark.stop()
+---
 apiVersion: batch/v1
 kind: Job
 metadata:
@@ -21,7 +66,7 @@ spec:
         command:
           [
             "/usr/bin/python",
-            "/stackable/spark-connect-examples/python/simple-connect-app.py",
+            "/app/example.py",
             "sc://spark-connect-server-default",
           ]
         resources:
@@ -31,3 +76,21 @@ spec:
           requests:
             cpu: 200m
             memory: 128Mi
+        env:
+          - name: AWS_ACCESS_KEY_ID
+            valueFrom:
+              secretKeyRef:
+                name: s3-credentials
+                key: accessKey
+          - name: AWS_SECRET_ACCESS_KEY
+            valueFrom:
+              secretKeyRef:
+                name: s3-credentials
+                key: secretKey
+        volumeMounts:
+          - name: spark-connect-client
+            mountPath: /app
+      volumes:
+        - name: spark-connect-client
+          configMap:
+            name: spark-connect-client
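A natural follow-up check, not part of this commit, would be to read the Parquet output back through the same session before spark.stop(), failing the test if nothing round-trips. A short sketch reusing the names from example.py above (spark, dest, fn):

# Sketch: read the word counts back from MinIO to verify the write.
readback = spark.read.parquet(dest)
assert readback.count() > 0, "expected at least one word-count row in S3"
readback.orderBy(fn.desc("count")).show(10)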
helm-bitnami-minio-values.yaml (referenced by the helm install TestStep above)

@@ -0,0 +1,49 @@
+---
+mode: standalone
+disableWebUI: false
+extraEnvVars:
+  - name: BITNAMI_DEBUG
+    value: "true"
+  - name: MINIO_LOG_LEVEL
+    value: DEBUG
+
+provisioning:
+  enabled: true
+  buckets:
+    - name: mybucket
+  usersExistingSecrets:
+    - minio-users
+  resources:
+    requests:
+      memory: 1Gi
+      cpu: "512m"
+    limits:
+      memory: "1Gi"
+      cpu: "1"
+  podSecurityContext:
+    enabled: false
+  containerSecurityContext:
+    enabled: false
+
+volumePermissions:
+  enabled: false
+
+podSecurityContext:
+  enabled: false
+
+containerSecurityContext:
+  enabled: false
+
+persistence:
+  enabled: false
+
+resources:
+  requests:
+    memory: 1Gi
+    cpu: "512m"
+  limits:
+    memory: "1Gi"
+    cpu: "1"
+
+service:
+  type: NodePort
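To inspect what the test actually wrote, one can list the wordcount prefix in mybucket directly against MinIO. A sketch using boto3; the localhost endpoint is an assumption (a port-forward, or NodePort access per the service setting above), while the bucket, prefix, and credentials come from the manifests in this commit:

import boto3

# Sketch: list the Parquet objects written by the connect client job.
# The endpoint URL is an assumption (port-forwarded minio service);
# bucket name, prefix, and credentials match the s3-credentials Secret
# and the example.py destination above.
s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9000",
    aws_access_key_id="spark",
    aws_secret_access_key="sparkspark",
    region_name="us-east-1",
)
resp = s3.list_objects_v2(Bucket="mybucket", Prefix="wordcount/")
for obj in resp.get("Contents", []):
    print(obj["Key"], obj["Size"])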
