-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Replace State by TaskInstanceState in Airflow executors #32627
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
uranusjr
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I’ve been avoiding these modules since KubernetesExecutor currently pass the pod status (which is a raw string) directly into _change_state and reuse it as ti state. Preventing merge until it is resolved.
I planned to tackle this later, but feel free to give it a try if you have time before I do.
I will try to do that |
…ubernetesWatchType to TaskInstanceState
|
@uranusjr could you check it now? I'm not sure about this change in base executor: - def change_state(self, key: TaskInstanceKey, state: str, info=None) -> None:
+ def change_state(self, key: TaskInstanceKey, state: TaskInstanceState, info=None) -> None:Is it a breaking change? If so, I can change the signature in the base executor (only base executor) to make it
|
uranusjr
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be good. The TaskInstanceState is downcasted to str when it goes into self.event_buffer, which is unfortunate, but at least this tightens the signature in the public interface.
|
I added a commit to fix one missed |
eladkal
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this PR backward compatible?
Once we have future release of celery/k8s provider would it work with base executor before and after this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this PR backward compatible?
Once we have future release of celery/k8s provider would it work with base executor before and after this change?
I think it's a very valid point. It's not IMHO. We've already releeased executors in providers, and at the moment we did - the cat is out of the bag and we should be very careful on changing the API, This is one of the consequences of extracting out things. Executor's base_executor is officially part of the Publlic Interface, so we should keep compatibility.
Not sure how we could implement such change in backwards-compatible way.
Are you sure this backwards compatible @uranusjr ? the This would fail after that change. The public API has changed. The After seeing how easy it is to miss the change I am going in fact to implement similar protection for the public API as we have for common.sql - so that whenever public API changes, we have a failure of CI. But in the meantime - I am not sure we have an easy way to solve that, other than keep |
|
It is since >>> from airflow.utils.state import State, TaskInstanceState
>>> State.FAILED == "failed"
True
>>> TaskInstanceState.FAILED == "failed"
True
>>> TaskInstanceState.FAILED == State.FAILED
True |
Yeah. You are right. Sorry. Indeed. |
potiuk
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My bad, yes. It should be back-compatible.
|
cool so lets merge |
|
Actually when we merge it to 2.7.0, it's even better - even if the interface changes a bit (being compatble as @uranusjr laid out) - the 2.7.0 is the first relase where executors are the ones that are "really" decoupled. cc: @o-nikolas - you also might want to have a look and see if you see any dangers here. |
@potiuk The above convo checks out to me, retro approved 😄 |
* Replace State by TaskInstanceState in Airflow executors * chaneg state type in change_state method, KubernetesResultsType and KubernetesWatchType to TaskInstanceState * Fix change_state annotation in CeleryExecutor --------- Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com> (cherry picked from commit 9556d6d)
Use
TaskInstanceStateinstead ofStatein Airflow executors when we want to access the state enum value for TI.^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rstor{issue_number}.significant.rst, in newsfragments.