@dheerajturaga
Member

The SerializedTaskGroup.topological_sort() method had a critical bug in its
topological sorting algorithm. After checking if an upstream dependency's parent
task group was still in the unsorted graph, the code failed to verify whether
such a parent was found before proceeding. This caused the else clause to execute
even when nodes had unresolved parent task group dependencies, resulting in tasks
being sorted out of dependency order in the grid view.

The fix adds the missing logic:

  1. Check if a parent task group dependency exists (if tg:) and break if found
  2. Track progress with an acyclic flag to detect cycles or stuck states
  3. Break the loop if no nodes are resolved in an iteration
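The three steps above can be sketched on a toy model. This is a minimal illustration, not the Airflow implementation: `Node`, its `parent` and `upstream` fields, and the standalone `topological_sort` function are hypothetical stand-ins, and raising `ValueError` when no progress is made is an assumption of this sketch (the description above only says the loop is broken).

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass(eq=False)
class Node:
    """Hypothetical stand-in for a task or task group (not an Airflow class)."""

    node_id: str
    parent: Optional["Node"] = None  # enclosing task group, if any
    upstream: list["Node"] = field(default_factory=list)


def topological_sort(children: dict[str, Node]) -> list[Node]:
    """Sort the direct children of one group so dependencies come first."""
    unsorted = dict(children)
    result: list[Node] = []
    while unsorted:
        acyclic = False  # step 2: did this pass resolve at least one node?
        for node in list(unsorted.values()):
            for edge in node.upstream:
                if edge.node_id in unsorted:
                    break  # direct dependency not sorted yet
                # Walk up the dependency's enclosing groups.
                tg = edge.parent
                while tg:
                    if tg.node_id in unsorted:
                        break
                    tg = tg.parent
                if tg:
                    break  # step 1: an enclosing group is still unsorted
            else:
                # Every upstream edge is resolved, so the node can be emitted.
                acyclic = True
                del unsorted[node.node_id]
                result.append(node)
        if not acyclic:
            # step 3: nothing resolved in a full pass (assumed handling).
            raise ValueError("cycle or unresolved dependency detected")
    return result


# "downstream" depends on a task nested inside "group", so only the
# parent-group check keeps it from being emitted before "group".
start = Node("start")
group = Node("group", upstream=[start])
inner = Node("group.t", parent=group)
downstream = Node("downstream", upstream=[inner])

order = [n.node_id for n in topological_sort(
    {"downstream": downstream, "group": group, "start": start}
)]
print(order)  # ['start', 'group', 'downstream']
```

Without the `if tg: break` check, `downstream` would fall through to the `else` branch on the first pass, because `group.t` itself is never a key in the unsorted graph.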

Also added the missing hierarchical_alphabetical_sort() method to support the
alternative grid_view_sorting_order configuration option.
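As a rough illustration of what a hierarchical alphabetical ordering could look like, here is a minimal sketch. It assumes that task groups are listed before tasks and that each kind is sorted by id; that ordering rule, the `Item` class, and its `is_group` flag are assumptions of this example and are not confirmed by the description above.

```python
from dataclasses import dataclass


@dataclass
class Item:
    """Hypothetical child of a task group (not an Airflow class)."""

    node_id: str
    is_group: bool = False


def hierarchical_alphabetical_sort(children: list[Item]) -> list[Item]:
    # False sorts before True, so groups (not is_group == False) come first;
    # ties are broken alphabetically by node_id.
    return sorted(children, key=lambda n: (not n.is_group, n.node_id))


items = [Item("b"), Item("zgroup", True), Item("a"), Item("agroup", True)]
sorted_ids = [n.node_id for n in hierarchical_alphabetical_sort(items)]
print(sorted_ids)  # ['agroup', 'zgroup', 'a', 'b']
```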

This ensures tasks are displayed in the correct dependency order in the grid view,
matching how they are executed.

Before:

[Screenshots: tg_sort_graph, tg_sort_grid_view_bad]

After:

[Screenshot]

@dheerajturaga
Member Author

@ashb, I'm not sure whether the CI check failures are related to my change...

@ashb
Member

ashb commented Oct 2, 2025

Did this change in 3.1?

The test failures look unrelated -- try rebasing to see if that fixes it

@dheerajturaga
Member Author

> Did this change in 3.1?
>
> The test failures look unrelated -- try rebasing to see if that fixes it

It looks like the task ordering changed between 3.0.6 and 3.1.
Even the unit tests seem to have been updated to the incorrect task order, so I had to fix the unit tests as well.

Regarding the test failures: I have already rebased, so I suspect CI is broken at the moment.

@dheerajturaga
Member Author

issue is stemming from #55169
cc: @uranusjr

@kaxil
Member

kaxil commented Oct 2, 2025

> issue is stemming from #55169
>
> cc: @uranusjr

#55169 (comment) bit us?

@dheerajturaga dheerajturaga force-pushed the bugfix/grid-view-task-ordering branch from 0befe88 to 0263a87 Compare October 2, 2025 14:06
@ashb ashb added this to the Airflow 3.1.1 milestone Oct 3, 2025
@ashb ashb added kind:bug This is a clearly a bug type:bug-fix Changelog: Bug Fixes labels Oct 3, 2025
…tion

  The SerializedTaskGroup.topological_sort() method had critical bugs that caused
  tasks to display in incorrect order in the grid view:

  1. Missing logic to check if parent task group dependencies exist before adding
     nodes to the sorted list, causing premature sorting of dependent tasks.

  2. Failed to handle task groups differently from tasks when checking upstream
     dependencies. Task groups use upstream_group_ids/upstream_task_ids attributes,
     while tasks use upstream_list. The original implementation only checked
     upstream_list, causing task groups to appear to have no dependencies.

  The fix:
  - Added missing hierarchical_alphabetical_sort() method to support the alternative
    grid_view_sorting_order configuration option
  - Fixed topological_sort() to properly detect upstream dependencies for both tasks
    (via upstream_list) and task groups (via upstream_group_ids/upstream_task_ids)
  - Added check after the parent task group search loop to break if a dependency
    was found
  - Added acyclic flag tracking and handling for cycle/stuck states
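The difference between the two kinds of node can be sketched as a single helper that returns upstream ids for either one. The attribute names `upstream_list`, `upstream_group_ids`, and `upstream_task_ids` come from the description above; the `FakeTask` and `FakeGroup` classes and the `upstream_ids` helper are hypothetical and only stand in for the real objects.

```python
from dataclasses import dataclass, field


@dataclass(eq=False)
class FakeTask:
    """Hypothetical task: dependencies are held as objects in upstream_list."""

    node_id: str
    upstream_list: list = field(default_factory=list)


@dataclass(eq=False)
class FakeGroup:
    """Hypothetical task group: dependencies are held as two sets of ids."""

    node_id: str
    upstream_group_ids: set = field(default_factory=set)
    upstream_task_ids: set = field(default_factory=set)


def upstream_ids(node) -> set:
    """Return the ids a node depends on, whichever kind of node it is."""
    if isinstance(node, FakeGroup):
        # Groups expose ids directly; reading only upstream_list would
        # make them look as if they had no dependencies at all.
        return set(node.upstream_group_ids) | set(node.upstream_task_ids)
    return {up.node_id for up in node.upstream_list}


extract = FakeTask("extract")
load = FakeTask("load", upstream_list=[extract])
transform = FakeGroup(
    "transform",
    upstream_group_ids={"prepare"},
    upstream_task_ids={"extract"},
)

task_deps = upstream_ids(load)
group_deps = upstream_ids(transform)
print(sorted(task_deps))   # ['extract']
print(sorted(group_deps))  # ['extract', 'prepare']
```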

  Updated unit tests to reflect correct topological ordering where tasks appear
  after their dependencies rather than in arbitrary order.
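A test for this property can check the ordering directly instead of hard-coding one expected list. The helper below is a minimal sketch; `is_topological_order` and the shape of its `upstream` mapping are assumptions of this example, and the node ids are borrowed from the test DAG shown later in this thread.

```python
def is_topological_order(order: list, upstream: dict) -> bool:
    """Return True if every node appears after all of its listed dependencies."""
    position = {node_id: index for index, node_id in enumerate(order)}
    return all(
        position[dep] < position[node_id]
        for node_id in order
        for dep in upstream.get(node_id, ())
        if dep in position  # ignore dependencies outside this ordering
    )


# Maps each node id to the ids it depends on.
deps = {
    "process_number": {"get_nums"},
    "log_success": {"process_number"},
}

good = is_topological_order(["get_nums", "process_number", "log_success"], deps)
bad = is_topological_order(["log_success", "get_nums", "process_number"], deps)
print(good, bad)  # True False
```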

  This ensures tasks are displayed in correct dependency order in the grid view,
  matching how they are executed.
@dheerajturaga dheerajturaga force-pushed the bugfix/grid-view-task-ordering branch from 0263a87 to a99c496 Compare October 5, 2025 21:22
Member

@potiuk potiuk left a comment

LGTM

@kaxil kaxil requested a review from uranusjr October 13, 2025 19:35
Member

@pierrejeambrun pierrejeambrun left a comment

As mentioned in the other PR, I think #56963 should be favored: its implementation is simpler and more consistent with the SDK counterpart.

@kaxil kaxil closed this in #56963 Oct 21, 2025
github-actions bot pushed a commit that referenced this pull request Oct 21, 2025
Fixes: #55899
Closes #56321

Similarly to https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/definitions/taskgroup.py#L561, we need an exit condition if the task group is found in the unsorted graph.

Adjusted the tests, which indeed were not in the correct topological order.

Testing dag code:
```python
from __future__ import annotations

import datetime

import pendulum

from airflow.sdk import dag, task, task_group

@task
def get_nums() -> list[int]:
    return [1, 2, 4]

@task
def times_2(n: int) -> int:
    return n * 2

@task_group(group_id="process_number")
def process_number(n: int):
    value = times_2(n)
    return value

@task
def log_success() -> None:
    print("Processed successful!")

@dag(
    schedule=None,
    catchup=False,
    start_date=pendulum.datetime(2025, 4, 1, tz="Europe/Copenhagen"),
    dagrun_timeout=datetime.timedelta(minutes=30),
    dag_id="55899_bug",
)
def test():
    nums = get_nums()
    processed = process_number.expand(n=nums)
    processed >> log_success()

test()
```

### Before
<img width="1917" height="1016" alt="Screenshot 2025-10-21 at 17 57 20" src="https://github.com/user-attachments/assets/d5220b87-a23b-40f7-8ecf-cb1b39d72f53" />

### After
<img width="1923" height="937" alt="Screenshot 2025-10-21 at 17 56 57" src="https://github.com/user-attachments/assets/37f75b19-2e80-4765-b9cb-e425f9054b78" />
(cherry picked from commit c3f53b1)

Co-authored-by: Pierre Jeambrun <pierrejbrun@gmail.com>
kaxil pushed a commit to astronomer/airflow that referenced this pull request Oct 21, 2025
kaxil pushed a commit that referenced this pull request Oct 21, 2025
@kaxil kaxil removed this from the Airflow 3.1.1 milestone Oct 21, 2025
Labels

area:serialization kind:bug This is a clearly a bug type:bug-fix Changelog: Bug Fixes
