@hussein-awala
Member

Use TaskInstanceState instead of State in Airflow executors when we want to access the state enum value for TI.



@boring-cyborg bot added the provider:cncf-kubernetes (Kubernetes (k8s) provider related issues) and area:Scheduler (including HA (high availability) scheduler) labels on Jul 15, 2023
@hussein-awala added the type:misc/internal (Changelog: Misc changes that should appear in change log) label on Jul 15, 2023
@hussein-awala added this to the Airflow 2.6.4 milestone on Jul 15, 2023
Member

@uranusjr uranusjr left a comment

I’ve been avoiding these modules since KubernetesExecutor currently passes the pod status (which is a raw string) directly into _change_state and reuses it as the TI state. Preventing merge until this is resolved.

I planned to tackle this later, but feel free to give it a try if you have time before I do.

@potiuk potiuk requested a review from uranusjr July 17, 2023 18:09
@hussein-awala
Member Author

> I’ve been avoiding these modules since KubernetesExecutor currently passes the pod status (which is a raw string) directly into _change_state and reuses it as the TI state. Preventing merge until this is resolved.
>
> I planned to tackle this later, but feel free to give it a try if you have time before I do.

I will try to do that

@hussein-awala
Member Author

@uranusjr could you check it now?

I'm not sure about this change in base executor:

-    def change_state(self, key: TaskInstanceKey, state: str, info=None) -> None:
+    def change_state(self, key: TaskInstanceKey, state: TaskInstanceState, info=None) -> None:

Is it a breaking change? If so, I can change the signature in the base executor (only base executor) to make it TaskInstanceState | str, and convert it to TaskInstanceState if it's str. WDYT?

The executor interface itself (the BaseExecutor class) is public, but the built-in executors are not (e.g. KubernetesExecutor, LocalExecutor, etc.).
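The fallback proposed above (accept `TaskInstanceState | str` and convert strings) could be sketched roughly as follows. This is only an illustration under stated assumptions: the enum is a small stand-in for `airflow.utils.state.TaskInstanceState`, and `SketchExecutor` and its plain string keys are hypothetical, not the real BaseExecutor.

```python
from __future__ import annotations

from enum import Enum


class TaskInstanceState(str, Enum):
    """Stand-in for airflow.utils.state.TaskInstanceState (subset of members)."""

    SUCCESS = "success"
    FAILED = "failed"
    QUEUED = "queued"


class SketchExecutor:
    """Hypothetical executor; only the state-normalizing part is shown."""

    def __init__(self) -> None:
        self.event_buffer: dict[str, tuple[TaskInstanceState, object]] = {}

    def change_state(self, key: str, state: TaskInstanceState | str, info=None) -> None:
        # Enum lookup by value: an enum member passes through unchanged, a raw
        # string such as "failed" maps to the matching member, and an unknown
        # string raises ValueError.
        normalized = TaskInstanceState(state)
        self.event_buffer[key] = (normalized, info)
```

With this shape, callers that still pass raw strings keep working, while an unexpected value (for example a Kubernetes pod phase that is not a TI state) fails loudly instead of being stored.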

Member

@uranusjr uranusjr left a comment

This should be good. The TaskInstanceState is downcast to str when it goes into self.event_buffer, which is unfortunate, but at least this tightens the signature in the public interface.

@uranusjr
Member

uranusjr commented Aug 8, 2023

I added a commit to fix one missed change_state signature change.

Contributor

@eladkal eladkal left a comment

Is this PR backward compatible?

Once we have a future release of the celery/k8s providers, would it work with the base executor both before and after this change?

Member

@potiuk potiuk left a comment

> Is this PR backward compatible?
>
> Once we have a future release of the celery/k8s providers, would it work with the base executor both before and after this change?

I think it's a very valid point. It's not, IMHO. We've already released executors in providers, and the moment we did, the cat was out of the bag, so we should be very careful about changing the API. This is one of the consequences of extracting things out. The executor's base_executor is officially part of the Public Interface, so we should keep compatibility.

Not sure how we could implement such a change in a backwards-compatible way.

@potiuk
Member

potiuk commented Aug 8, 2023

> This should be good. The TaskInstanceState is downcast to str when it goes into self.event_buffer, which is unfortunate, but at least this tightens the signature in the public interface.

Are you sure this is backwards compatible, @uranusjr? The signature change to change_state() and friends is particularly worrying. Imagine someone writing an executor whose change_state(key, state) does this:

    if state == State.FAILED:
        do_some_processing()

This would fail after that change. The public API has changed. change_state is a public API of BaseExecutor, and it is supposed to be overridden by anyone implementing a custom executor. In fact, even our community executors override it (by chance they do not use the state parameter, but they could have, and other custom executors could have used it too).

After seeing how easy it is to miss the change, I am in fact going to implement protection for the public API similar to what we have for common.sql, so that CI fails whenever the public API changes. But in the meantime, I am not sure we have an easy way to solve that, other than keeping change_state on the State enum and maybe just converting all the TaskInstanceState.* to State.* in the code.
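One way such a CI guard could work, shown purely as a sketch: record the public method signatures in a snapshot and fail when the live signature drifts. The class below is a stand-in for the base executor, `EXPECTED_SIGNATURES` and `changed_public_signatures` are hypothetical names, and this is not necessarily the mechanism used for common.sql.

```python
import inspect


class BaseExecutor:
    """Stand-in for the public base class; not the real Airflow implementation."""

    def change_state(self, key, state, info=None) -> None:
        pass


# Hypothetical snapshot of the public signatures, kept under version control.
EXPECTED_SIGNATURES = {
    "change_state": "(self, key, state, info=None) -> None",
}


def changed_public_signatures(cls, expected):
    """Return the names of methods whose current signature differs from the snapshot."""
    changed = []
    for name, recorded in expected.items():
        current = str(inspect.signature(getattr(cls, name)))
        if current != recorded:
            changed.append(name)
    return changed
```

A CI test would then assert that `changed_public_signatures(BaseExecutor, EXPECTED_SIGNATURES)` is empty, so any signature change has to be accompanied by a deliberate snapshot update.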

@uranusjr
Member

uranusjr commented Aug 8, 2023

It is, since State.* values are identical to TaskInstanceState.* values, and they all automatically downcast to str, so equality comparisons work automatically. I assume those enums were designed specifically with this compatibility consideration in mind.

>>> from airflow.utils.state import State, TaskInstanceState
>>> State.FAILED == "failed"
True
>>> TaskInstanceState.FAILED == "failed"
True
>>> TaskInstanceState.FAILED == State.FAILED
True
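The same point can be shown without Airflow installed. In the sketch below, the enum and the `State` class are stand-ins (assuming `State` attributes simply alias the enum members), and `LegacyCustomExecutor` is a hypothetical third-party executor written against the old `state: str` signature.

```python
from enum import Enum


class TaskInstanceState(str, Enum):
    """Stand-in for airflow.utils.state.TaskInstanceState (subset of members)."""

    FAILED = "failed"
    SUCCESS = "success"


class State:
    """Stand-in for airflow.utils.state.State; assumed to alias the enum members."""

    FAILED = TaskInstanceState.FAILED
    SUCCESS = TaskInstanceState.SUCCESS


class LegacyCustomExecutor:
    """Hypothetical custom executor that compares the state argument to State.FAILED."""

    def __init__(self):
        self.failed_keys = []

    def change_state(self, key, state, info=None):
        # Works for raw strings and enum members alike, because the enum
        # mixes in str and therefore compares equal to its string value.
        if state == State.FAILED:
            self.failed_keys.append(key)


executor = LegacyCustomExecutor()
executor.change_state("old-style", "failed")
executor.change_state("new-style", TaskInstanceState.FAILED)
executor.change_state("ok", TaskInstanceState.SUCCESS)
print(executor.failed_keys)  # ['old-style', 'new-style']
```

Only equality checks are covered here; code that relies on identity (`is`) or on the exact type of the argument could still behave differently.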

@potiuk
Member

potiuk commented Aug 8, 2023

> It is, since State.* values are identical to TaskInstanceState.* values, and they all automatically downcast to str, so equality comparisons work automatically. I assume those enums were designed specifically with this compatibility consideration in mind.
>
> >>> from airflow.utils.state import State, TaskInstanceState
> >>> State.FAILED == "failed"
> True
> >>> TaskInstanceState.FAILED == "failed"
> True
> >>> TaskInstanceState.FAILED == State.FAILED
> True

Yeah. You are right. Sorry. Indeed.

Member

@potiuk potiuk left a comment

My bad, yes. It should be back-compatible.

@eladkal
Contributor

eladkal commented Aug 8, 2023

cool so lets merge

@eladkal eladkal merged commit 9556d6d into apache:main Aug 8, 2023
@potiuk
Member

potiuk commented Aug 8, 2023

Actually, merging it into 2.7.0 is even better: even if the interface changes a bit (while staying compatible, as @uranusjr laid out), 2.7.0 is the first release where executors are "really" decoupled. cc: @o-nikolas - you also might want to have a look and see if you see any dangers here.

@o-nikolas
Contributor

> cc: @o-nikolas - you also might want to have a look and see if you see any dangers here.

@potiuk The above convo checks out to me, retro approved 😄

ephraimbuddy pushed a commit that referenced this pull request Aug 8, 2023
* Replace State by TaskInstanceState in Airflow executors

* Change state type in change_state method, KubernetesResultsType and KubernetesWatchType to TaskInstanceState

* Fix change_state annotation in CeleryExecutor

---------

Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
(cherry picked from commit 9556d6d)