-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Closed
Closed
Copy link
Labels
affected_version:main_branch (Issues reported for main branch) · area:core · kind:bug (This is clearly a bug)
Description
Apache Airflow version
main (development)
What happened
I'm getting the import error "Cycle detected in DAG. Faulty task: group1.dummy4"
when I load this DAG into Airflow.
What you expected to happen
I expected the DAG to have no import errors. It imports fine in version 2.2.4-2, but after switching to version 2.3.0.dev20220302 it gives the import error described above.
How to reproduce
from airflow.utils.edgemodifier import Label
from airflow.models.baseoperator import chain
from airflow.decorators import dag, task, task_group
from airflow.utils.dot_renderer import render_dag
from textwrap import indent
from datetime import datetime, timedelta
# Naive local timestamp two days before the moment this file is executed;
# passed as the DAG's start_date below.
two_days = datetime.now() - timedelta(days=2)
# DAG factory: builds two task groups that are meant to have the same shape
# and a final task that compares their rendered edges.
#   * group1 wires its tasks with one chain() call, including a list of Labels.
#   * group2 declares the same edges one set_downstream() call at a time.
# Comments are used instead of docstrings on the decorated functions so that
# nothing new is exposed through __doc__.
@dag(
    dag_id="taskflow_compare2",
    schedule_interval=None,
    start_date=two_days,
    tags=["core"],
)
def task_grouper():
    @task
    def assert_homomorphic(task_group_names, **context):
        """
        The structure of all of the task groups above should be the same
        """
        # Render the DAG found in the task context to dot notation.
        current_dag = context["dag"]
        graph = render_dag(current_dag)
        print("Whole DAG:")
        print(indent(str(graph), " "))

        # Keep only the lines that describe edges ("a -> b").
        edge_lines = [ln for ln in str(graph).split("\n") if "->" in ln]

        # For every group: take the edge lines mentioning the group name,
        # sort them, strip whitespace and remove the group name so that only
        # the task names remain, then join them into one comparable string.
        group_strings = []
        for name in task_group_names:
            print(name)
            own_edges = sorted(ln for ln in edge_lines if name in ln)
            edges_str = "\n".join(
                ln.strip().replace(name, "") for ln in own_edges
            )
            group_strings.append(edges_str)
            print(indent(edges_str, " "))

        # Compare each group's edge string with the next one; all must match.
        for current, following in zip(group_strings, group_strings[1:]):
            assert current == following

    @task_group(group_id="group1")
    def grouper1():
        # Task callables; the function names are kept as in the report.
        @task
        def dummy00():
            return 0

        @task
        def dummy0():
            return 0

        @task
        def dummy1(val):
            return val + 1

        @task
        def dummy2(val):
            return val + 2

        @task
        def dummy3(val):
            return val + 3

        @task
        def dummy4(val):
            return val + 4

        @task
        def dummy5(val):
            return val + 5

        @task
        def dummy6(val):
            return val + 6

        @task
        def dummy7(val):
            return val + 7

        @task
        def dummy8(val):
            return val + 8

        @task
        def dummy9(val):
            return val + 9

        @task
        def dummy10(val):
            return val + 10

        # Instantiate the tasks in the same order as the original script:
        # dummy00, dummy0..dummy2, dummy3, dummy4..dummy8, dummy9, dummy10.
        head = dummy00()
        first_fan = [dummy0(), dummy1(0), dummy2(0)]
        join = dummy3(0)
        second_fan = [dummy4(0), dummy5(0), dummy6(0), dummy7(0), dummy8(0)]
        merge = dummy9(0)
        tail = dummy10(0)

        # One label per task of the second fan, passed to chain() as a list.
        branch_labels = [
            Label("branch one"),
            Label("branch two"),
            Label("branch three"),
            Label("branch four"),
            Label("branch five"),
        ]

        # The whole group is wired by this single chain() call.
        chain(head, first_fan, join, branch_labels, second_fan, merge, tail)
        return tail

    @task_group(group_id="group2")
    def grouper2():
        # Same task callables as in group1.
        @task
        def dummy00():
            return 0

        @task
        def dummy0():
            return 0

        @task
        def dummy1(val):
            return val + 1

        @task
        def dummy2(val):
            return val + 2

        @task
        def dummy3(val):
            return val + 3

        @task
        def dummy4(val):
            return val + 4

        @task
        def dummy5(val):
            return val + 5

        @task
        def dummy6(val):
            return val + 6

        @task
        def dummy7(val):
            return val + 7

        @task
        def dummy8(val):
            return val + 8

        @task
        def dummy9(val):
            return val + 9

        @task
        def dummy10(val):
            return val + 10

        # Same instantiation order as in group1.
        head = dummy00()
        first_fan = [dummy0(), dummy1(0), dummy2(0)]
        join = dummy3(0)
        second_fan = [dummy4(0), dummy5(0), dummy6(0), dummy7(0), dummy8(0)]
        merge = dummy9(0)
        tail = dummy10(0)

        branch_texts = [
            "branch one",
            "branch two",
            "branch three",
            "branch four",
            "branch five",
        ]

        # Edges are declared one set_downstream() call at a time, in the same
        # sequence as the original script.
        for member in first_fan:
            head.set_downstream(member)
        for member in first_fan:
            member.set_downstream(join)
        for member, text in zip(second_fan, branch_texts):
            join.set_downstream(member, edge_modifier=Label(text))
        for member in second_fan:
            member.set_downstream(merge)
        merge.set_downstream(tail)
        return tail

    # Build both groups, then make the comparison task downstream of both.
    tg1 = grouper1()
    tg2 = grouper2()
    checker = assert_homomorphic(["group1", "group2"])
    [tg1, tg2] >> checker
# Call the @dag-decorated factory and bind its result to a module-level name.
dag = task_grouper()
Operating System
Docker (debian:buster)
Versions of Apache Airflow Providers
No response
Deployment
Astronomer
Deployment details
Using the Astro CLI with this image:
quay.io/astronomer/ap-airflow-dev:main
Anything else
Happens every time.
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct
Metadata
Metadata
Assignees
Labels
affected_version:main_branchIssues Reported for main branchIssues Reported for main brancharea:corekind:bugThis is a clearly a bugThis is a clearly a bug