Make Spark driver reattachment deterministic when multiple pods match #60717
Conversation
As in an Airflow scheduler interruption? I don't see how that could happen; a worker interruption seems more plausible, but even then I don't see when it would occur. Say the worker did exit for some reason: the pod is killed, yet the driver stays up until it finishes, and the task is marked as failed. If we have retries, won't it just create a new Spark application and update the try number label?
By Spark application, do you mean the CRD?
What if I do want the task to fail? Say I am truncating a table, which may not be an atomic operation, and I use Airflow expecting there to be no more than one run, so I want to be notified on failure. Is there a way to reproduce the issue stated above?
I admit this is a rare edge case. The scenario I am referring to is one where the scheduler gets OOM-killed, which prevents the task state from being updated. On scheduler restart, the task resumes from the same try number and looks for an existing driver pod whose label carries the exact same context variables as before the crash.
Yes. The SparkApplication is the owner of the driver pod. The
That is a valid point, but this is not about multiple task runs but about failsafe pod recovery in a very specific scenario. And if you want to fail the task under certain conditions, using pod discovery as a trigger for task success or failure is brittle, as there can be multiple pods per task. It would be better to use something more explicit, such as the exit code or return value of the operation. In your table truncation example, that would be the failure/success event fired by the DB. Also, under this new implementation, users will be notified when there are multiple pods; there just won't be a hard failure like before. I admit I may have downplayed this in the PR description. It is a significant behavior change, as multiple pods may lead to soft recovery instead of hard failure. But that is because this does not indicate anything critically wrong within Airflow, only the idiosyncrasies of how a SparkApplication handles pod reconciliation and creation.
It's possible but very difficult to do so deterministically. If you look at the existing condition, it appears to imply that multiple pods with the same label being picked up is a plausible scenario. This is what initially motivated me to produce this PR.
I agree. I think the approach needs to be modified so that it prioritizes running pods with
Requesting review for this.
How can a SparkApplication create another driver pod?
Have you implemented this? I did not see it. Overall the PR looks good; I will look at it soon, though I would appreciate it if you could answer my questions.
You are correct here. The pod may be recreated only if the driver pod fails. This still does not prevent two pods with the same label existing at the same time.
If the driver pod fails, why can't we check the status in the query instead of doing a fallback?
I agree that under normal operation there should be at most one active driver pod at a time. But the reason for keeping the fallback ordering is that this code operates on the Kubernetes API response rather than the SparkApplication controller's intent. In abnormal states (e.g. termination delays or stale API observations), multiple pods can still be returned, and phase alone might not be enough to pick the correct pod. That being said, if you feel the additional ordering is unnecessary, I can remove it.
Well, I just think that because this call does not happen often, we can take the hit of a full quorum request in k8s by not setting the resource version. Forcing a full quorum read guarantees no stale data, and it will not be too heavy because this operation rarely happens (only when a worker is killed for some reason, which is not the happy flow of a worker).
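To make the suggestion concrete, here is a minimal sketch, assuming the official kubernetes Python client; the namespace and label values are invented for illustration, not taken from the operator. It filters on pod phase in the query itself and forces a quorum read by leaving resource_version unset:

```python
from kubernetes import client, config

# Load in-cluster credentials (use load_kube_config() when running outside the cluster).
config.load_incluster_config()
v1 = client.CoreV1Api()

pods = v1.list_namespaced_pod(
    namespace="spark-jobs",  # hypothetical namespace
    # Hypothetical task labels; the real operator builds these from the task context.
    label_selector="dag_id=my_dag,task_id=my_task,try_number=1",
    # Do the status check server-side instead of filtering the response client-side.
    field_selector="status.phase=Running",
    # resource_version is deliberately not set: an unset value forces a quorum
    # read from etcd, so the result cannot come from a stale watch cache.
)
print([p.metadata.name for p in pods.items])
```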
jscheffl left a comment
I think it is a valid corner case and the fix helps in such situations.
What I am thinking of (maybe a follow-up PR?) is logging the other pods and ... who would clean up the other pods? Might automatic duplicate cleanup be meaningful?
…d unit tests. (apache#60717) Co-authored-by: Sameer Mesiah <smesiah971@gmail.com>
Description
This change makes find_spark_job deterministic when multiple Spark driver pods match the same task labels.
Previously, the operator raised an exception if more than one matching pod was found. Instead, the operator now deterministically selects a single pod for reattachment using a stable ordering strategy that prefers running pods and falls back to the most recent creation_timestamp.
When duplicate pods are detected, a warning is logged to preserve visibility into the unexpected state.
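As an illustration of this selection strategy, here is a minimal sketch; select_driver_pod is a hypothetical helper written against the kubernetes client's V1Pod objects, not the operator's actual code:

```python
import logging
from typing import Optional

from kubernetes.client import V1Pod

log = logging.getLogger(__name__)


def select_driver_pod(pods: list[V1Pod]) -> Optional[V1Pod]:
    """Deterministically pick one driver pod when several match the task labels."""
    if not pods:
        return None
    if len(pods) > 1:
        # Surface the unexpected state without failing the task.
        log.warning(
            "Found %d driver pods matching the task labels; reattaching to the most recent one.",
            len(pods),
        )
    # Prefer running pods; break ties with the newest creation_timestamp.
    return max(
        pods,
        key=lambda p: (p.status.phase == "Running", p.metadata.creation_timestamp),
    )
```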
Rationale
Although Spark driver pods are normally unique for a given task execution, multiple driver pods with identical labels can occur when the same task attempt is started again after an abrupt scheduler interruption.
In these cases, the original Spark driver pod is not cleaned up, and the SparkApplication may spawn a new driver pod that reuses the same labels. This results in multiple matching pods representing the same logical execution.
Failing fast in this scenario causes unnecessary task failures despite a recoverable state. Selecting a deterministic “most recent” pod allows Airflow to reattach to the correct driver while still surfacing the unexpected condition via warning logs.
Kubernetes does not guarantee label uniqueness (see the 'Labels and Selectors' section of the Kubernetes documentation). As a result, multiple pods matching the same label selector is a valid and documented Kubernetes state. The operator must therefore handle this case deterministically rather than failing with an exception.
Notes
The docstring of find_spark_job now describes the function's full behavior, including the handling of duplicate pods. The and self.reattach_on_restart condition has been removed, as it was misleading.
Tests
Added unit tests that verify find_spark_job selects the most recently created Spark driver pod when multiple pods share identical labels, using creation_timestamp for the ordering.
Backwards Compatibility
Previously, encountering multiple matching pods resulted in an AirflowException. This change replaces that failure with deterministic pod selection and a warning log.
The behavior when exactly one matching pod is found is unchanged. In the common case, find_spark_job continues to return the single matching Spark driver pod exactly as before.
This change does not introduce any public API or configuration changes.