18 changes: 5 additions & 13 deletions airflow-core/src/airflow/models/dagrun.py
@@ -2007,7 +2007,8 @@ def schedule_tis(

Each element of ``schedulable_tis`` should have its ``task`` attribute already set.

Any EmptyOperator without callbacks or outlets is instead set straight to the success state.
Any EmptyOperator that has no ``on_execute_callback``, ``on_success_callback``, ``inlets``, or
``outlets`` is instead set straight to the success state, without being executed.

All the TIs should belong to this DagRun, but since this code is in the hot path this is not checked -- it
is the caller's responsibility to call this function only with TIs from a single dag run.
@@ -2017,17 +2018,8 @@ def schedule_tis(
empty_ti_ids: list[str] = []
schedulable_ti_ids: list[str] = []
for ti in schedulable_tis:
task = ti.task
if TYPE_CHECKING:
assert isinstance(task, Operator)
if (
task.inherits_from_empty_operator
and not task.has_on_execute_callback
and not task.has_on_success_callback
and not task.outlets
and not task.inlets
):
empty_ti_ids.append(ti.id)
if ti.is_schedulable:
schedulable_ti_ids.append(ti.id)
# Check "start_trigger_args" to see whether the operator supports
# start execution from triggerer. If so, we'll check "start_from_trigger"
# to see whether this feature is turned on and defer this task.
@@ -2046,7 +2038,7 @@ def schedule_tis(
# else:
# schedulable_ti_ids.append(ti.id)
else:
schedulable_ti_ids.append(ti.id)
empty_ti_ids.append(ti.id)

count = 0

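For orientation, the rewritten loop now only decides which bucket each task instance falls into. A minimal sketch of that classification, assuming each TI already has its ``task`` attribute set as the docstring requires (the helper name is illustrative, and the ``start_trigger_args`` / ``start_from_trigger`` deferral branch shown in the hunk above is left out):

```python
# Illustrative sketch only -- not the exact DagRun.schedule_tis implementation.
from collections.abc import Iterable


def _classify_tis(schedulable_tis: Iterable) -> tuple[list[str], list[str]]:
    """Split TIs into ones to hand to the executor and ones to short-circuit."""
    schedulable_ti_ids: list[str] = []
    empty_ti_ids: list[str] = []
    for ti in schedulable_tis:
        if ti.is_schedulable:
            # Real work (or a non-trivial EmptyOperator): schedule it as before.
            schedulable_ti_ids.append(ti.id)
        else:
            # Trivial EmptyOperator with no on_execute/on_success callback and
            # no inlets/outlets: set straight to success without execution.
            empty_ti_ids.append(ti.id)
    return schedulable_ti_ids, empty_ti_ids
```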
35 changes: 35 additions & 0 deletions airflow-core/src/airflow/models/taskinstance.py
@@ -2180,6 +2180,41 @@ def duration_expression_update(
}
)

@property
def is_schedulable(self):
"""Determine if the task_instance should be scheduled or short-circuited to ``success``."""
return self.is_task_schedulable(self.task)

@staticmethod
def is_task_schedulable(task: Operator) -> bool:
"""
Determine if the task should be scheduled instead of being short-circuited to ``success``.

A task requires scheduling if it is not a trivial EmptyOperator, i.e. one of the
following conditions holds:

* it does **not** inherit from ``EmptyOperator``
* it defines an ``on_execute_callback``
* it defines an ``on_success_callback``
* it declares any ``outlets``
* it declares any ``inlets``

If none of these are true, the task is considered empty and is immediately marked
successful without being scheduled.

Note: keeping this check as a separate public method is important so it can also be used
by listeners (when a task is not scheduled, listeners are never called). For example,
the OpenLineage listener checks all tasks at DAG start, and using this method lets
it consistently determine whether the listener will run for each task.
"""
return bool(
not task.inherits_from_empty_operator
or task.has_on_execute_callback
or task.has_on_success_callback
or task.outlets
or task.inlets
)


def _find_common_ancestor_mapped_group(node1: Operator, node2: Operator) -> SerializedTaskGroup | None:
"""Given two operators, find their innermost common mapped task group."""
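As the new docstring notes, the check is exposed as a public static method so code outside the scheduler, such as listeners, can apply the same rule. A hedged sketch of that usage, relying only on ``dag.tasks`` and ``task_id`` (the helper itself is hypothetical and not part of this change):

```python
# Hypothetical listener-side helper illustrating reuse of the public static method.
from airflow.models.taskinstance import TaskInstance


def tasks_with_listener_coverage(dag) -> list[str]:
    """Return the task_ids whose task instances will actually be scheduled.

    Tasks for which ``is_task_schedulable`` is False are set straight to
    ``success`` by the scheduler, so task-instance listener hooks (for example
    the OpenLineage listener mentioned above) never fire for them.
    """
    return [task.task_id for task in dag.tasks if TaskInstance.is_task_schedulable(task)]
```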
42 changes: 42 additions & 0 deletions airflow-core/tests/unit/models/test_taskinstance.py
@@ -1566,6 +1566,48 @@ def test_log_url(self, create_task_instance):
expected_url = "http://localhost:8080/dags/my_dag/runs/test/tasks/op/mapped/1?try_number=2"
assert ti.log_url == expected_url

@pytest.mark.parametrize(
"kwargs",
[
{"inlets": [Asset(uri="file://some.txt")]},
{"outlets": [Asset(uri="file://some.txt")]},
{"on_success_callback": lambda *args, **kwargs: None},
{"on_execute_callback": lambda *args, **kwargs: None},
],
)
def test_is_schedulable_task_empty_operator_evaluates_true(self, kwargs, create_task_instance):
ti = create_task_instance(
dag_id="my_dag", task_id="op", logical_date=timezone.datetime(2018, 1, 1), **kwargs
)
assert ti.is_schedulable

@pytest.mark.parametrize(
"kwargs",
[
{},
{"on_failure_callback": lambda *args, **kwargs: None},
{"on_skipped_callback": lambda *args, **kwargs: None},
{"on_retry_callback": lambda *args, **kwargs: None},
],
)
def test_is_schedulable_task_empty_operator_evaluates_false(self, kwargs, create_task_instance):
ti = create_task_instance(
dag_id="my_dag", task_id="op", logical_date=timezone.datetime(2018, 1, 1), **kwargs
)
assert not ti.is_schedulable

def test_is_schedulable_task_non_empty_operator(self):
dag = DAG(dag_id="test_dag")

regular_task = BashOperator(task_id="regular", bash_command="echo test", dag=dag)
mapped_task = BashOperator.partial(task_id="mapped", dag=dag).expand(bash_command=["echo 1"])

regular_ti = TaskInstance(task=regular_task, dag_version_id=mock.MagicMock())
mapped_ti = TaskInstance(task=mapped_task, dag_version_id=mock.MagicMock())

assert regular_ti.is_schedulable
assert mapped_ti.is_schedulable

def test_mark_success_url(self, create_task_instance):
now = pendulum.now("Europe/Brussels")
ti = create_task_instance(dag_id="dag", task_id="op", logical_date=now)
54 changes: 54 additions & 0 deletions airflow-core/tests/unit/serialization/test_dag_serialization.py
@@ -56,6 +56,7 @@
from airflow.models.asset import AssetModel
from airflow.models.connection import Connection
from airflow.models.mappedoperator import MappedOperator
from airflow.models.taskinstance import TaskInstance as TI
from airflow.models.xcom import XCOM_RETURN_KEY, XComModel
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
from airflow.providers.standard.operators.bash import BashOperator
@@ -3821,6 +3822,59 @@ def test_task_callback_boolean_optimization(callback_config, expected_flags, is_
assert getattr(deserialized, flag) is expected


@pytest.mark.parametrize(
"kwargs",
[
{"inlets": [Asset(uri="file://some.txt")]},
{"outlets": [Asset(uri="file://some.txt")]},
{"on_success_callback": lambda *args, **kwargs: None},
{"on_execute_callback": lambda *args, **kwargs: None},
],
)
def test_is_schedulable_task_empty_operator_evaluates_true(kwargs):
from airflow.providers.standard.operators.empty import EmptyOperator

dag = DAG(dag_id="test_dag")
task = EmptyOperator(task_id="empty_task", dag=dag, **kwargs)

serialized_task = BaseSerialization.deserialize(BaseSerialization.serialize(task))

assert TI.is_task_schedulable(serialized_task)


@pytest.mark.parametrize(
"kwargs",
[
{},
{"on_failure_callback": lambda *args, **kwargs: None},
{"on_skipped_callback": lambda *args, **kwargs: None},
{"on_retry_callback": lambda *args, **kwargs: None},
],
)
def test_is_schedulable_task_empty_operator_evaluates_false(kwargs):
from airflow.providers.standard.operators.empty import EmptyOperator

dag = DAG(dag_id="test_dag")
task = EmptyOperator(task_id="empty_task", dag=dag, **kwargs)

serialized_task = BaseSerialization.deserialize(BaseSerialization.serialize(task))

assert not TI.is_task_schedulable(serialized_task)


def test_is_schedulable_task_non_empty_operator():
dag = DAG(dag_id="test_dag")

regular_task = BashOperator(task_id="regular", bash_command="echo test", dag=dag)
mapped_task = BashOperator.partial(task_id="mapped", dag=dag).expand(bash_command=["echo 1"])

serialized_regular = BaseSerialization.deserialize(BaseSerialization.serialize(regular_task))
serialized_mapped = BaseSerialization.deserialize(BaseSerialization.serialize(mapped_task))

assert TI.is_task_schedulable(serialized_regular)
assert TI.is_task_schedulable(serialized_mapped)


def test_task_callback_properties_exist():
"""Test that all callback boolean properties exist on both regular and mapped operators."""
dag = DAG(dag_id="test_dag")
6 changes: 6 additions & 0 deletions devel-common/src/tests_common/pytest_plugin.py
@@ -1487,6 +1487,9 @@ def maker(
on_execute_callback=None,
on_failure_callback=None,
on_retry_callback=None,
on_skipped_callback=None,
inlets=None,
outlets=None,
email=None,
map_index=-1,
hostname=None,
@@ -1512,6 +1515,9 @@ def maker(
on_execute_callback=on_execute_callback,
on_failure_callback=on_failure_callback,
on_retry_callback=on_retry_callback,
on_skipped_callback=on_skipped_callback,
inlets=inlets,
outlets=outlets,
email=email,
pool=pool,
trigger_rule=trigger_rule,
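With ``on_skipped_callback``, ``inlets``, and ``outlets`` now accepted by the ``create_task_instance`` fixture, tests can build exactly the task instances exercised above. A usage sketch mirroring those tests (the ``Asset`` import path is an assumption and the test name is illustrative):

```python
# Usage sketch for the extended create_task_instance fixture.
from airflow.sdk import Asset  # import path assumed
from airflow.utils import timezone


def test_empty_operator_with_outlets_is_schedulable(create_task_instance):
    # Declaring outlets makes an otherwise-empty task worth scheduling,
    # so the scheduler must not short-circuit it to success.
    ti = create_task_instance(
        dag_id="my_dag",
        task_id="op",
        logical_date=timezone.datetime(2018, 1, 1),
        outlets=[Asset(uri="file://some.txt")],
    )
    assert ti.is_schedulable
```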