Description
Apache Airflow version
2.7.3
What happened
Hello folks,
I have a use case that no longer works since the release of version 2.7.3.
I investigated and found that it is related to this PR: after reverting it locally and running some tests, the DAG worked as expected.
Please find below how to reproduce the issue.
Cc @ephraimbuddy
Thanks.
What you think should happen instead
No response
How to reproduce
Below is the DAG code:
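    from datetime import datetime

    from airflow.decorators import dag, task, task_group
    from airflow.exceptions import AirflowSkipException


    # The decorator arguments are assumed; the snippet as posted omits them.
    @dag(start_date=datetime(2023, 1, 1), schedule=None, catchup=False)
    def skip_in_mapped_group():
        @task
        def make_list():
            return [4, 42, 2]

        @task
        def double(n):
            # Skip only the mapped instance that receives 42.
            if n == 42:
                raise AirflowSkipException("42")
            return n * 2

        @task
        def last(n):
            print(n)

        @task_group
        def group(n: int) -> None:
            last(double(n))

        values = make_list()
        group.expand(n=values)


    skip_in_mapped_group()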
Running this DAG on version 2.7.3 gives the result below:
As you can see, the status of the last task is skipped, and map indexes 0 and 1 were not launched.
You get the same issue with AirflowFailException instead of AirflowSkipException; in that case the status of the last task is failed.
On the other hand, if you run the same DAG on version 2.7.2 (or without the changes from the PR mentioned above), you get the result below, which is the correct one:
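To make the expected behaviour concrete, here is a plain-Python sketch (no Airflow involved) of the state I would expect `last` to reach for each map index. `Skipped` and `expected_last_states` are hypothetical names used only for illustration, and the states are my assumption of the correct per-index outcome, not output taken from the scheduler.

```python
class Skipped(Exception):
    """Stand-in for AirflowSkipException in this plain-Python model."""


def double(n):
    # Mirrors the `double` task: only the value 42 is skipped.
    if n == 42:
        raise Skipped("42")
    return n * 2


def expected_last_states(values):
    """Return the assumed state of `last` for each map index."""
    states = {}
    for map_index, n in enumerate(values):
        try:
            double(n)
        except Skipped:
            # Upstream was skipped for this index, so `last` is skipped too.
            states[map_index] = "skipped"
        else:
            # Upstream succeeded for this index, so `last` should run.
            states[map_index] = "success"
    return states


print(expected_last_states([4, 42, 2]))
```

In other words, a skip inside one mapped group instance should only affect that instance, and the other map indexes of `last` should still run.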
Operating System
linux
Versions of Apache Airflow Providers
No response
Deployment
Docker-Compose
Deployment details
Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct