feat: support for Spark 4 #589

Open. Wants to merge 11 commits into base: main.

2 changes: 1 addition & 1 deletion docs/modules/spark-k8s/examples/example-history-app.yaml
@@ -5,7 +5,7 @@ metadata:
name: spark-pi-s3-1
spec:
sparkImage:
productVersion: 3.5.6
productVersion: 4.0.0
pullPolicy: IfNotPresent
mode: cluster
mainClass: org.apache.spark.examples.SparkPi

@@ -5,7 +5,7 @@ metadata:
name: spark-history
spec:
image:
productVersion: 3.5.6
productVersion: 4.0.0
logFileDirectory: # <1>
s3:
prefix: eventlogs/ # <2>
18 changes: 9 additions & 9 deletions docs/modules/spark-k8s/examples/example-pvc.yaml
@@ -1,8 +1,8 @@
---
apiVersion: v1
kind: PersistentVolume
kind: PersistentVolume # <1>
metadata:
name: pv-ksv # <1>
name: pv-ksv
spec:
storageClassName: standard
accessModes:
@@ -13,31 +13,31 @@ spec:
path: /some-host-location
---
apiVersion: v1
kind: PersistentVolumeClaim
kind: PersistentVolumeClaim # <2>
metadata:
name: pvc-ksv # <2>
name: pvc-ksv
spec:
volumeName: pv-ksv # <1>
volumeName: pv-ksv # <3>
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 1Gi
---
apiVersion: batch/v1
kind: Job
kind: Job # <4>
metadata:
name: aws-deps
spec:
template:
spec:
restartPolicy: Never
volumes:
- name: job-deps # <3>
- name: job-deps
persistentVolumeClaim:
claimName: pvc-ksv # <2>
claimName: pvc-ksv # <5>
containers:
- name: aws-deps
volumeMounts:
- name: job-deps # <4>
- name: job-deps
mountPath: /stackable/spark/dependencies

@@ -6,7 +6,7 @@ metadata:
namespace: default
spec:
sparkImage:
productVersion: 3.5.6
productVersion: 4.0.0
mode: cluster
mainApplicationFile: s3a://stackable-spark-k8s-jars/jobs/ny-tlc-report-1.1.0.jar # <3>
mainClass: tech.stackable.demo.spark.NYTLCReport

@@ -7,7 +7,7 @@ metadata:
spec:
image: oci.stackable.tech/stackable/ny-tlc-report:0.2.0 # <1>
sparkImage:
productVersion: 3.5.6
productVersion: 4.0.0
mode: cluster
mainApplicationFile: local:///stackable/spark/jobs/ny_tlc_report.py # <2>
args:
16 changes: 9 additions & 7 deletions docs/modules/spark-k8s/examples/example-sparkapp-pvc.yaml
@@ -6,28 +6,30 @@ metadata:
namespace: default
spec:
sparkImage:
productVersion: 3.5.6
productVersion: 4.0.0
mode: cluster
mainApplicationFile: s3a://stackable-spark-k8s-jars/jobs/ny-tlc-report-1.0-SNAPSHOT.jar # <1>
mainApplicationFile: s3a://my-bucket/app.jar # <1>
mainClass: org.example.App # <2>
args:
- "'s3a://nyc-tlc/trip data/yellow_tripdata_2021-07.csv'"
sparkConf: # <3>
"spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider"
"spark.driver.extraClassPath": "/dependencies/jars/*"
"spark.executor.extraClassPath": "/dependencies/jars/*"
volumes:
- name: job-deps # <4>
persistentVolumeClaim:
claimName: pvc-ksv
driver:
job:
config:
volumeMounts:
- name: job-deps
mountPath: /dependencies # <5>
driver:
config:
volumeMounts:
- name: job-deps
mountPath: /dependencies # <6>
executor:
replicas: 3
config:
volumeMounts:
- name: job-deps
mountPath: /dependencies # <5>
mountPath: /dependencies # <7>

@@ -5,7 +5,7 @@ metadata:
name: example-sparkapp-s3-private
spec:
sparkImage:
productVersion: 3.5.6
productVersion: 4.0.0
mode: cluster
mainApplicationFile: s3a://my-bucket/spark-examples.jar # <1>
mainClass: org.apache.spark.examples.SparkPi # <2>

@@ -6,7 +6,7 @@ metadata:
namespace: default
spec:
sparkImage:
productVersion: 3.5.6
productVersion: 4.0.0
mode: cluster
mainApplicationFile: local:///stackable/spark/examples/src/main/python/streaming/hdfs_wordcount.py
args:

@@ -56,7 +56,7 @@ metadata:
namespace: default
spec:
sparkImage:
productVersion: 3.5.6
productVersion: 4.0.0
mode: cluster
mainApplicationFile: local:///stackable/spark/examples/src/main/python/pi.py
driver:

@@ -56,7 +56,7 @@ metadata:
namespace: default
spec:
sparkImage:
productVersion: 3.5.6
productVersion: 4.0.0
mode: cluster
mainApplicationFile: local:///stackable/spark/examples/src/main/python/pi.py
driver:
90 changes: 38 additions & 52 deletions docs/modules/spark-k8s/pages/usage-guide/job-dependencies.adoc
@@ -4,9 +4,6 @@

== Overview

IMPORTANT: With the platform release 23.4.1 and Apache Spark 3.3.x (and all previous releases), dynamic provisioning of dependencies using the Spark `packages` field doesn't work.
This is a known problem with Spark and is tracked https://github.com/stackabletech/spark-k8s-operator/issues/141[here].

The container images provided by Stackable include Apache Spark and PySpark applications and libraries.
In addition, they include commonly used libraries to connect to storage systems supporting the `hdfs://`, `s3a://` and `abfs://` protocols. These systems are commonly used to store data processed by Spark applications.

@@ -24,46 +21,29 @@ To provision job dependencies in Spark workloads, you construct the `SparkApplication`
* Maven/Java packages
* Python packages

The following table provides a high level overview of the relevant aspects of each method.

|===
|Dependency specification |Job image size |Reproduciblity |Dev-op cost

|Custom Spark images
|Large
|Guaranteed
|Medium to High

|Dependency volumes
|Small
|Guaranteed
|Small to Medium

|Maven/Java packages
|Small
|Not guaranteed
|Small

|Python packages
|Small
|Not guaranteed
|Small
|===

=== Custom Spark images

With this method, you submit a `SparkApplication` for which the `sparkImage` refers to the full custom image name. It is recommended to start the custom image from one of the Stackable images to ensure compatibility with the Stackable operator.
With this method, you submit a `SparkApplication` for which the `sparkImage` refers to the full custom image name. It is recommended to start the custom image from one of the Stackable Spark images to ensure compatibility with the operator.

Below is an example of a custom image that includes a JDBC driver:

[source, Dockerfile]
----
FROM oci.stackable.tech/sdp/spark-k8s:3.5.6-stackable25.3.0 # <1>
FROM oci.stackable.tech/sdp/spark-k8s:4.0.0-stackable0.0.0-dev # <1>

RUN curl --fail -o /stackable/spark/jars/postgresql-42.6.0.jar "https://jdbc.postgresql.org/download/postgresql-42.6.0.jar"
RUN curl --fail -o /stackable/spark/jars/postgresql-42.6.0.jar "https://jdbc.postgresql.org/download/postgresql-42.6.0.jar" # <2>
----

<1> Start from an existing Stackable image.
<2> Download the JDBC driver and place it in the Spark JARs directory.

Build your custom image and push it to your container registry.

[source, bash]
----
docker build -t my-registry/spark-k8s:4.0.0-psql .
docker push my-registry/spark-k8s:4.0.0-psql
----

And the following snippet showcases an application that uses the custom image:

@@ -75,14 +55,13 @@ metadata:
name: spark-jdbc
spec:
sparkImage:
custom: "oci.stackable.tech/sandbox/spark-k8s:3.5.6-stackable0.0.0-dev" # <1>
productVersion: "3.5.6" # <2>
pullPolicy: IfNotPresent # <3>
custom: "my-registry/spark-k8s:4.0.0-psql" # <1>
productVersion: "4.0.0" # <2>
...
----
<1> Name of the custom image.
<2> Apache Spark version. Needed for the operator to take the correct actions.
<3> Optional. Defaults to `Always`.

<1> Reference to your custom image.
<2> Apache Spark version bundled in your custom image.

=== Dependency volumes

@@ -93,28 +72,34 @@ With this method, the job dependencies are provisioned from a `PersistentVolume`
include::example$example-sparkapp-pvc.yaml[]
----
<1> Job artifact located on S3.
<2> Job main class
<3> Spark dependencies: the credentials provider (the user knows what is relevant here) plus dependencies needed to access external resources (in this case, in s3, accessed without credentials)
<4> the name of the volume mount backed by a `PersistentVolumeClaim` that must be pre-existing
<5> the path on the volume mount: this is referenced in the `sparkConf` section where the extra class path is defined for the driver and executors
<2> Name of the main class to run.
<3> The job dependencies provisioned from the volume below are added to the class path of the driver and executors.
<4> A `PersistentVolumeClaim` created by the user prior to submitting the Spark job.
<5> The volume containing the dependencies is mounted in the job pod.
<6> The volume containing the dependencies is mounted in the driver pod.
<7> The volume containing the dependencies is mounted in the executor pods.

NOTE: The Spark operator has no control over the contents of the dependency volume. It is your responsibility to make sure all required dependencies are installed in the correct versions.

A `PersistentVolumeClaim` and the associated `PersistentVolume` can be defined like this:
A `PersistentVolumeClaim` and the associated `PersistentVolume` can be defined and provisioned like this:

[source,yaml]
----
include::example$example-pvc.yaml[]
----
<1> Reference to a `PersistentVolume`, defining some cluster-reachable storage
<2> The name of the `PersistentVolumeClaim` that references the PV
<3> Defines a `Volume` backed by the PVC, local to the Custom Resource
<4> Defines the `VolumeMount` that is used by the Custom Resource

<1> Create a volume. This definition, as well as the size and type of the volume, depends heavily on the type of cluster you are using.
<2> Create a persistent volume claim. This allows the volume to be populated with the necessary dependencies and later referenced by the Spark job.
<3> The volume name is referenced by the `PersistentVolumeClaim`.
<4> Create a job that mounts the volume and populates it with the necessary dependencies. This job must be run before submitting the Spark job.
<5> The job references the `PersistentVolumeClaim` created above.
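
A short sketch of how the `aws-deps` Job above can populate the volume is given below; the container image is a placeholder for any small image that provides a shell and `curl`, and the PostgreSQL driver is just an example artifact reused from the custom-image section:

[source,yaml]
----
      containers:
        - name: aws-deps
          image: docker.io/curlimages/curl:latest # placeholder: any image with a shell and curl
          command: ["sh", "-c"]
          args:
            - >-
              mkdir -p /stackable/spark/dependencies/jars &&
              curl --fail -L -o /stackable/spark/dependencies/jars/postgresql-42.6.0.jar
              "https://jdbc.postgresql.org/download/postgresql-42.6.0.jar"
          volumeMounts:
            - name: job-deps
              mountPath: /stackable/spark/dependencies
----

The `jars/` subdirectory lines up with the `/dependencies/jars/*` class path entries configured for the driver and executors in the `SparkApplication` above, which mounts the same claim at `/dependencies`.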

=== Maven packages

The last and most flexible way to provision dependencies is to use the built-in `spark-submit` support for Maven package coordinates.
The downside of this method is that job dependencies are downloaded every time the job is submitted and this has several implications you must be aware of.
For example, the job submission time will be longer than with the other methods
Suggested change
For example, the job submission time will be longer than with the other methods
For example, the job submission time will be longer than with the other methods.

Network connectivity problems may lead to job submission failures.
And finally, not all type of dependencies can be provisioned this way. Most notably, JDBC drivers cannot be provisioned this way since the JVM will only look for them at startup time.
Suggested change
And finally, not all type of dependencies can be provisioned this way. Most notably, JDBC drivers cannot be provisioned this way since the JVM will only look for them at startup time.
And finally, not all type of dependencies can be provisioned this way.
Most notably, JDBC drivers cannot be provisioned this way since the JVM will only look for them at startup time.


The snippet below showcases how to add Apache Iceberg support to a Spark (version 3.4.x) application.

@@ -138,11 +123,12 @@ spec:
...
----

<1> Maven package coordinates for Apache Iceberg. This is downloaded from the Manven repository and made available to the Spark application.
<1> Maven package coordinates for Apache Iceberg. This is downloaded from the central Maven repository and made available to the Spark application.
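
Because the included example is only partially visible in this excerpt, the fragment below sketches where such a coordinate is placed.
This is a minimal sketch, assuming the coordinate goes into the `SparkApplication`'s `spec.deps.packages` list; the exact coordinate must match the Spark and Scala versions of your image:

[source,yaml]
----
spec:
  sparkImage:
    productVersion: "3.5.6"
  deps:
    packages:
      - org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 # resolved from Maven at submit time
----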

IMPORTANT: Currently it's not possible to provision dependencies that are loaded by the JVM's https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/ClassLoader.html#getSystemClassLoader()[system class loader].
Such dependencies include JDBC drivers.
If you need access to JDBC sources from your Spark application, consider building your own custom Spark image as shown above.
As mentioned above, not all dependencies can be provisioned this way.
JDBC drivers are notorious for not being supported by this method but other types of dependencies may also not work.
If a jar file can be provisioned using it's Maven coordinates or not, depends a lot on the way it is loaded by the JVM.
Suggested change
If a jar file can be provisioned using it's Maven coordinates or not, depends a lot on the way it is loaded by the JVM.
If a jar file can be provisioned using its Maven coordinates or not, depends a lot on the way it is loaded by the JVM.

In such cases, consider building your own custom Spark image as shown above.

=== Python packages

1 change: 1 addition & 0 deletions docs/modules/spark-k8s/partials/supported-versions.adoc
@@ -3,5 +3,6 @@
// Stackable Platform documentation.
// Please sort the versions in descending order (newest first)

- 4.0.0 (Hadoop 3.4.1, Scala 2.13, Python 3.11, Java 17) (Experimental)
- 3.5.5 (Hadoop 3.3.4, Scala 2.12, Python 3.11, Java 17) (Deprecated)
Suggested change
- 3.5.5 (Hadoop 3.3.4, Scala 2.12, Python 3.11, Java 17) (Deprecated)
- 3.5.6 (Hadoop 3.3.4, Scala 2.12, Python 3.11, Java 17) (LTS)

- 3.5.6 (Hadoop 3.3.4, Scala 2.12, Python 3.11, Java 17) (LTS)
Suggested change
- 3.5.6 (Hadoop 3.3.4, Scala 2.12, Python 3.11, Java 17) (LTS)
- 3.5.5 (Hadoop 3.3.4, Scala 2.12, Python 3.11, Java 17) (Deprecated)

9 changes: 5 additions & 4 deletions rust/operator-binary/src/connect/server.rs
@@ -550,20 +550,21 @@ pub(crate) fn server_properties(
),
(
"spark.driver.extraClassPath".to_string(),
Some(format!("/stackable/spark/extra-jars/*:/stackable/spark/connect/spark-connect_2.12-{spark_version}.jar")),
Some(format!("/stackable/spark/extra-jars/*:/stackable/spark/connect/spark-connect-{spark_version}.jar")),
),
(
"spark.metrics.conf".to_string(),
Some(format!("{VOLUME_MOUNT_PATH_CONFIG}/{METRICS_PROPERTIES_FILE}")),
Some(format!(
"{VOLUME_MOUNT_PATH_CONFIG}/{METRICS_PROPERTIES_FILE}"
)),
),
// This enables the "/metrics/executors/prometheus" endpoint on the server pod.
// The driver collects metrics from the executors and makes them available here.
// The "/metrics/prometheus" endpoint delievers the driver metrics.
// The "/metrics/prometheus" endpoint delivers the driver metrics.
(
"spark.ui.prometheus.enabled".to_string(),
Some("true".to_string()),
),

]
.into();

@@ -8,11 +8,11 @@ spec:
vectorAggregatorConfigMapName: vector-aggregator-discovery
{% endif %}
sparkImage:
{% if test_scenario['values']['spark'].find(",") > 0 %}
custom: "{{ test_scenario['values']['spark'].split(',')[1] }}"
productVersion: "{{ test_scenario['values']['spark'].split(',')[0] }}"
{% if test_scenario['values']['spark-hbase-connector'].find(",") > 0 %}
custom: "{{ test_scenario['values']['spark-hbase-connector'].split(',')[1] }}"
productVersion: "{{ test_scenario['values']['spark-hbase-connector'].split(',')[0] }}"
{% else %}
productVersion: "{{ test_scenario['values']['spark'] }}"
productVersion: "{{ test_scenario['values']['spark-hbase-connector'] }}"
{% endif %}
# pullPolicy: IfNotPresent
pullPolicy: Always
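
To make the template logic easier to follow, here is roughly what the block above renders to for a hypothetical comma-separated test value such as `4.0.0,oci.stackable.tech/sdp/spark-k8s:4.0.0-stackable0.0.0-dev` (assuming `pullPolicy` is nested under `sparkImage`, as in the other examples):

[source,yaml]
----
sparkImage:
  custom: "oci.stackable.tech/sdp/spark-k8s:4.0.0-stackable0.0.0-dev"
  productVersion: "4.0.0"
  pullPolicy: Always
----

For a plain value such as `4.0.0`, only `productVersion` is set.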

10 changes: 5 additions & 5 deletions tests/templates/kuttl/iceberg/10-deploy-spark-app.yaml.j2
@@ -8,11 +8,11 @@ spec:
vectorAggregatorConfigMapName: vector-aggregator-discovery
{% endif %}
sparkImage:
{% if test_scenario['values']['spark'].find(",") > 0 %}
custom: "{{ test_scenario['values']['spark'].split(',')[1] }}"
productVersion: "{{ test_scenario['values']['spark'].split(',')[0] }}"
{% if test_scenario['values']['spark-iceberg'].find(",") > 0 %}
custom: "{{ test_scenario['values']['spark-iceberg'].split(',')[1] }}"
productVersion: "{{ test_scenario['values']['spark-iceberg'].split(',')[0] }}"
{% else %}
productVersion: "{{ test_scenario['values']['spark'] }}"
productVersion: "{{ test_scenario['values']['spark-iceberg'] }}"
{% endif %}
pullPolicy: IfNotPresent
mode: cluster
@@ -48,7 +48,7 @@ spec:
#
# We extract the spark parts from the test scenario value.
#
- org.apache.iceberg:iceberg-spark-runtime-{{ ".".join(test_scenario['values']['spark'].split('.')[:2]) }}_2.12:1.8.1
- org.apache.iceberg:iceberg-spark-runtime-{{ ".".join(test_scenario['values']['spark-iceberg'].split('.')[:2]) }}_2.12:1.8.1
volumes:
- name: script
configMap:
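
For orientation, the Jinja expression in the coordinate above only extracts the Spark minor version: with a hypothetical `spark-iceberg` value of `3.5.6`, `split('.')[:2]` joined with a dot yields `3.5`, so the line renders to roughly:

[source,yaml]
----
- org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1
----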