Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -2651,7 +2651,17 @@ def schedule_downstream_tasks(self, session=None):
task_id for task_id in partial_dag.task_ids if task_id not in task.downstream_task_ids
}

schedulable_tis = [ti for ti in info.schedulable_tis if ti.task_id not in skippable_task_ids]
schedulable_tis = [
ti
for ti in info.schedulable_tis
if ti.task_id not in skippable_task_ids
and not (
ti.task.inherits_from_empty_operator
and not ti.task.on_execute_callback
and not ti.task.on_success_callback
and not ti.task.outlets
)
]
for schedulable_ti in schedulable_tis:
if not hasattr(schedulable_ti, "task"):
schedulable_ti.task = task.dag.get_task(schedulable_ti.task_id)
Expand Down
44 changes: 44 additions & 0 deletions tests/models/test_taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -3859,6 +3859,50 @@ def last_task():
assert "0 downstream tasks scheduled from follow-on schedule" in caplog.text


def test_empty_operator_is_not_considered_in_mini_scheduler(dag_maker, caplog, session):
    """
    Verify that the mini scheduler leaves a bare EmptyOperator alone.

    ``first_task`` has three downstream tasks: a regular task, an EmptyOperator without callbacks and an
    EmptyOperator that defines ``on_success_callback``. After ``first_task`` succeeds and its follow-on
    scheduling runs, the regular task and the EmptyOperator with a callback are expected to be SCHEDULED,
    the bare EmptyOperator is expected to keep its NONE state, and the log must report two scheduled tasks.
    """
    downstream_ids = ("second_task", "third_task", "forth_task")

    with dag_maker() as dag:

        @dag.task
        def first_task():
            print(2)

        @dag.task
        def second_task():
            print(2)

        bare_empty = EmptyOperator(task_id="third_task")
        empty_with_callback = EmptyOperator(task_id="forth_task", on_success_callback=lambda x: print("hi"))

        first_task() >> [second_task(), bare_empty, empty_with_callback]

    dag_run = dag_maker.create_dagrun()

    # Fetch every task instance up front, upstream first, before touching any state.
    upstream_ti = dag_run.get_task_instance(task_id="first_task")
    downstream_tis = [dag_run.get_task_instance(task_id=task_id) for task_id in downstream_ids]

    # The upstream task is done; none of its downstream tasks has been looked at yet.
    upstream_ti.state = State.SUCCESS
    for ti in downstream_tis:
        ti.state = State.NONE

    for ti in [upstream_ti, *downstream_tis]:
        session.merge(ti)
    session.commit()

    upstream_ti.schedule_downstream_tasks(session=session)

    # Re-read the task instances to observe the states left by the mini scheduler.
    resulting_states = {
        task_id: dag_run.get_task_instance(task_id=task_id).state for task_id in downstream_ids
    }
    assert resulting_states["second_task"] == State.SCHEDULED
    assert resulting_states["third_task"] == State.NONE
    assert resulting_states["forth_task"] == State.SCHEDULED
    assert "2 downstream tasks scheduled from follow-on schedule" in caplog.text


def test_mapped_task_expands_in_mini_scheduler_if_upstreams_are_done(dag_maker, caplog, session):
"""Test that mini scheduler expands mapped task"""
with dag_maker() as dag:
Expand Down