Skip to content

Commit d9e8cc0

Browse files
committed
Add few tests on the mapped task group.
1 parent 89aa437 commit d9e8cc0

File tree

1 file changed

+59
-0
lines changed

1 file changed

+59
-0
lines changed

tests/models/test_mappedoperator.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1568,3 +1568,62 @@ def my_teardown(val):
15681568
"tg_2.my_work": "skipped",
15691569
}
15701570
assert states == expected
1571+
1572+
def test_skip_one_mapped_task_from_task_group_with_generator(self, dag_maker):
1573+
with dag_maker() as dag:
1574+
1575+
@task
1576+
def make_list():
1577+
return [1, 2, 3]
1578+
1579+
@task
1580+
def double(n):
1581+
if n == 2:
1582+
raise AirflowSkipException()
1583+
return n * 2
1584+
1585+
@task
1586+
def last(n):
1587+
...
1588+
1589+
@task_group
1590+
def group(n: int) -> None:
1591+
last(double(n))
1592+
1593+
group.expand(n=make_list())
1594+
1595+
dr = dag.test()
1596+
states = self.get_states(dr)
1597+
expected = {
1598+
"group.double": {0: "success", 1: "skipped", 2: "success"},
1599+
"group.last": {0: "success", 1: "skipped", 2: "success"},
1600+
"make_list": "success",
1601+
}
1602+
assert states == expected
1603+
1604+
def test_skip_one_mapped_task_from_task_group(self, dag_maker):
1605+
with dag_maker() as dag:
1606+
1607+
@task
1608+
def double(n):
1609+
if n == 2:
1610+
raise AirflowSkipException()
1611+
return n * 2
1612+
1613+
@task
1614+
def last(n):
1615+
...
1616+
1617+
@task_group
1618+
def group(n: int) -> None:
1619+
last(double(n))
1620+
1621+
group.expand(n=[1, 2, 3])
1622+
1623+
dr = dag.test()
1624+
states = self.get_states(dr)
1625+
expected = {
1626+
"group.double": {0: "success", 1: "skipped", 2: "success"},
1627+
"group.last": {0: "success", 1: "skipped", 2: "success"},
1628+
}
1629+
assert states == expected

0 commit comments

Comments
 (0)