Closed
Labels
affected_version:2.3, affected_version:main_branch, area:core, kind:bug
Description
Apache Airflow version
2.3.0b1 (pre-release)
What happened
When a DAG contains Task Groups and there are Labels between tasks inside those Task Groups, the DAG fails to import because cycle detection raises an error.
Consider this DAG:
from pendulum import datetime

from airflow.decorators import dag, task, task_group
from airflow.utils.edgemodifier import Label


@task
def begin():
    ...


@task
def end():
    ...


@dag(start_date=datetime(2022, 1, 1), schedule_interval=None)
def task_groups_with_edge_labels():
    @task_group
    def group():
        begin() >> Label("label") >> end()

    group()


_ = task_groups_with_edge_labels()

When attempting to import the DAG, this error message is displayed:

This also occurs on the main branch.
What you think should happen instead
Users should be able to specify Labels between tasks within a Task Group.
How to reproduce
- Import the DAG above into an Airflow environment.
- Or add the following unit test and run it:
def test_cycle_task_group_with_edge_labels(self):
    from airflow.models.baseoperator import chain
    from airflow.utils.task_group import TaskGroup
    from airflow.utils.edgemodifier import Label

    dag = DAG('dag', start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})
    with dag:
        with TaskGroup(group_id="task_group") as task_group:
            op1 = EmptyOperator(task_id='A')
            op2 = EmptyOperator(task_id='B')
            op1 >> Label("label") >> op2

    assert not check_cycle(dag)

The test fails with an AirflowDagCycleException:
tests/utils/test_dag_cycle.py::TestCycleTester::test_cycle_task_group_with_edge_labels FAILED [100%]
=============================================================================================== FAILURES ===============================================================================================
________________________________________________________________________ TestCycleTester.test_cycle_task_group_with_edge_labels ________________________________________________________________________
self = <tests.utils.test_dag_cycle.TestCycleTester testMethod=test_cycle_task_group_with_edge_labels>
    def test_cycle_task_group_with_edge_labels(self):
        from airflow.models.baseoperator import chain
        from airflow.utils.task_group import TaskGroup
        from airflow.utils.edgemodifier import Label
        dag = DAG('dag', start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})
        with dag:
            with TaskGroup(group_id="task_group") as task_group:
                op1 = EmptyOperator(task_id='A')
                op2 = EmptyOperator(task_id='B')
                op1 >> Label("label") >> op2
>       assert not check_cycle(dag)
tests/utils/test_dag_cycle.py:168:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
airflow/utils/dag_cycle_tester.py:76: in check_cycle
child_to_check = _check_adjacent_tasks(current_task_id, task)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
task_id = 'task_group.B', current_task = <Task(EmptyOperator): task_group.B>
    def _check_adjacent_tasks(task_id, current_task):
        """Returns first untraversed child task, else None if all tasks traversed."""
        for adjacent_task in current_task.get_direct_relative_ids():
            if visited[adjacent_task] == CYCLE_IN_PROGRESS:
                msg = f"Cycle detected in DAG. Faulty task: {task_id}"
>               raise AirflowDagCycleException(msg)
E               airflow.exceptions.AirflowDagCycleException: Cycle detected in DAG. Faulty task: task_group.B
airflow/utils/dag_cycle_tester.py:62: AirflowDagCycleException
---------------------------------------------------------------------------------------- Captured stdout setup -----------------------------------------------------------------------------------------
========================= AIRFLOW ==========================
Home of the user: /root
Airflow home /root/airflow
Skipping initializing of the DB as it was initialized already.
You can re-initialize the database by adding --with-db-init flag when running tests.
======================================================================================= short test summary info ========================================================================================
FAILED tests/utils/test_dag_cycle.py::TestCycleTester::test_cycle_task_group_with_edge_labels - airflow.exceptions.AirflowDagCycleException: Cycle detected in DAG. Faulty task: task_group.B
==================================================================================== 1 failed, 2 warnings in 1.08s =====================================================================================
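The traceback above comes from a depth-first traversal that marks each task as in progress and raises when it reaches a neighbour that is still in progress. The following is a minimal, Airflow-free sketch of that idea, not the code in airflow/utils/dag_cycle_tester.py. `CycleError`, the dict-based graph, and the helper name `first_untraversed_child` are assumptions made for illustration. It shows that a plain A -> B graph passes the check, and that a message like the reported one appears if B also points back at A. Whether the Label actually introduces such an edge is not established here.

```python
from collections import defaultdict, deque

CYCLE_NEW = 0
CYCLE_IN_PROGRESS = 1
CYCLE_DONE = 2


class CycleError(Exception):
    """Hypothetical stand-in for AirflowDagCycleException."""


def check_cycle(downstream):
    """Raise CycleError if the directed graph contains a cycle.

    `downstream` maps each task id to the task ids that directly follow it.
    """
    visited = defaultdict(int)  # every task starts as CYCLE_NEW
    path_stack = deque()

    def first_untraversed_child(task_id):
        # Return the first neighbour not yet visited, or None if all are done.
        for child in downstream.get(task_id, ()):
            if visited[child] == CYCLE_IN_PROGRESS:
                # The neighbour is still on the current path: a back edge.
                raise CycleError(f"Cycle detected in DAG. Faulty task: {task_id}")
            if visited[child] == CYCLE_NEW:
                return child
        return None

    for start in downstream:
        if visited[start] == CYCLE_DONE:
            continue
        path_stack.append(start)
        while path_stack:
            current = path_stack[-1]
            visited[current] = CYCLE_IN_PROGRESS
            child = first_untraversed_child(current)
            if child is None:
                # All neighbours explored: leave the current path.
                visited[current] = CYCLE_DONE
                path_stack.pop()
            else:
                path_stack.append(child)


if __name__ == "__main__":
    # Expected shape of the graph in the failing test: A -> B only.
    check_cycle({"task_group.A": ["task_group.B"], "task_group.B": []})

    # A graph in which B also lists A as a neighbour trips the check.
    try:
        check_cycle(
            {"task_group.A": ["task_group.B"], "task_group.B": ["task_group.A"]}
        )
    except CycleError as exc:
        print(exc)
```

In this sketch the faulty task named in the message is the one whose neighbour is still in progress, which matches the shape of the reported error (`task_group.B`).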
Operating System
Debian GNU/Linux 11 (bullseye)
Versions of Apache Airflow Providers
N/A
Deployment
Astronomer
Deployment details
This issue also occurs on the main branch using Breeze.
Anything else
Possibly related to #21404
When the Label is removed, no cycle is detected.
from pendulum import datetime

from airflow.decorators import dag, task, task_group
from airflow.utils.edgemodifier import Label


@task
def begin():
    ...


@task
def end():
    ...


@dag(start_date=datetime(2022, 1, 1), schedule_interval=None)
def task_groups_with_edge_labels():
    @task_group
    def group():
        begin() >> end()

    group()


_ = task_groups_with_edge_labels()

Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct
