Skip to content

Conversation

@pierrejeambrun
Copy link
Member

@pierrejeambrun pierrejeambrun commented Oct 21, 2025

Fixes: #55899
Closes #56321

Similarly to https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/definitions/taskgroup.py#L561, we need an exit condition if the taskgroup is found in usorted graph.

Adjusted test, which indeed were not in the correct topological order.

Testing dag code:

from __future__ import annotations

import datetime

import pendulum

from airflow.sdk import dag, task, task_group


@task
def get_nums() -> list[int]:
    return [1, 2, 4]


@task
def times_2(n: int) -> int:
    return n * 2


@task_group(group_id="process_number")
def process_number(n: int):
    value = times_2(n)
    return value


@task
def log_success() -> None:
    print("Processed successful!")


@dag(
    schedule=None,
    catchup=False,
    start_date=pendulum.datetime(2025, 4, 1, tz="Europe/Copenhagen"),
    dagrun_timeout=datetime.timedelta(minutes=30),
    dag_id="55899_bug",
)
def test():
    nums = get_nums()
    processed = process_number.expand(n=nums)
    processed >> log_success()


test()

Before

Screenshot 2025-10-21 at 17 57 20

After

Screenshot 2025-10-21 at 17 56 57

@pierrejeambrun pierrejeambrun added this to the Airflow 3.1.1 milestone Oct 21, 2025
@pierrejeambrun pierrejeambrun self-assigned this Oct 21, 2025
@pierrejeambrun pierrejeambrun added the backport-to-v3-1-test Mark PR with this label to backport to v3-1-test branch label Oct 21, 2025
@dheerajturaga
Copy link
Member

I tried to fix this in #56321
however seems like your implementation is simpler. let me know if I need to close mine

@pierrejeambrun
Copy link
Member Author

pierrejeambrun commented Oct 21, 2025

Yes, I Think this implementation is simpler and should be favored, also it is more in lined with the sdk implementation

@pierrejeambrun
Copy link
Member Author

Cc: @kaxil

@dheerajturaga
Copy link
Member

Yes, I Think this implementation is simpler and should be favored, also it is more in lined with the sdk implementation

Agreed! Thanks for fixing this!

@kaxil kaxil merged commit c3f53b1 into apache:main Oct 21, 2025
114 checks passed
@kaxil kaxil deleted the fix-topological-sort-odering branch October 21, 2025 17:27
github-actions bot pushed a commit that referenced this pull request Oct 21, 2025
Fixes: #55899
Closes #56321

Similarly to https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/definitions/taskgroup.py#L561, we need an exit condition if the taskgroup is found in usorted graph.

Adjusted test, which indeed were not in the correct topological order.

Testing dag code:
```python
from __future__ import annotations

import datetime

import pendulum

from airflow.sdk import dag, task, task_group

@task
def get_nums() -> list[int]:
    return [1, 2, 4]

@task
def times_2(n: int) -> int:
    return n * 2

@task_group(group_id="process_number")
def process_number(n: int):
    value = times_2(n)
    return value

@task
def log_success() -> None:
    print("Processed successful!")

@dag(
    schedule=None,
    catchup=False,
    start_date=pendulum.datetime(2025, 4, 1, tz="Europe/Copenhagen"),
    dagrun_timeout=datetime.timedelta(minutes=30),
    dag_id="55899_bug",
)
def test():
    nums = get_nums()
    processed = process_number.expand(n=nums)
    processed >> log_success()

test()
```

### Before
<img width="1917" height="1016" alt="Screenshot 2025-10-21 at 17 57 20" src="https://github.com/user-attachments/assets/d5220b87-a23b-40f7-8ecf-cb1b39d72f53" />

### After
<img width="1923" height="937" alt="Screenshot 2025-10-21 at 17 56 57" src="https://github.com/user-attachments/assets/37f75b19-2e80-4765-b9cb-e425f9054b78" />
(cherry picked from commit c3f53b1)

Co-authored-by: Pierre Jeambrun <pierrejbrun@gmail.com>
@github-actions
Copy link

Backport successfully created: v3-1-test

Status Branch Result
v3-1-test PR Link

kaxil pushed a commit that referenced this pull request Oct 21, 2025
Fixes: #55899
Closes #56321

Similarly to https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/definitions/taskgroup.py#L561, we need an exit condition if the taskgroup is found in usorted graph.

Adjusted test, which indeed were not in the correct topological order.

Testing dag code:
```python
from __future__ import annotations

import datetime

import pendulum

from airflow.sdk import dag, task, task_group

@task
def get_nums() -> list[int]:
    return [1, 2, 4]

@task
def times_2(n: int) -> int:
    return n * 2

@task_group(group_id="process_number")
def process_number(n: int):
    value = times_2(n)
    return value

@task
def log_success() -> None:
    print("Processed successful!")

@dag(
    schedule=None,
    catchup=False,
    start_date=pendulum.datetime(2025, 4, 1, tz="Europe/Copenhagen"),
    dagrun_timeout=datetime.timedelta(minutes=30),
    dag_id="55899_bug",
)
def test():
    nums = get_nums()
    processed = process_number.expand(n=nums)
    processed >> log_success()

test()
```

### Before
<img width="1917" height="1016" alt="Screenshot 2025-10-21 at 17 57 20" src="https://github.com/user-attachments/assets/d5220b87-a23b-40f7-8ecf-cb1b39d72f53" />

### After
<img width="1923" height="937" alt="Screenshot 2025-10-21 at 17 56 57" src="https://github.com/user-attachments/assets/37f75b19-2e80-4765-b9cb-e425f9054b78" />

(cherry picked from commit c3f53b1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:serialization backport-to-v3-1-test Mark PR with this label to backport to v3-1-test branch

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Order tasks topologically in Grid view

3 participants