
Conversation

@SameerMesiah97
Contributor

@SameerMesiah97 SameerMesiah97 commented Jan 17, 2026

Description

This change makes find_spark_job deterministic when multiple Spark driver pods match the same task labels.

Previously, the operator raised an exception if more than one matching pod was found. Instead, the operator now deterministically selects a single pod for reattachment using a stable ordering strategy:

  • Pods are ordered by creation_timestamp
  • If timestamps are identical, the pod name is used as a deterministic tie-breaker
  • The pod with the latest timestamp (and the lexicographically last name if needed) is selected

When duplicate pods are detected, a warning is logged to preserve visibility into the unexpected state.
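A minimal sketch of this selection strategy, assuming the kubernetes Python client's V1Pod objects (the function name and typing are illustrative, not the exact code merged here):

```python
from kubernetes.client import V1Pod


def select_driver_pod(pods: list[V1Pod]) -> V1Pod:
    """Pick the most recently created pod, breaking timestamp ties by pod name."""
    return sorted(
        pods,
        key=lambda p: (p.metadata.creation_timestamp, p.metadata.name),
    )[-1]
```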

Rationale

Although Spark driver pods are normally unique for a given task execution, multiple driver pods with identical labels can occur when the same task attempt is started again after an abrupt scheduler interruption.

In these cases, the original Spark driver pod is not cleaned up, and the SparkApplication may spawn a new driver pod that reuses the same labels. This results in multiple matching pods representing the same logical execution.

Failing fast in this scenario causes unnecessary task failures despite a recoverable state. Selecting a deterministic “most recent” pod allows Airflow to reattach to the correct driver while still surfacing the unexpected condition via warning logs.

Kubernetes does not guarantee label uniqueness (see the 'Labels and Selectors' page in the Kubernetes documentation). As a result, multiple pods matching the same label selector is a valid and documented Kubernetes state. The operator must therefore handle this case deterministically rather than failing with an exception.
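For illustration, a single label-selector query with the Kubernetes Python client can legitimately return several pods (the namespace and label values below are made up):

```python
from kubernetes import client, config

config.load_incluster_config()  # or config.load_kube_config() outside the cluster
v1 = client.CoreV1Api()

pods = v1.list_namespaced_pod(
    namespace="spark",  # hypothetical namespace
    label_selector="spark-role=driver,dag_id=my_dag,task_id=my_task",  # hypothetical labels
).items
# len(pods) > 1 is a valid, documented Kubernetes state: label selectors do not imply uniqueness.
```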

Notes

  • A docstring has been added to find_spark_job describing the function’s full behavior, including the handling of duplicate pods.
  • The commented-out condition "and self.reattach_on_restart" has been removed, as it was misleading.

Tests

Added unit tests (sketched after this list) that:

  • Verify that find_spark_job selects the most recently created Spark driver pod when multiple pods share identical labels.
  • Verify deterministic tie-breaking by pod name when multiple matching pods have the same creation_timestamp.
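A rough pytest-style sketch of what these tests assert; the helper names and the select_driver_pod function are hypothetical stand-ins, not the exact tests added in this PR:

```python
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace


def select_driver_pod(pods):
    # Same ordering idea as described above: latest timestamp wins, name breaks ties.
    return sorted(pods, key=lambda p: (p.metadata.creation_timestamp, p.metadata.name))[-1]


def make_pod(name, created):
    return SimpleNamespace(metadata=SimpleNamespace(name=name, creation_timestamp=created))


def test_most_recent_pod_is_selected():
    t0 = datetime(2026, 1, 17, tzinfo=timezone.utc)
    older = make_pod("driver-a", t0)
    newer = make_pod("driver-b", t0 + timedelta(minutes=5))
    assert select_driver_pod([older, newer]) is newer


def test_pod_name_breaks_creation_timestamp_tie():
    t0 = datetime(2026, 1, 17, tzinfo=timezone.utc)
    a = make_pod("driver-a", t0)
    b = make_pod("driver-b", t0)
    assert select_driver_pod([a, b]) is b
```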

Backwards Compatibility

Previously, encountering multiple matching pods resulted in an AirflowException. This change replaces that failure with deterministic pod selection and a warning log.
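In rough terms (an illustrative sketch of the behavior change, not the exact diff):

```python
import logging

log = logging.getLogger(__name__)

# `pods` is the list returned by the label-selector query sketched earlier.
# Before (roughly): more than one match raised AirflowException and failed the task.
# After (roughly): warn and select deterministically.
if len(pods) > 1:
    log.warning(
        "Found %d Spark driver pods matching the task labels; selecting the most recent one.",
        len(pods),
    )
driver_pod = select_driver_pod(pods)  # hypothetical helper from the sketch above
```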

The behavior when exactly one matching pod is found is unchanged. In the common case, find_spark_job continues to return the single matching Spark driver pod exactly as before.

This change does not introduce any public API or configuration changes.

@boring-cyborg boring-cyborg bot added the area:providers and provider:cncf-kubernetes labels Jan 17, 2026
@SameerMesiah97 SameerMesiah97 force-pushed the Spark-K8s-Multiple-Matching-Pods branch from 7da5381 to da7e9fb Compare January 17, 2026 16:41
@SameerMesiah97 SameerMesiah97 marked this pull request as draft January 17, 2026 17:48
@SameerMesiah97 SameerMesiah97 force-pushed the Spark-K8s-Multiple-Matching-Pods branch from da7e9fb to a1659a2 Compare January 17, 2026 18:15
@SameerMesiah97 SameerMesiah97 marked this pull request as ready for review January 17, 2026 18:21
@Nataneljpwd
Contributor

Although Spark driver pods are normally unique for a given task execution, multiple driver pods with identical labels can occur when the same task attempt is started again after an abrupt scheduler or worker interruption.

Do you mean an Airflow scheduler interruption? I don't see how that could happen; a worker interruption seems more plausible, but even then I don't see when it can occur. Say the worker exits for some reason: the pod is killed, yet the driver stays up until it is finished and the task is marked as failed. If we have retries, won't it just create a new SparkApplication and update the try-number label?
Or, if we are in a different dag run, the run_id will be different.

In these cases, the original Spark driver pod is not cleaned up, and the SparkApplication may spawn a new driver pod that reuses the same labels. This results in multiple matching pods representing the same logical execution.

By SparkApplication, do you mean the CRD?
Doesn't it monitor the driver, or rather deploy and manage it? I think I might be missing or misunderstanding something.

Failing fast in this scenario causes unnecessary task failures despite a recoverable state. Selecting a deterministic “most recent” pod allows Airflow to reattach to the correct driver while still surfacing the unexpected condition via warning logs.

What if I do want to fail the task? For example, I am truncating a table, which may not be an atomic operation, and I use Airflow expecting there to be no more than one run, so I want to be notified on failure.

Is there a way to reproduce the issue stated above?
And isn't there a case where the creation timestamp selects the wrong pod? I.e. the older retry pod was stuck in Pending, so the older one got a newer creation timestamp. Can this cause any issues?

@SameerMesiah97
Contributor Author

SameerMesiah97 commented Jan 18, 2026

Do you mean an Airflow scheduler interruption? I don't see how that could happen; a worker interruption seems more plausible, but even then I don't see when it can occur. Say the worker exits for some reason: the pod is killed, yet the driver stays up until it is finished and the task is marked as failed. If we have retries, won't it just create a new SparkApplication and update the try-number label? Or, if we are in a different dag run, the run_id will be different.

I admit this is a rare edge case. The scenario I am referring to is one where the scheduler gets OOM-killed, which prevents the task state from being updated. On scheduler restart, the task resumes from the same try number and looks for an existing driver pod whose labels carry the exact same context variables as before the crash.

By SparkApplication, do you mean the CRD? Doesn't it monitor the driver, or rather deploy and manage it? I think I might be missing or misunderstanding something.

Yes. The SparkApplication is the owner of the driver pod. The SparkKubernetesOperator merely submits the SparkApplication, which then handles the lifecycle of the driver pod. I believe this would occur in a very narrow window between the submission of the SparkApplication and the execution of find_spark_job, during which the aforementioned scheduler crash could occur. Consider the following timeline:

  1. Worker submits SparkApplication
  2. SparkApplication creates driver pod
  3. Scheduler crashes (e.g. OOM)
  4. SparkApplication later creates a second driver pod (with the same label)
  5. Scheduler restarts
  6. Worker runs find_spark_job
  7. find_spark_job sees two matching pods

What if I do want to fail the task? For example, I am truncating a table, which may not be an atomic operation, and I use Airflow expecting there to be no more than one run, so I want to be notified on failure.

That is a valid point, but this is not about multiple task runs; it is about failsafe pod recovery in a very specific scenario. And if you want to fail the task under certain conditions, using pod discovery as a trigger for task success or failure is brittle, as there can be multiple pods per task. It would be better to use something more explicit, such as the exit code or return value of the operation. If we take your table truncation example, that would be the failure/success event fired by the DB. Also, under this new implementation, users will be notified when there are multiple pods. There just won't be a hard failure like before.

I do admit that I may have downplayed this in the PR description. This is a significant behavior change, as multiple pods may lead to soft recovery instead of hard failure. But that is because this condition does not indicate anything critically wrong within Airflow; it reflects the idiosyncrasies of how a SparkApplication handles pod reconciliation and creation.

Is there a way to reproduce the issue stated above?

It's possible, but very difficult to do deterministically. If you look at the existing condition, it appears to imply that multiple pods with the same label being picked up is a plausible scenario. This is what initially motivated me to open this PR.

And isn't there a case where the creation timestamp selects the wrong pod? I.e. the older retry pod was stuck in Pending, so the older one got a newer creation timestamp. Can this cause any issues?

I agree. I think the approach needs to be modified so that it prioritizes running pods, with creation_timestamp as a tie-breaker. This is a valid edge case that I overlooked.
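A sketch of that refinement, assuming we sort on pod phase first (illustrative only, not the code in this PR):

```python
def select_driver_pod(pods):
    # Prefer Running pods; among pods with the same phase preference,
    # the latest creation timestamp wins, with the pod name as a final tie-breaker.
    return sorted(
        pods,
        key=lambda p: (
            p.status.phase == "Running",  # False sorts before True, so Running pods end up last
            p.metadata.creation_timestamp,
            p.metadata.name,
        ),
    )[-1]
```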

@SameerMesiah97
Contributor Author

Requesting review for this.

@Nataneljpwd
Contributor

Yes. The SparkApplication is the owner of the driver pod. The SparkKubernetesOperator merely submits the SparkApplication, which then handles the lifecycle of the driver pod. I believe this would occur in a very narrow window between the submission of the SparkApplication and the execution of find_spark_job, during which the aforementioned scheduler crash could occur. Consider the following timeline:

  1. Worker submits SparkApplication
  2. SparkApplication creates driver pod
  3. Scheduler crashes (e.g. OOM)
  4. SparkApplication later creates a second driver pod (with the same label)
  5. Scheduler restarts
  6. Worker runs find_spark_job
  7. find_spark_job sees two matching pods

How can a SparkApplication create another driver pod?
I do not see how that happens without the driver pod failing, in which case we can just select the pod by status.

And isn't there a case where the creation timestamp selects the wrong pod? I.e. the older retry pod was stuck in Pending, so the older one got a newer creation timestamp. Can this cause any issues?

I agree. I think the approach needs to be modified so that it prioritizes running pods, with creation_timestamp as a tie-breaker. This is a valid edge case that I overlooked.

Have you implemented this? I did not see it. Overall the PR looks good; I will take a closer look soon, though I would appreciate it if you could answer my questions.
Thank you

@SameerMesiah97
Contributor Author

How can a SparkApplication create another driver pod? I do not see how that happens without the driver pod failing, in which case we can just select the pod by status.

You are correct here. The pod may be recreated only if the driver pod fails. This still does not prevent two pods with the same label from existing at the same time.

@SameerMesiah97 SameerMesiah97 force-pushed the Spark-K8s-Multiple-Matching-Pods branch from a1659a2 to 177f45c Compare January 26, 2026 19:58
@SameerMesiah97 SameerMesiah97 force-pushed the Spark-K8s-Multiple-Matching-Pods branch from 177f45c to c4695b9 Compare January 26, 2026 20:05
@Nataneljpwd
Contributor

Nataneljpwd commented Jan 26, 2026

How can a SparkApplication create another driver pod? I do not see how that happens without the driver pod failing, in which case we can just select the pod by status.

You are correct here. The pod may be recreated only if the driver pod fails. This still does not prevent two pods with the same label from existing at the same time.

If the driver pod fails, why can't we check the status in the query instead of doing a fallback?
I see you are checking for this, and it seems to be pretty deterministic: we cannot have two running driver pods in this case, so it seems like this cannot happen, as no two driver pods will be running for the same SparkApplication; that is something k8s promises.
The point is that you added the check for Running, which is already deterministic, so you may be able to remove some code, as there will never be more than one Running pod with the same labels for the same SparkApplication.
Maybe I am missing something, but this seems to be the better and less error-prone solution.
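For reference, a sketch of filtering on phase directly in the list call (pods do support a status.phase field selector; the namespace and labels are illustrative):

```python
# Assumes v1 = kubernetes.client.CoreV1Api(), as in the earlier sketch.
running_drivers = v1.list_namespaced_pod(
    namespace="spark",  # hypothetical
    label_selector="spark-role=driver,dag_id=my_dag,task_id=my_task",  # hypothetical
    field_selector="status.phase=Running",
).items
```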

@SameerMesiah97
Contributor Author

If the driver pod fails, why can't we check the status in the query instead of doing a fallback? I see you are checking for this, and it seems to be pretty deterministic: we cannot have two running driver pods in this case, so it seems like this cannot happen, as no two driver pods will be running for the same SparkApplication; that is something k8s promises. The point is that you added the check for Running, which is already deterministic, so you may be able to remove some code, as there will never be more than one Running pod with the same labels for the same SparkApplication. Maybe I am missing something, but this seems to be the better and less error-prone solution.

I agree that under normal operation, there should be at most one active driver pod at a time.

But the reason for keeping the fallback ordering is that this code operates on the Kubernetes API response rather than the SparkApplication controller’s intent. In abnormal states (e.g. termination delays or stale API observations), multiple pods can still be returned, and phase alone might not be enough to pick the correct pod. That being said, if you feel the additional ordering is unnecessary, I can remove it.

@Nataneljpwd
Contributor

If the driver pod fails, why can't we check the status in the query instead of doing a fallback? I see you are checking for this, and it seems to be pretty deterministic: we cannot have two running driver pods in this case, so it seems like this cannot happen, as no two driver pods will be running for the same SparkApplication; that is something k8s promises. The point is that you added the check for Running, which is already deterministic, so you may be able to remove some code, as there will never be more than one Running pod with the same labels for the same SparkApplication. Maybe I am missing something, but this seems to be the better and less error-prone solution.

I agree that under normal operation, there should be at most one active driver pod at a time.

But the reason for keeping the fallback ordering is that this code operates on the Kubernetes API response rather than the SparkApplication controller’s intent. In abnormal states (e.g. termination delays or stale API observations), multiple pods can still be returned, and phase alone might not be enough to pick the correct pod. That being said, if you feel the additional ordering is unnecessary, I can remove it.

Well, I just think that because this call does not happen a lot, we can take the hit of a full quorum request by not setting the resource version, which forces a full quorum read on Kubernetes and guarantees no stale data. Making it a full quorum read will not be too heavy, as this operation rarely happens (only when a worker is killed for some reason, which is not the happy flow of a worker).
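Sketch of that suggestion: leave the resource version unset on the list call so the apiserver performs a quorum (most recent) read instead of serving a potentially stale cached list (namespace and labels are illustrative):

```python
# Assumes v1 = kubernetes.client.CoreV1Api(), as in the earlier sketches.
pods = v1.list_namespaced_pod(
    namespace="spark",  # hypothetical
    label_selector="spark-role=driver,dag_id=my_dag,task_id=my_task",  # hypothetical
    # resource_version deliberately not set -> consistent quorum read from etcd
).items
```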

Contributor

@jscheffl jscheffl left a comment


I think it is a valid corner case and the fix helps in such situations.

What I am thinking of (maybe as a follow-up PR?): log the other pods, and ... who would clean up the other pods? Might automatic duplicate cleaning be meaningful?

@jscheffl jscheffl merged commit aee8964 into apache:main Jan 26, 2026
104 checks passed
shreyas-dev pushed a commit to shreyas-dev/airflow that referenced this pull request Jan 29, 2026
…d unit tests. (apache#60717)

Co-authored-by: Sameer Mesiah <smesiah971@gmail.com>