
Cycle incorrectly detected in DAGs when using Labels within Task Groups #23285

@josh-fell


Apache Airflow version

2.3.0b1 (pre-release)

What happened

When a DAG contains a Task Group whose tasks are connected through Labels, the DAG fails to import because a cycle is detected, even though the DAG has no cycle.

Consider this DAG:

from pendulum import datetime

from airflow.decorators import dag, task, task_group
from airflow.utils.edgemodifier import Label


@task
def begin():
    ...


@task
def end():
    ...


@dag(start_date=datetime(2022, 1, 1), schedule_interval=None)
def task_groups_with_edge_labels():
    @task_group
    def group():
        begin() >> Label("label") >> end()

    group()


_ = task_groups_with_edge_labels()

When attempting to import the DAG, this error message is displayed:

[Screenshot of the DAG import error]

This also occurs on the main branch.

What you think should happen instead

Users should be able to specify Labels between tasks within a Task Group.
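The expected behavior can be stated as a small toy model. This is a hypothetical sketch, not Airflow's implementation: `ToyGraph`, `ToyTask`, and `ToyLabel` are illustrative names. It encodes the assumption that a Label only attaches text to the edge it sits on, so `A >> Label("label") >> B` should yield exactly the same dependency set as `A >> B`, and a cycle check should therefore see the same graph in both cases.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class ToyGraph:
    # task_id -> set of downstream task_ids (the dependency structure).
    downstream: dict[str, set[str]] = field(default_factory=dict)
    # (upstream_id, downstream_id) -> label text (metadata only).
    edge_labels: dict[tuple[str, str], str] = field(default_factory=dict)


class ToyTask:
    def __init__(self, graph: ToyGraph, task_id: str) -> None:
        self.graph = graph
        self.task_id = task_id
        graph.downstream.setdefault(task_id, set())

    def __rshift__(self, other):
        if isinstance(other, ToyLabel):
            # Remember the upstream side; the edge is created once the
            # downstream task is known.
            other.upstream.append(self)
            return other
        self.graph.downstream[self.task_id].add(other.task_id)
        return other


class ToyLabel:
    def __init__(self, text: str) -> None:
        self.text = text
        self.upstream: list[ToyTask] = []

    def __rshift__(self, other: ToyTask) -> ToyTask:
        for up in self.upstream:
            up >> other  # exactly the same edge as `up >> other` ...
            # ... plus a label stored beside it, not a new dependency.
            other.graph.edge_labels[(up.task_id, other.task_id)] = self.text
        return other


# With a Label between A and B.
with_label = ToyGraph()
a, b = ToyTask(with_label, "A"), ToyTask(with_label, "B")
a >> ToyLabel("label") >> b

# Without a Label.
without_label = ToyGraph()
c, d = ToyTask(without_label, "A"), ToyTask(without_label, "B")
c >> d

print(with_label.downstream == without_label.downstream)  # True
```

Under this model the two graphs differ only in `edge_labels`, which is why adding or removing a Label should not change the outcome of a cycle check.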

How to reproduce

  • Import the DAG above into an Airflow environment.

  • Or, add the following unit test to tests/utils/test_dag_cycle.py and run it:

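    def test_cycle_task_group_with_edge_labels(self):
        from airflow.models import DAG
        from airflow.operators.empty import EmptyOperator
        from airflow.utils.dag_cycle_tester import check_cycle
        from airflow.utils.edgemodifier import Label
        from airflow.utils.task_group import TaskGroup

        # DEFAULT_DATE is the module-level start date used by the other tests in this file.
        dag = DAG('dag', start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})

        with dag:
            with TaskGroup(group_id="task_group"):
                op1 = EmptyOperator(task_id='A')
                op2 = EmptyOperator(task_id='B')

                # The Label only annotates the A -> B edge, so no cycle should be reported.
                op1 >> Label("label") >> op2

        assert not check_cycle(dag)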

Instead, the test fails with an AirflowDagCycleException:

tests/utils/test_dag_cycle.py::TestCycleTester::test_cycle_task_group_with_edge_labels FAILED                                                                                                    [100%]

=============================================================================================== FAILURES ===============================================================================================
________________________________________________________________________ TestCycleTester.test_cycle_task_group_with_edge_labels ________________________________________________________________________

self = <tests.utils.test_dag_cycle.TestCycleTester testMethod=test_cycle_task_group_with_edge_labels>

    def test_cycle_task_group_with_edge_labels(self):
        from airflow.models.baseoperator import chain
        from airflow.utils.task_group import TaskGroup
        from airflow.utils.edgemodifier import Label

        dag = DAG('dag', start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})

        with dag:
            with TaskGroup(group_id="task_group") as task_group:
                op1 = EmptyOperator(task_id='A')
                op2 = EmptyOperator(task_id='B')

                op1 >> Label("label") >> op2

>       assert not check_cycle(dag)

tests/utils/test_dag_cycle.py:168:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
airflow/utils/dag_cycle_tester.py:76: in check_cycle
    child_to_check = _check_adjacent_tasks(current_task_id, task)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

task_id = 'task_group.B', current_task = <Task(EmptyOperator): task_group.B>

    def _check_adjacent_tasks(task_id, current_task):
        """Returns first untraversed child task, else None if all tasks traversed."""
        for adjacent_task in current_task.get_direct_relative_ids():
            if visited[adjacent_task] == CYCLE_IN_PROGRESS:
                msg = f"Cycle detected in DAG. Faulty task: {task_id}"
>               raise AirflowDagCycleException(msg)
E               airflow.exceptions.AirflowDagCycleException: Cycle detected in DAG. Faulty task: task_group.B

airflow/utils/dag_cycle_tester.py:62: AirflowDagCycleException
---------------------------------------------------------------------------------------- Captured stdout setup -----------------------------------------------------------------------------------------
========================= AIRFLOW ==========================
Home of the user: /root
Airflow home /root/airflow
Skipping initializing of the DB as it was initialized already.
You can re-initialize the database by adding --with-db-init flag when running tests.
======================================================================================= short test summary info ========================================================================================
FAILED tests/utils/test_dag_cycle.py::TestCycleTester::test_cycle_task_group_with_edge_labels - airflow.exceptions.AirflowDagCycleException: Cycle detected in DAG. Faulty task: task_group.B
==================================================================================== 1 failed, 2 warnings in 1.08s =====================================================================================
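The traceback shows how the false positive surfaces: `check_cycle` walks the DAG depth-first and raises as soon as one of the IDs returned by a task's `get_direct_relative_ids()` is still marked `CYCLE_IN_PROGRESS`. The sketch below is a simplified, Airflow-independent model of that three-state DFS over a plain adjacency dict; `toy_check_cycle`, `CycleError`, and the state constants are hypothetical names, and it makes no claim about where the spurious edge comes from. It only illustrates that, for the two-task group in the test, the error can fire only if the graph handed to the check contains an edge from B back to a task still on the DFS path (A, or B itself).

```python
from __future__ import annotations

from collections import defaultdict

# Hypothetical state names mirroring the CYCLE_IN_PROGRESS check in the traceback.
NEW, IN_PROGRESS, DONE = 0, 1, 2


class CycleError(Exception):
    """Stand-in for airflow.exceptions.AirflowDagCycleException."""


def toy_check_cycle(downstream: dict[str, set[str]]) -> bool:
    """Raise CycleError if the graph contains a cycle, else return False."""
    visited = defaultdict(int)  # every task starts as NEW (0)

    for root in downstream:
        if visited[root] == DONE:
            continue
        stack = [root]  # the current DFS path
        while stack:
            current = stack[-1]
            visited[current] = IN_PROGRESS
            next_child = None
            for child in sorted(downstream.get(current, ())):
                if visited[child] == IN_PROGRESS:
                    # An edge back to a task still on the path means a cycle.
                    raise CycleError(f"Cycle detected in DAG. Faulty task: {current}")
                if visited[child] == NEW:
                    next_child = child
                    break
            if next_child is None:
                # All children finished: this task is done, leave the path.
                visited[current] = DONE
                stack.pop()
            else:
                stack.append(next_child)
    return False


# A -> B, as in the test: no cycle.
print(toy_check_cycle({"A": {"B"}, "B": set()}))  # False

# If B also lists A as downstream, B is reported as the faulty task.
try:
    toy_check_cycle({"A": {"B"}, "B": {"A"}})
except CycleError as err:
    print(err)  # Cycle detected in DAG. Faulty task: B
```

The three states are what separate a real back edge (child still `IN_PROGRESS`) from a harmless diamond where two paths reach an already `DONE` task.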

Operating System

Debian GNU/Linux 11 (bullseye)

Versions of Apache Airflow Providers

N/A

Deployment

Astronomer

Deployment details

This issue also occurs on the main branch using Breeze.

Anything else

Possibly related to #21404

When the Label is removed, no cycle is detected.

from pendulum import datetime

from airflow.decorators import dag, task, task_group
from airflow.utils.edgemodifier import Label


@task
def begin():
    ...


@task
def end():
    ...


@dag(start_date=datetime(2022, 1, 1), schedule_interval=None)
def task_groups_with_edge_labels():
    @task_group
    def group():
        begin() >> end()

    group()


_ = task_groups_with_edge_labels()


Are you willing to submit a PR?

  • Yes I am willing to submit a PR!
