70 changes: 70 additions & 0 deletions docs/user-guide/examples.md
@@ -241,3 +241,73 @@ spark_client.submit_app(
],
)
```

### Executor configuration

#### Executor pod template

The package provides a more convenient way to configure executor pod templates than Spark's built-in `spark.kubernetes.executor.podTemplateFile` configuration.

Instead of manually creating ConfigMaps and managing volume mounts, you can pass the `executor_template` parameter, which accepts either:
- Template content as a YAML string
- A path to a local template file

The package automatically:
1. Creates a ConfigMap with the template content
2. Mounts it to the driver pod (so the driver can validate the template)
3. Configures Spark to use the template for executor pods
4. Cleans up the ConfigMap when the application finishes
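
In effect, the package does something like the following (a simplified sketch, not the actual implementation; `app_id` and `template_content` are illustrative names):

```python
from kubernetes import client as k8s

app_id = "my-app-12345"  # hypothetical application ID generated at submission
template_content = "apiVersion: v1\nkind: Pod\nspec: {}"  # your template YAML

# Steps 1-2: the template is stored in a ConfigMap and mounted into the driver pod
configmap = k8s.V1ConfigMap(
    metadata=k8s.V1ObjectMeta(name=f"{app_id}-executor-template"),
    data={"executor-template.yaml": template_content},
)

# Step 3: Spark is pointed at the file inside the mounted ConfigMap
spark_conf = {
    "spark.kubernetes.executor.podTemplateFile": (
        "/opt/spark/executor-template/executor-template.yaml"
    ),
}
```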

**Using template content as string:**
```python
executor_template_yaml = """
apiVersion: v1
kind: Pod
spec:
containers:
- name: executor
resources:
requests:
memory: "4Gi"
cpu: "2"
limits:
memory: "4Gi"
cpu: "2"
env:
- name: CUSTOM_ENV_VAR
value: "custom-value"
nodeSelector:
node-type: "compute-optimized"
tolerations:
- key: "spark-executor"
operator: "Equal"
value: "true"
effect: "NoSchedule"
"""

spark_client.submit_app(
...,
executor_template=executor_template_yaml,
)
```

**Using local template file:**
```python
# Save your template to a file, e.g., executor-template.yaml
spark_client.submit_app(
...,
executor_template="./executor-template.yaml",
)
```

**Via CLI:**
```bash
# Using template file
spark-on-k8s submit --executor-template ./executor-template.yaml ...

# You can also set it via an environment variable
export SPARK_ON_K8S_EXECUTOR_TEMPLATE="./executor-template.yaml"
spark-on-k8s submit ...
```

**Note:** `executor_template` and `executor_pod_template_path` are mutually exclusive; providing both raises a `ValueError`. If your template lives in external storage (S3, GCS, etc.), use `executor_pod_template_path` instead.
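
For example (illustrative only; the object-store path is hypothetical):

```python
spark_client.submit_app(
    ...,
    executor_template="./executor-template.yaml",
    executor_pod_template_path="s3a://my-bucket/executor-template.yaml",
)
# ValueError: Both executor_template and executor_pod_template_path are provided.
# Please provide only one of them.
```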
1 change: 1 addition & 0 deletions pyproject.toml
@@ -17,6 +17,7 @@ packages = [{include = "spark_on_k8s"}]
python = "^3.9"
kubernetes = ">=26.1.0"
click = "^8.0.1"
pyyaml = "^6.0"
fastapi = {version = "^0.109.1", optional = true}
kubernetes-asyncio = {version = ">=26.9.0", optional = true}
uvicorn = {version = "^0.26.0", optional = true}
4 changes: 4 additions & 0 deletions spark_on_k8s/cli/app.py
@@ -28,6 +28,7 @@
executor_min_instances_option,
executor_node_selector_option,
executor_pod_template_path_option,
executor_template_option,
force_option,
image_pull_policy_option,
logs_option,
@@ -137,6 +138,7 @@ def wait(app_id: str, namespace: str):
driver_annotations_option,
executor_annotations_option,
executor_pod_template_path_option,
executor_template_option,
],
help="Submit a Spark application.",
)
@@ -175,6 +177,7 @@ def submit(
driver_annotations: dict[str, str],
executor_annotations: dict[str, str],
executor_pod_template_path: str,
    executor_template: str,
):
from spark_on_k8s.client import ExecutorInstances, PodResources, SparkOnK8S

@@ -219,4 +222,5 @@
driver_annotations=driver_annotations,
executor_annotations=executor_annotations,
executor_pod_template_path=executor_pod_template_path,
        executor_template=executor_template,
)
7 changes: 7 additions & 0 deletions spark_on_k8s/cli/options.py
@@ -302,3 +302,10 @@ def validate_list_option(ctx: Context, param: Parameter, value: str) -> list[str
show_default=True,
help="The path to the executor pod template file.",
)
executor_template_option = click.Option(
("--executor-template", "executor_template"),
type=str,
default=Configuration.SPARK_ON_K8S_EXECUTOR_TEMPLATE,
show_default=False,
help="Executor pod template content as string or local file path.",
)
99 changes: 92 additions & 7 deletions spark_on_k8s/client.py
@@ -2,9 +2,11 @@

import logging
import re
import yaml
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Literal

from kubernetes import client as k8s
@@ -112,6 +114,41 @@ def __init__(
self.k8s_client_manager = k8s_client_manager or KubernetesClientManager()
self.app_manager = SparkAppManager(k8s_client_manager=self.k8s_client_manager)

def _process_executor_template(self, executor_template: str) -> str:
"""Process executor template string, reading from file if it's a file path.

Args:
executor_template: Either template content as string or path to template file

Returns:
Template content as string

Raises:
            ValueError: If the content is not valid YAML or does not describe a Kubernetes Pod
"""
        # Try to read from a file first; if that fails, treat the string as the template content
        try:
            template_path = Path(executor_template).expanduser()
            content = template_path.read_text(encoding="utf-8")
        except (OSError, ValueError):
            # Not a readable file path, use the string as-is
            content = executor_template

# Validate that the content is valid YAML and expected Kubernetes resource
try:
parsed = yaml.safe_load(content)
except yaml.YAMLError as e:
raise ValueError(f"Executor template contains invalid YAML: {e}") from e

# Check if it's a proper Kubernetes resource structure
if not isinstance(parsed, dict):
raise ValueError("Executor template must be a Kubernetes resource (YAML object)")

        if parsed.get("kind") != "Pod":
raise ValueError("Executor template must be a Kubernetes Pod resource (kind: Pod)")

return content
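
    # Illustration of the resolution behavior above (hypothetical inputs):
    #   self._process_executor_template("./executor-template.yaml")             # returns the file's contents
    #   self._process_executor_template("apiVersion: v1\nkind: Pod\nspec: {}")  # returns the string as-is
    #   self._process_executor_template("kind: ConfigMap")                      # raises ValueError (not a Pod)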

def submit_app(
self,
*,
Expand Down Expand Up @@ -146,6 +183,7 @@ def submit_app(
executor_labels: dict[str, str] | ArgNotSet = NOTSET,
driver_tolerations: list[k8s.V1Toleration] | ArgNotSet = NOTSET,
executor_pod_template_path: str | ArgNotSet = NOTSET,
        executor_template: str | ArgNotSet = NOTSET,
startup_timeout: int | ArgNotSet = NOTSET,
driver_init_containers: list[k8s.V1Container] | ArgNotSet = NOTSET,
driver_host_aliases: list[k8s.V1HostAlias] | ArgNotSet = NOTSET,
@@ -191,6 +229,8 @@
executor_node_selector: Node selector for the executors
driver_tolerations: List of tolerations for the driver
executor_pod_template_path: Path to the executor pod template file
            executor_template: Executor pod template content as a string or a local file path. If provided,
                a ConfigMap is created and mounted automatically
startup_timeout: Timeout in seconds to wait for the application to start
driver_init_containers: List of init containers to run before the driver starts
driver_host_aliases: List of host aliases for the driver
@@ -299,6 +339,9 @@
driver_tolerations = []
if executor_pod_template_path is NOTSET or executor_pod_template_path is None:
executor_pod_template_path = Configuration.SPARK_ON_K8S_EXECUTOR_POD_TEMPLATE_PATH
        if executor_template is NOTSET or executor_template is None:
            executor_template = Configuration.SPARK_ON_K8S_EXECUTOR_TEMPLATE
if startup_timeout is NOTSET:
startup_timeout = Configuration.SPARK_ON_K8S_STARTUP_TIMEOUT
if driver_init_containers is NOTSET:
@@ -356,18 +399,48 @@
basic_conf["spark.executor.instances"] = (
f"{executor_instances.initial if executor_instances.initial is not None else 2}"
)
if executor_volume_mounts:
basic_conf.update(
self._executor_volumes_config(volumes=volumes, volume_mounts=executor_volume_mounts)
)
# Note: executor volume mounts will be processed after volumes are created
if executor_node_selector:
basic_conf.update(self._executor_node_selector(node_selector=executor_node_selector))
if executor_labels:
basic_conf.update(self._executor_labels(labels=executor_labels))
if executor_annotations:
basic_conf.update(self._executor_annotations(annotations=executor_annotations))
if executor_pod_template_path:
# Handle executor template (either string content or local file path)
executor_template_configmap = None
        if executor_template and executor_pod_template_path:
raise ValueError(
"Both executor_template and executor_pod_template_path are provided. "
"Please provide only one of them."
)

        if executor_template:
            # Process the template (read from a file if it's a path, otherwise use it as content)
            template_content = self._process_executor_template(executor_template)

# Create a ConfigMap for the executor template
executor_template_configmap = {
"name": f"{app_id}-executor-template",
"mount_path": "/opt/spark/executor-template",
"sources": [{
"name": "executor-template.yaml",
"text": template_content
}]
}

# Set Spark configuration to use the mounted template
            basic_conf.update(
                self._executor_pod_template_path("/opt/spark/executor-template/executor-template.yaml")
            )

elif executor_pod_template_path:
# Handle external executor pod template path (S3, GCS, etc.)
basic_conf.update(self._executor_pod_template_path(executor_pod_template_path))

# Process executor volume mounts before building driver command args
if executor_volume_mounts:
basic_conf.update(
self._executor_volumes_config(volumes=volumes, volume_mounts=executor_volume_mounts)
)

driver_command_args = ["driver", "--master", "k8s://https://kubernetes.default.svc.cluster.local:443"]
if class_name:
driver_command_args.extend(["--class", class_name])
@@ -377,11 +450,19 @@
self._spark_config_to_arguments({**basic_conf, **spark_conf}) + [app_path, *main_class_parameters]
)

all_configmaps = []
if driver_ephemeral_configmaps_volumes:
all_configmaps.extend(driver_ephemeral_configmaps_volumes)

# Add executor template ConfigMap if present
if executor_template_configmap:
all_configmaps.append(executor_template_configmap)

if all_configmaps:
application_configmaps_volumes = self.app_manager.create_configmap_objects(
app_name=app_name,
app_id=app_id,
configmaps=driver_ephemeral_configmaps_volumes,
configmaps=all_configmaps,
namespace=namespace,
)
for ind, configmap in enumerate(application_configmaps_volumes):
@@ -391,15 +472,18 @@
config_map=k8s.V1ConfigMapVolumeSource(name=configmap.metadata.name),
)
)

configmap_mount_path = all_configmaps[ind]["mount_path"]
driver_volume_mounts.append(
k8s.V1VolumeMount(
name=configmap.metadata.name,
mount_path=driver_ephemeral_configmaps_volumes[ind]["mount_path"],
mount_path=configmap_mount_path,
)
)
else:
application_configmaps_volumes = []


pod = SparkAppManager.create_spark_pod_spec(
app_name=app_name,
app_id=app_id,
@@ -621,6 +705,7 @@ def _executor_volumes_config(
"emptyDir",
"nfs",
"persistentVolumeClaim",
"configMap",
}
loaded_volumes = {}
volumes_config = {}
1 change: 1 addition & 0 deletions spark_on_k8s/utils/configuration.py
@@ -78,6 +78,7 @@ class Configuration:
getenv("SPARK_ON_K8S_SPARK_EXECUTOR_ANNOTATIONS", "{}")
)
SPARK_ON_K8S_EXECUTOR_POD_TEMPLATE_PATH = getenv("SPARK_ON_K8S_EXECUTOR_POD_TEMPLATE_PATH", None)
SPARK_ON_K8S_EXECUTOR_TEMPLATE = getenv("SPARK_ON_K8S_EXECUTOR_TEMPLATE", None)
try:
from kubernetes_asyncio import client as async_k8s
