Merged
39 commits
9ccd939
Fix: Add task context labels to driver and executor pods for SparkKub…
asb108 May 19, 2025
9f1aa03
Fix formatting in test_spark_kubernetes.py
asb108 May 20, 2025
ad2d64f
Fix test assertions in SparkKubernetesOperator tests to handle task c…
asb108 May 20, 2025
11782dc
Fix whitespace issues in spark_kubernetes.py
asb108 May 20, 2025
f1e6961
Fix formatting and resolve failing tests
asb108 May 20, 2025
6f626cf
Fix SparkKubernetesOperator test OOM issues
asb108 May 30, 2025
8a3bf5d
Fix: Add task context labels to driver and executor pods for SparkKub…
asb108 May 19, 2025
e105907
Fix whitespace issues in spark_kubernetes.py
asb108 May 20, 2025
69d5d55
Clean up merge conflict markers in test_spark_kubernetes.py
asb108 Jun 11, 2025
d22b28c
Fix test assertions for SparkKubernetesOperator task context labels
asb108 Jun 11, 2025
49074fc
Fix compatibility issue with parent_dag attribute access
asb108 Jun 11, 2025
fa31f15
Align _get_ti_pod_labels implementation with KubernetesPodOperator
asb108 Jun 11, 2025
5b28c36
feat: Add reattach functionality to SparkKubernetesOperator
asb108 Jun 12, 2025
b81af4b
Fix: Add task context labels to driver and executor pods for SparkKub…
asb108 May 19, 2025
ce6d6ca
Fix formatting in test_spark_kubernetes.py
asb108 May 20, 2025
241a628
Fix test assertions in SparkKubernetesOperator tests to handle task c…
asb108 May 20, 2025
596785e
Fix whitespace issues in spark_kubernetes.py
asb108 May 20, 2025
8d82b4a
Fix formatting and resolve failing tests
asb108 May 20, 2025
ececa46
Fix SparkKubernetesOperator test OOM issues
asb108 May 30, 2025
ac613fd
Fix: Add task context labels to driver and executor pods for SparkKub…
asb108 May 19, 2025
440c13c
Fix whitespace issues in spark_kubernetes.py
asb108 May 20, 2025
80a56b7
Clean up merge conflict markers in test_spark_kubernetes.py
asb108 Jun 11, 2025
ce02274
Fix test assertions for SparkKubernetesOperator task context labels
asb108 Jun 11, 2025
99fadaf
Fix compatibility issue with parent_dag attribute access
asb108 Jun 11, 2025
b46a98c
Align _get_ti_pod_labels implementation with KubernetesPodOperator
asb108 Jun 11, 2025
1e97be8
feat: Add reattach functionality to SparkKubernetesOperator
asb108 Jun 12, 2025
163fad9
Merge branch 'fix-spark-kubernetes-operator-reattach-41211' of https:…
asb108 Jul 22, 2025
8512645
Merge branch 'main' into fix-spark-kubernetes-operator-reattach-41211
asb108 Jul 22, 2025
ae20d07
Merge branch 'main' into fix-spark-kubernetes-operator-reattach-41211
asb108 Jul 23, 2025
a982878
Merge branch 'main' into fix-spark-kubernetes-operator-reattach-41211
asb108 Jul 24, 2025
191519d
Fix SparkKubernetesOperator reattach with task context labels
asb108 Jul 24, 2025
6f33959
Fix code formatting for static checks
asb108 Jul 24, 2025
36068b2
Merge branch 'main' into fix-spark-kubernetes-operator-reattach-41211
asb108 Jul 24, 2025
2c16581
Merge branch 'fix-spark-kubernetes-operator-reattach-41211' of https:…
asb108 Jul 24, 2025
b36418b
Merge branch 'main' into fix-spark-kubernetes-operator-reattach-41211
asb108 Sep 10, 2025
79c6134
Merge branch 'main' into fix-spark-kubernetes-operator-reattach-41211
asb108 Sep 10, 2025
e757a56
Merge branch 'main' into fix-spark-kubernetes-operator-reattach-41211
asb108 Sep 10, 2025
0f52d58
Merge branch 'apache:main' into fix-spark-kubernetes-operator-reattac…
asb108 Sep 10, 2025
bef7492
update tests
asb108 Sep 10, 2025
@@ -66,7 +66,9 @@ class SparkKubernetesOperator(KubernetesPodOperator):
:param success_run_history_limit: Number of past successful runs of the application to keep.
:param startup_timeout_seconds: timeout in seconds to startup the pod.
:param log_events_on_failure: Log the pod's events if a failure occurs
:param reattach_on_restart: if the scheduler dies while the pod is running, reattach and monitor
:param reattach_on_restart: if the scheduler dies while the pod is running, reattach and monitor.
    When enabled, the operator automatically adds Airflow task context labels (dag_id, task_id, run_id)
    to the driver and executor pods so they can be found again for reattachment.
:param delete_on_termination: What to do when the pod reaches its final
state, or the execution is interrupted. If True (default), delete the
pod; if False, leave the pod.
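For context, here is a minimal sketch of how the new flag is used from a DAG. The dag id, namespace, and application file below are hypothetical, not taken from this PR:

```python
import pendulum

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator

with DAG(
    dag_id="spark_pi_example",  # hypothetical dag id
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    schedule=None,
):
    SparkKubernetesOperator(
        task_id="submit_spark_pi",
        namespace="spark-apps",  # hypothetical namespace
        application_file="spark_pi.yaml",  # hypothetical SparkApplication template
        # With reattach enabled, a restarted scheduler looks up the running
        # driver pod by its task context labels instead of submitting again.
        reattach_on_restart=True,
    )
```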
@@ -203,17 +205,16 @@ def _get_ti_pod_labels(context: Context | None = None, include_try_number: bool
"spark_kubernetes_operator": "True",
}

# If running on Airflow 2.3+:
map_index = getattr(ti, "map_index", -1)
if map_index >= 0:
labels["map_index"] = map_index
map_index = ti.map_index
if map_index is not None and map_index >= 0:
labels["map_index"] = str(map_index)

if include_try_number:
labels.update(try_number=ti.try_number)
labels.update(try_number=str(ti.try_number))

# In the case of sub dags this is just useful
# TODO: Remove this when the minimum version of Airflow is bumped to 3.0
if getattr(context_dict["dag"], "is_subdag", False):
if getattr(context_dict["dag"], "parent_dag", False):
labels["parent_dag_id"] = context_dict["dag"].parent_dag.dag_id
# Ensure that label is valid for Kube,
# and if not truncate/remove invalid chars and replace with short hash.
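To illustrate what the helper produces, here is a hypothetical result for a mapped task. All values are made up; the point is that map_index and try_number are stringified, since Kubernetes only accepts string label values:

```python
# Hypothetical return value of _get_ti_pod_labels(context, include_try_number=True).
expected_labels = {
    "dag_id": "spark_pi_example",
    "task_id": "submit_spark_pi",
    "run_id": "scheduled__2025-01-01T0000",  # sanitized to be label-safe
    "spark_kubernetes_operator": "True",
    "map_index": "0",    # only present for mapped tasks (map_index >= 0)
    "try_number": "2",   # only present when include_try_number=True
}
```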
@@ -226,9 +227,11 @@ def pod_manager(self) -> PodManager:
def pod_manager(self) -> PodManager:
return PodManager(kube_client=self.client)

@staticmethod
def _try_numbers_match(context, pod) -> bool:
return pod.metadata.labels["try_number"] == context["ti"].try_number
def _try_numbers_match(self, context, pod) -> bool:
    task_instance = context["task_instance"]
    # Kubernetes label values are strings, so normalize both sides before comparing.
    pod_try_number = pod.metadata.labels.get("try_number", "")
    return str(task_instance.try_number) == str(pod_try_number)
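A quick sanity check of the comparison above, using hand-built stand-ins rather than real API objects: pod labels always come back from Kubernetes as strings, so both sides are normalized with str() before comparing:

```python
from types import SimpleNamespace

# Stand-ins for a V1Pod and an Airflow context; purely illustrative.
pod = SimpleNamespace(metadata=SimpleNamespace(labels={"try_number": "2"}))
context = {"task_instance": SimpleNamespace(try_number=2)}

assert str(context["task_instance"].try_number) == pod.metadata.labels.get("try_number", "")
```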

@property
def template_body(self):
@@ -251,20 +254,9 @@ def find_spark_job(self, context, exclude_checked: bool = True):
"Found matching driver pod %s with labels %s", pod.metadata.name, pod.metadata.labels
)
self.log.info("`try_number` of task_instance: %s", context["ti"].try_number)
self.log.info("`try_number` of pod: %s", pod.metadata.labels["try_number"])
self.log.info("`try_number` of pod: %s", pod.metadata.labels.get("try_number", "unknown"))
return pod
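The collapsed portion of find_spark_job locates the driver pod by these labels. Conceptually the lookup is a label-selector query along the lines of the sketch below; this is an assumed illustration, not the provider's actual implementation:

```python
from kubernetes.client import CoreV1Api


def find_driver_pod_sketch(client: CoreV1Api, namespace: str, labels: dict[str, str]):
    # Builds a selector such as "dag_id=spark_pi_example,task_id=submit_spark_pi,run_id=...".
    selector = ",".join(f"{key}={value}" for key, value in labels.items())
    pods = client.list_namespaced_pod(namespace=namespace, label_selector=selector).items
    return pods[0] if pods else None
```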

def get_or_create_spark_crd(self, context) -> k8s.V1Pod:
if self.reattach_on_restart:
driver_pod = self.find_spark_job(context)
if driver_pod:
return driver_pod

driver_pod, spark_obj_spec = self.launcher.start_spark_job(
image=self.image, code_path=self.code_path, startup_timeout=self.startup_timeout_seconds
)
return driver_pod

def process_pod_deletion(self, pod, *, reraise=True):
if pod is not None:
if self.delete_on_termination:
@@ -294,25 +286,79 @@ def client(self) -> CoreV1Api:
def custom_obj_api(self) -> CustomObjectsApi:
return CustomObjectsApi()

@cached_property
def launcher(self) -> CustomObjectLauncher:
launcher = CustomObjectLauncher(
name=self.name,
namespace=self.namespace,
kube_client=self.client,
custom_obj_api=self.custom_obj_api,
template_body=self.template_body,
def get_or_create_spark_crd(self, launcher: CustomObjectLauncher, context) -> k8s.V1Pod:
if self.reattach_on_restart:
driver_pod = self.find_spark_job(context)
if driver_pod:
return driver_pod

driver_pod, spark_obj_spec = launcher.start_spark_job(
image=self.image, code_path=self.code_path, startup_timeout=self.startup_timeout_seconds
)
return launcher
return driver_pod

def execute(self, context: Context):
self.name = self.create_job_name()

self._setup_spark_configuration(context)

if self.deferrable:
self.execute_async(context)

return super().execute(context)

def _setup_spark_configuration(self, context: Context):
"""Set up Spark-specific configuration including reattach logic."""
import copy

template_body = copy.deepcopy(self.template_body)

if self.reattach_on_restart:
task_context_labels = self._get_ti_pod_labels(context)

existing_pod = self.find_spark_job(context)
if existing_pod:
self.log.info(
"Found existing Spark driver pod %s. Reattaching to it.", existing_pod.metadata.name
)
self.pod = existing_pod
self.pod_request_obj = None
return

if "spark" not in template_body:
template_body["spark"] = {}
if "spec" not in template_body["spark"]:
template_body["spark"]["spec"] = {}

spec_dict = template_body["spark"]["spec"]

if "labels" not in spec_dict:
spec_dict["labels"] = {}
spec_dict["labels"].update(task_context_labels)

for component in ["driver", "executor"]:
if component not in spec_dict:
spec_dict[component] = {}

if "labels" not in spec_dict[component]:
spec_dict[component]["labels"] = {}

spec_dict[component]["labels"].update(task_context_labels)

self.log.info("Creating sparkApplication.")
self.pod = self.get_or_create_spark_crd(context)
self.launcher = CustomObjectLauncher(
name=self.name,
namespace=self.namespace,
kube_client=self.client,
custom_obj_api=self.custom_obj_api,
template_body=template_body,
)
self.pod = self.get_or_create_spark_crd(self.launcher, context)
self.pod_request_obj = self.launcher.pod_spec

return super().execute(context=context)
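After _setup_spark_configuration has run on a fresh start, the deep-copied template carries the task context labels at the application level and on both pod templates, so driver and executor pods are all discoverable later. Roughly (all values hypothetical):

```python
task_context_labels = {  # hypothetical values
    "dag_id": "spark_pi_example",
    "task_id": "submit_spark_pi",
    "run_id": "manual__2025-01-01T0000",
    "spark_kubernetes_operator": "True",
}

template_body = {
    "spark": {
        "spec": {
            "labels": dict(task_context_labels),  # on the SparkApplication itself
            "driver": {"labels": dict(task_context_labels)},
            "executor": {"labels": dict(task_context_labels)},
        }
    }
}
```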
def find_pod(self, namespace: str, context: Context, *, exclude_checked: bool = True):
"""Override parent's find_pod to use our Spark-specific find_spark_job method."""
return self.find_spark_job(context, exclude_checked=exclude_checked)

def on_kill(self) -> None:
if self.launcher: