Conversation

@johnhoran
Contributor


closes #41867

This fixes a bug in the Kubernetes pod operator that occurs when the task wakes up from a deferred state to collect logs. If the pod completes during that window, the operator won't defer again, but it also doesn't evaluate the reason for the completion, so if the pod ends in an error it will still be marked as successful.

The fix here is to always defer again when the task resumes while the pod is in a running state, so that the next resume notices the pod exit and acts accordingly. Alternatively, we could check the pod exit state at that point and skip the deferral; I'm happy to amend the PR if that is preferred.
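
For illustration, a minimal sketch of the "always defer while running" option (the helper names _handle_pod_failure and _handle_pod_success, and the exact event fields, are assumptions made for the sketch, not the operator's actual API):

    # Sketch only: on resume from the logging-interval deferral, never conclude
    # success or failure from a possibly stale "running" event; defer again so
    # that the next resume receives the real terminal state from the trigger.
    def trigger_reentry(self, context, event):
        if event["status"] == "running":
            self.invoke_defer_method(event.get("last_log_time"))
        elif event["status"] in ("error", "failed", "timeout"):
            self._handle_pod_failure(event, context)  # hypothetical helper
        else:
            self._handle_pod_success(event, context)  # hypothetical helper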

@boring-cyborg boring-cyborg bot added the area:providers and provider:cncf-kubernetes (Kubernetes (k8s) provider related issues) labels Aug 29, 2024
@boring-cyborg

boring-cyborg bot commented Aug 29, 2024

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature, add useful documentation (in docstrings or in the docs/ directory). Adding a new operator? Check this short guide. Consider adding an example DAG that shows how users should use it.
  • Consider using the Breeze environment for testing locally; it's a heavy Docker setup, but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.

Apache Airflow is a community-driven project and together we are making it better 🚀.
In case of doubt, contact the developers at:
Mailing List: dev@airflow.apache.org
Slack: https://s.apache.org/airflow-slack

@kaxil kaxil requested a review from dstandish August 29, 2024 20:15
@dstandish
Contributor

Yeah, I think it's best to avoid the unnecessary defer/resume cycle. It seems you could refactor the logic under the if event["status"] in ("error", "failed", "timeout") block into a method and invoke it in the case you describe, or something like that. Either way, it's wasteful to go through another defer/resume cycle.
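
Roughly what that could look like (the method name _handle_terminal_event is made up for the sketch; the real names and surrounding structure may differ):

    # Illustrative refactor: pull the body of the existing
    # `if event["status"] in ("error", "failed", "timeout")` branch into a method...
    def _handle_terminal_event(self, event: dict, context) -> None:
        # The current error/failed/timeout handling moves here unchanged
        # (raise the failure with the event's message, run callbacks, etc.).
        ...

    # ...then, when the resumed task finds the pod already finished, call
    # self._handle_terminal_event(event, context) directly instead of paying
    # for one more defer/resume round-trip.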

@johnhoran
Contributor Author

@dstandish thanks, I've refactored the code slightly so that it runs the success or failure section when the pod has completed, without deferring again.

@johnhoran johnhoran changed the title from "Kubernetes Pod Operator: always invoke defer if status was running" to "Kubernetes Pod Operator: check pod completion reason if completed during deferred logging interval" Aug 30, 2024
@RNHTTR
Contributor

RNHTTR commented Aug 30, 2024

@johnhoran Can you please fix the failing tests?

@johnhoran
Contributor Author

> Can you please fix the failing tests?

@RNHTTR all resolved now, I've also added a test case which would have failed if the failure status wasn't handled.

Comment on lines 391 to 394
  self.skip_on_exit_code = (
      skip_on_exit_code
      if isinstance(skip_on_exit_code, Container)
-     else [skip_on_exit_code]
-     if skip_on_exit_code is not None
-     else []
+     else [skip_on_exit_code] if skip_on_exit_code is not None else []
Contributor

I would like to suggest this instead:

    self.skip_on_exit_code = None
    if isinstance(skip_on_exit_code, Container):
        self.skip_on_exit_code = skip_on_exit_code
    elif skip_on_exit_code:
        self.skip_on_exit_code = [skip_on_exit_code]

why:
The multiple if/elses in the comprehension are hard to follow. And I think it's nice to use None when there's no value (instead of an empty list); I looked at where this is used, and it seems it should work the same.
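
(For context, the downstream check is roughly of this shape, so None and an empty list behave the same here; this is a paraphrase, not the exact provider code:)

    # Paraphrased usage: with either None or [], the `in` test is only reached
    # when the user actually supplied exit codes to skip on.
    if self.skip_on_exit_code and exit_code in self.skip_on_exit_code:
        raise AirflowSkipException(f"Pod exited with code {exit_code}; skipping task.")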

Contributor Author

I've amended it. FYI this was just a change from running black.

    self.log.info("Container still running; deferring again.")
    self.invoke_defer_method(pod_log_status.last_log_time)
else:
    event = event.copy()
Contributor

I don't really like the idea that we're changing the event. The event is what came from the trigger; there's not really a need to mess with it. Yes, we have checked whether the pod status has changed, but we don't need to use the event to communicate this. The event can be left alone.

Contributor

OK, so here's a suggestion.

What we're dealing with is the situation where the pod happens to complete after the trigger defers and before the task finishes.

This is a somewhat less common scenario. So, rather than change the PodLoggingStatus object and modify the event object, just read the pod again and handle it accordingly.

E.g. add a pod read here and evaluate success:

If the pod succeeded, do the success logic; if the pod failed, do the failure logic. I guess you don't even need to pull the success/failure logic out into methods; you just need to move it down. You now have two ways to discern success or failure: usually it will come from the event, but if the event data is stale it will come from the pod read.

Contributor

@dstandish dstandish Sep 3, 2024

E.g. something like:

succeeded = False
if event == running blah:
    ... other logic
    if pod_log_status.running:
        self.log.info("Container still running; deferring again.")
        self.invoke_defer_method(pod_log_status.last_log_time)
    else:  # now we know pod status is different; let's read the pod again.
        pod = self.read_pod(...)
        succeeded = container_is_succeeded(...)  # <-- bam now you know to do failure logic even though event is success

Contributor Author

Agreed, I've extracted pod_status from the event so the event doesn't get modified.

grab the latest logs and defer back to the trigger again.
"""
self.pod = None
pod_status = event["status"]
Contributor

OK, it's subtle, but you are still conflating the event "status" with the pod status. I recommend keeping them separate: you can either consider each independently (e.g. if pod_status is this, elif event_status is this, etc.) or you can add a new variable that essentially aggregates the pod status and event status. You can see why this matters if you look at line 787: there's no pod status "timeout", so this could be confusing.
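
A rough sketch of keeping the two separate (the Kubernetes phase names are real; remote_pod and the branch bodies are placeholders):

    # Sketch: evaluate the trigger event and a fresh pod read independently.
    event_status = event["status"]         # "running", "success", "error", "failed", "timeout"
    pod_phase = remote_pod.status.phase    # "Pending", "Running", "Succeeded", "Failed", "Unknown"

    if event_status in ("error", "failed", "timeout") or pod_phase == "Failed":
        ...  # failure handling
    elif event_status == "success" or pod_phase == "Succeeded":
        ...  # success handling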

Contributor Author

Okay, I've kept them entirely separate.

@eladkal eladkal requested a review from dstandish October 3, 2024 16:17
@johnhoran
Contributor Author

Looks like this was resolved by #42815

@johnhoran johnhoran closed this Oct 18, 2024

Labels

area:providers, provider:cncf-kubernetes (Kubernetes (k8s) provider related issues)

Development

Successfully merging this pull request may close these issues.

Kubernetes Pod Operator: race condition when using deferrable and logging_interval

3 participants